Live video on the AT Protocol
at eli/lexicon-dev 205 lines 7.9 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "gorm.io/driver/sqlite" 14 "gorm.io/gorm" 15 "gorm.io/plugin/prometheus" 16 "stream.place/streamplace/pkg/config" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/streamplace" 19) 20 21type DBModel struct { 22 DB *gorm.DB 23} 24 25type Model interface { 26 CreatePlayerEvent(event PlayerEventAPI) error 27 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 28 PlayerReport(playerID string) (map[string]any, error) 29 ClearPlayerEvents() error 30 31 GetIdentity(id string) (*Identity, error) 32 UpdateIdentity(ident *Identity) error 33 34 GetRepo(did string) (*Repo, error) 35 GetRepoByHandle(handle string) (*Repo, error) 36 GetRepoByHandleOrDID(arg string) (*Repo, error) 37 GetRepoBySigningKey(signingKey string) (*Repo, error) 38 GetAllRepos() ([]Repo, error) 39 SearchReposByHandle(query string, limit int) ([]Repo, error) 40 UpdateRepo(repo *Repo) error 41 42 UpdateSigningKey(key *SigningKey) error 43 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 44 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 45 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 46 47 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 48 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 49 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 50 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 51 DeleteFollow(ctx context.Context, userDID, rev string) error 52 53 CreateFeedPost(ctx context.Context, post *FeedPost) error 54 ListFeedPosts() ([]FeedPost, error) 55 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 56 GetFeedPost(uri string) (*FeedPost, error) 57 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 58 59 CreateLivestream(ctx context.Context, ls *Livestream) error 60 GetLivestream(uri string) (*Livestream, error) 61 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 62 GetLivestreamByPostURI(postURI string) (*Livestream, error) 63 GetLatestLivestreams(limit int, before *time.Time, dids []string) ([]Livestream, error) 64 65 CreateTeleport(ctx context.Context, tp *Teleport) error 66 GetLatestTeleportForRepo(repoDID string) (*Teleport, error) 67 GetActiveTeleportsForRepo(repoDID string) ([]Teleport, error) 68 GetActiveTeleportsToRepo(targetDID string) ([]Teleport, error) 69 GetTeleportByURI(uri string) (*Teleport, error) 70 DeleteTeleport(ctx context.Context, uri string) error 71 DenyTeleport(ctx context.Context, uri string) error 72 73 CreateBlock(ctx context.Context, block *Block) error 74 GetBlock(ctx context.Context, rkey string) (*Block, error) 75 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 76 DeleteBlock(ctx context.Context, rkey string) error 77 78 CreateChatMessage(ctx context.Context, message *ChatMessage) error 79 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 80 GetChatMessage(uri string) (*ChatMessage, error) 81 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error 82 83 CreateGate(ctx context.Context, gate *Gate) error 84 DeleteGate(ctx context.Context, rkey string) error 85 GetGate(ctx context.Context, rkey string) (*Gate, error) 86 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error) 87 88 CreatePinnedRecord(ctx context.Context, pin *PinnedRecord) error 89 DeletePinnedRecord(ctx context.Context, uri string) error 90 DeleteAllPinnedRecords(ctx context.Context, streamerDID string) error 91 GetPinnedRecord(ctx context.Context, uri string) (*PinnedRecord, error) 92 GetActivePinnedRecord(ctx context.Context, streamerDID string) (*PinnedRecord, error) 93 94 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 95 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 96 97 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 98 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 99 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 100 101 CreateLabeler(did string) (*Labeler, error) 102 GetLabeler(did string) (*Labeler, error) 103 UpdateLabelerCursor(did string, cursor int64) error 104 105 CreateLabel(label *Label) error 106 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 107 108 UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error 109 GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error) 110 111 CreateMetadataConfiguration(ctx context.Context, metadata *MetadataConfiguration) error 112 GetMetadataConfiguration(ctx context.Context, repoDID string) (*MetadataConfiguration, error) 113 DeleteMetadataConfiguration(ctx context.Context, repoDID string) error 114 115 CreateModerationDelegation(ctx context.Context, rec *streamplace.ModerationPermission, aturi syntax.ATURI) error 116 DeleteModerationDelegation(ctx context.Context, rkey string) error 117 GetModerationDelegation(ctx context.Context, streamerDID, moderatorDID string) (*streamplace.ModerationDefs_PermissionView, error) 118 GetModerationDelegations(ctx context.Context, streamerDID, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 119 GetModeratorDelegations(ctx context.Context, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 120 GetStreamerModerators(ctx context.Context, streamerDID string) ([]*streamplace.ModerationDefs_PermissionView, error) 121 122 GetRecommendation(userDID string) (*Recommendation, error) 123 UpsertRecommendation(rec *Recommendation) error 124 125 UpsertBskyProfile(ctx context.Context, aturi syntax.ATURI, profileBs []byte, wasStreamplace bool) error 126 GetBskyProfile(ctx context.Context, did string, wasStreamplace bool) (*bsky.ActorProfile, error) 127} 128 129var DBRevision = 2 130 131func MakeDB(dbURL string) (Model, error) { 132 sqliteSuffix := dbURL 133 if dbURL != ":memory:" { 134 // Ensure dbURL exists as a directory on the filesystem 135 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil { 136 return nil, fmt.Errorf("error creating database directory: %w", err) 137 } 138 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision)) 139 sqliteSuffix = dbPath 140 // if this isn't ":memory:", ensure that directory exists (eg, if db 141 // file is being initialized) 142 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 143 return nil, fmt.Errorf("error creating database path: %w", err) 144 } 145 } 146 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix) 147 dial := sqlite.Open(sqliteSuffix) 148 149 db, err := gorm.Open(dial, &gorm.Config{ 150 SkipDefaultTransaction: true, 151 TranslateError: true, 152 Logger: config.GormLogger, 153 }) 154 if err != nil { 155 return nil, fmt.Errorf("error starting database: %w", err) 156 } 157 err = db.Exec("PRAGMA journal_mode=WAL;").Error 158 if err != nil { 159 return nil, fmt.Errorf("error setting journal mode: %w", err) 160 } 161 162 err = db.Use(prometheus.New(prometheus.Config{ 163 DBName: "index", 164 RefreshInterval: 10, 165 StartServer: false, 166 })) 167 if err != nil { 168 return nil, fmt.Errorf("error using prometheus plugin: %w", err) 169 } 170 171 sqlDB, err := db.DB() 172 if err != nil { 173 return nil, fmt.Errorf("error getting database: %w", err) 174 } 175 sqlDB.SetMaxOpenConns(1) 176 for _, model := range []any{ 177 PlayerEvent{}, 178 Identity{}, 179 Repo{}, 180 SigningKey{}, 181 Follow{}, 182 FeedPost{}, 183 Livestream{}, 184 Block{}, 185 ChatMessage{}, 186 ChatProfile{}, 187 Gate{}, 188 PinnedRecord{}, 189 ServerSettings{}, 190 Labeler{}, 191 Label{}, 192 BroadcastOrigin{}, 193 MetadataConfiguration{}, 194 Teleport{}, 195 ModerationDelegation{}, 196 Recommendation{}, 197 BskyProfile{}, 198 } { 199 err = db.AutoMigrate(model) 200 if err != nil { 201 return nil, err 202 } 203 } 204 return &DBModel{DB: db}, nil 205}