Live video on the AT Protocol
at issue-705-2 214 lines 8.2 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "gorm.io/driver/sqlite" 14 "gorm.io/gorm" 15 "gorm.io/plugin/prometheus" 16 "stream.place/streamplace/pkg/config" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/streamplace" 19) 20 21type DBModel struct { 22 DB *gorm.DB 23} 24 25type Model interface { 26 CreatePlayerEvent(event PlayerEventAPI) error 27 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 28 PlayerReport(playerID string) (map[string]any, error) 29 ClearPlayerEvents() error 30 31 CreateSegment(segment *Segment) error 32 MostRecentSegments() ([]Segment, error) 33 LatestSegmentForUser(user string) (*Segment, error) 34 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) 35 FilterLiveRepoDIDs(repoDIDs []string) ([]string, error) 36 CreateThumbnail(thumb *Thumbnail) error 37 LatestThumbnailForUser(user string) (*Thumbnail, error) 38 GetSegment(id string) (*Segment, error) 39 GetExpiredSegments(ctx context.Context) ([]Segment, error) 40 DeleteSegment(ctx context.Context, id string) error 41 StartSegmentCleaner(ctx context.Context) error 42 SegmentCleaner(ctx context.Context) error 43 44 GetIdentity(id string) (*Identity, error) 45 UpdateIdentity(ident *Identity) error 46 47 GetRepo(did string) (*Repo, error) 48 GetRepoByHandle(handle string) (*Repo, error) 49 GetRepoByHandleOrDID(arg string) (*Repo, error) 50 GetRepoBySigningKey(signingKey string) (*Repo, error) 51 GetAllRepos() ([]Repo, error) 52 SearchReposByHandle(query string, limit int) ([]Repo, error) 53 UpdateRepo(repo *Repo) error 54 55 UpdateSigningKey(key *SigningKey) error 56 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 57 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 58 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 59 60 UpdatePublisherKey(key *PublisherKey) error 61 GetPublisherKey(ctx context.Context, did, repoDID string) (*PublisherKey, error) 62 GetPublisherKeyByRKey(ctx context.Context, rkey string) (*PublisherKey, error) 63 GetPublisherKeysForRepo(repoDID string) ([]PublisherKey, error) 64 65 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 66 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 67 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 68 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 69 DeleteFollow(ctx context.Context, userDID, rev string) error 70 71 CreateFeedPost(ctx context.Context, post *FeedPost) error 72 ListFeedPosts() ([]FeedPost, error) 73 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 74 GetFeedPost(uri string) (*FeedPost, error) 75 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 76 77 CreateLivestream(ctx context.Context, ls *Livestream) error 78 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 79 GetLivestreamByPostURI(postURI string) (*Livestream, error) 80 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) 81 82 CreateTeleport(ctx context.Context, tp *Teleport) error 83 GetLatestTeleportForRepo(repoDID string) (*Teleport, error) 84 GetActiveTeleportsForRepo(repoDID string) ([]Teleport, error) 85 GetActiveTeleportsToRepo(targetDID string) ([]Teleport, error) 86 GetTeleportByURI(uri string) (*Teleport, error) 87 DeleteTeleport(ctx context.Context, uri string) error 88 DenyTeleport(ctx context.Context, uri string) error 89 90 CreateBlock(ctx context.Context, block *Block) error 91 GetBlock(ctx context.Context, rkey string) (*Block, error) 92 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 93 DeleteBlock(ctx context.Context, rkey string) error 94 95 CreateChatMessage(ctx context.Context, message *ChatMessage) error 96 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 97 GetChatMessage(uri string) (*ChatMessage, error) 98 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error 99 100 CreateGate(ctx context.Context, gate *Gate) error 101 DeleteGate(ctx context.Context, rkey string) error 102 GetGate(ctx context.Context, rkey string) (*Gate, error) 103 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error) 104 105 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 106 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 107 108 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 109 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 110 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 111 112 CreateLabeler(did string) (*Labeler, error) 113 GetLabeler(did string) (*Labeler, error) 114 UpdateLabelerCursor(did string, cursor int64) error 115 116 CreateLabel(label *Label) error 117 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 118 119 UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error 120 GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error) 121 122 CreateMetadataConfiguration(ctx context.Context, metadata *MetadataConfiguration) error 123 GetMetadataConfiguration(ctx context.Context, repoDID string) (*MetadataConfiguration, error) 124 DeleteMetadataConfiguration(ctx context.Context, repoDID string) error 125 126 CreateModerationDelegation(ctx context.Context, rec *streamplace.ModerationPermission, aturi syntax.ATURI) error 127 DeleteModerationDelegation(ctx context.Context, rkey string) error 128 GetModerationDelegation(ctx context.Context, streamerDID, moderatorDID string) (*streamplace.ModerationDefs_PermissionView, error) 129 GetModerationDelegations(ctx context.Context, streamerDID, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 130 GetModeratorDelegations(ctx context.Context, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 131 GetStreamerModerators(ctx context.Context, streamerDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 132 133 GetRecommendation(userDID string) (*Recommendation, error) 134 UpsertRecommendation(rec *Recommendation) error 135} 136 137var DBRevision = 2 138 139func MakeDB(dbURL string) (Model, error) { 140 sqliteSuffix := dbURL 141 if dbURL != ":memory:" { 142 // Ensure dbURL exists as a directory on the filesystem 143 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil { 144 return nil, fmt.Errorf("error creating database directory: %w", err) 145 } 146 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision)) 147 sqliteSuffix = dbPath 148 // if this isn't ":memory:", ensure that directory exists (eg, if db 149 // file is being initialized) 150 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 151 return nil, fmt.Errorf("error creating database path: %w", err) 152 } 153 } 154 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix) 155 dial := sqlite.Open(sqliteSuffix) 156 157 db, err := gorm.Open(dial, &gorm.Config{ 158 SkipDefaultTransaction: true, 159 TranslateError: true, 160 Logger: config.GormLogger, 161 }) 162 if err != nil { 163 return nil, fmt.Errorf("error starting database: %w", err) 164 } 165 err = db.Exec("PRAGMA journal_mode=WAL;").Error 166 if err != nil { 167 return nil, fmt.Errorf("error setting journal mode: %w", err) 168 } 169 170 err = db.Use(prometheus.New(prometheus.Config{ 171 DBName: "index", 172 RefreshInterval: 10, 173 StartServer: false, 174 })) 175 if err != nil { 176 return nil, fmt.Errorf("error using prometheus plugin: %w", err) 177 } 178 179 sqlDB, err := db.DB() 180 if err != nil { 181 return nil, fmt.Errorf("error getting database: %w", err) 182 } 183 sqlDB.SetMaxOpenConns(1) 184 for _, model := range []any{ 185 PlayerEvent{}, 186 Segment{}, 187 Thumbnail{}, 188 Identity{}, 189 Repo{}, 190 SigningKey{}, 191 PublisherKey{}, 192 Follow{}, 193 FeedPost{}, 194 Livestream{}, 195 Block{}, 196 ChatMessage{}, 197 ChatProfile{}, 198 Gate{}, 199 ServerSettings{}, 200 Labeler{}, 201 Label{}, 202 BroadcastOrigin{}, 203 MetadataConfiguration{}, 204 Teleport{}, 205 ModerationDelegation{}, 206 Recommendation{}, 207 } { 208 err = db.AutoMigrate(model) 209 if err != nil { 210 return nil, err 211 } 212 } 213 return &DBModel{DB: db}, nil 214}