Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "gorm.io/driver/sqlite"
14 "gorm.io/gorm"
15 "gorm.io/plugin/prometheus"
16 "stream.place/streamplace/pkg/config"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/streamplace"
19)
20
// DBModel is the gorm-backed value that MakeDB returns as a Model.
type DBModel struct {
	// DB is the gorm database handle opened by MakeDB.
	DB *gorm.DB
}
24
// Model is the storage interface exposed by this package. MakeDB returns a
// *DBModel as a Model. The methods are grouped below by the kind of record
// they operate on; the group labels are derived from the method names.
type Model interface {
	// Player events.
	CreatePlayerEvent(event PlayerEventAPI) error
	ListPlayerEvents(playerID string) ([]PlayerEvent, error)
	PlayerReport(playerID string) (map[string]any, error)
	ClearPlayerEvents() error

	// Segments and thumbnails, including segment expiry/cleanup.
	CreateSegment(segment *Segment) error
	MostRecentSegments() ([]Segment, error)
	LatestSegmentForUser(user string) (*Segment, error)
	LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error)
	FilterLiveRepoDIDs(repoDIDs []string) ([]string, error)
	CreateThumbnail(thumb *Thumbnail) error
	LatestThumbnailForUser(user string) (*Thumbnail, error)
	GetSegment(id string) (*Segment, error)
	GetExpiredSegments(ctx context.Context) ([]Segment, error)
	DeleteSegment(ctx context.Context, id string) error
	StartSegmentCleaner(ctx context.Context) error
	SegmentCleaner(ctx context.Context) error

	// Identities.
	GetIdentity(id string) (*Identity, error)
	UpdateIdentity(ident *Identity) error

	// Repos.
	GetRepo(did string) (*Repo, error)
	GetRepoByHandle(handle string) (*Repo, error)
	GetRepoByHandleOrDID(arg string) (*Repo, error)
	GetRepoBySigningKey(signingKey string) (*Repo, error)
	GetAllRepos() ([]Repo, error)
	SearchReposByHandle(query string, limit int) ([]Repo, error)
	UpdateRepo(repo *Repo) error

	// Signing keys.
	UpdateSigningKey(key *SigningKey) error
	GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
	GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error)
	GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)

	// Follows.
	CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
	GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
	GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
	GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
	DeleteFollow(ctx context.Context, userDID, rev string) error

	// Feed posts and replies.
	CreateFeedPost(ctx context.Context, post *FeedPost) error
	ListFeedPosts() ([]FeedPost, error)
	ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
	GetFeedPost(uri string) (*FeedPost, error)
	GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)

	// Livestreams.
	CreateLivestream(ctx context.Context, ls *Livestream) error
	GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
	GetLivestreamByPostURI(postURI string) (*Livestream, error)
	GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error)

	// Blocks.
	CreateBlock(ctx context.Context, block *Block) error
	GetBlock(ctx context.Context, rkey string) (*Block, error)
	GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
	DeleteBlock(ctx context.Context, rkey string) error

	// Chat messages.
	CreateChatMessage(ctx context.Context, message *ChatMessage) error
	MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
	GetChatMessage(uri string) (*ChatMessage, error)
	DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error

	// Gates.
	CreateGate(ctx context.Context, gate *Gate) error
	DeleteGate(ctx context.Context, rkey string) error
	GetGate(ctx context.Context, rkey string) (*Gate, error)
	GetUserGates(ctx context.Context, userDID string) ([]*Gate, error)

	// Chat profiles.
	CreateChatProfile(ctx context.Context, profile *ChatProfile) error
	GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)

	// Server settings, addressed by (server, repoDID).
	UpdateServerSettings(ctx context.Context, settings *ServerSettings) error
	GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error)
	DeleteServerSettings(ctx context.Context, server string, repoDID string) error

	// Labelers.
	CreateLabeler(did string) (*Labeler, error)
	GetLabeler(did string) (*Labeler, error)
	UpdateLabelerCursor(did string, cursor int64) error

	// Labels.
	CreateLabel(label *Label) error
	GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error)

	// Broadcast origins.
	UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error
	GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error)

	// Metadata configuration, addressed by repoDID.
	CreateMetadataConfiguration(ctx context.Context, metadata *MetadataConfiguration) error
	GetMetadataConfiguration(ctx context.Context, repoDID string) (*MetadataConfiguration, error)
	DeleteMetadataConfiguration(ctx context.Context, repoDID string) error

	// Moderation delegations between a streamer DID and a moderator DID.
	CreateModerationDelegation(ctx context.Context, rec *streamplace.ModerationPermission, aturi syntax.ATURI) error
	DeleteModerationDelegation(ctx context.Context, rkey string) error
	GetModerationDelegation(ctx context.Context, streamerDID, moderatorDID string) (*streamplace.ModerationDefs_PermissionView, error)
	GetModerationDelegations(ctx context.Context, streamerDID, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error)
	GetModeratorDelegations(ctx context.Context, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error)
	GetStreamerModerators(ctx context.Context, streamerDID string) ([]*streamplace.ModerationDefs_PermissionView, error)

	// Recommendations.
	GetRecommendation(userDID string) (*Recommendation, error)
	UpsertRecommendation(rec *Recommendation) error
}
123
// DBRevision is the number embedded in the SQLite file name
// ("index_<DBRevision>.sqlite") that MakeDB opens inside the database
// directory.
var DBRevision = 2
125
126func MakeDB(dbURL string) (Model, error) {
127 sqliteSuffix := dbURL
128 if dbURL != ":memory:" {
129 // Ensure dbURL exists as a directory on the filesystem
130 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil {
131 return nil, fmt.Errorf("error creating database directory: %w", err)
132 }
133 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision))
134 sqliteSuffix = dbPath
135 // if this isn't ":memory:", ensure that directory exists (eg, if db
136 // file is being initialized)
137 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil {
138 return nil, fmt.Errorf("error creating database path: %w", err)
139 }
140 }
141 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix)
142 dial := sqlite.Open(sqliteSuffix)
143
144 db, err := gorm.Open(dial, &gorm.Config{
145 SkipDefaultTransaction: true,
146 TranslateError: true,
147 Logger: config.GormLogger,
148 })
149 if err != nil {
150 return nil, fmt.Errorf("error starting database: %w", err)
151 }
152 err = db.Exec("PRAGMA journal_mode=WAL;").Error
153 if err != nil {
154 return nil, fmt.Errorf("error setting journal mode: %w", err)
155 }
156
157 err = db.Use(prometheus.New(prometheus.Config{
158 DBName: "index",
159 RefreshInterval: 10,
160 StartServer: false,
161 }))
162 if err != nil {
163 return nil, fmt.Errorf("error using prometheus plugin: %w", err)
164 }
165
166 sqlDB, err := db.DB()
167 if err != nil {
168 return nil, fmt.Errorf("error getting database: %w", err)
169 }
170 sqlDB.SetMaxOpenConns(1)
171 for _, model := range []any{
172 PlayerEvent{},
173 Segment{},
174 Thumbnail{},
175 Identity{},
176 Repo{},
177 SigningKey{},
178 Follow{},
179 FeedPost{},
180 Livestream{},
181 Block{},
182 ChatMessage{},
183 ChatProfile{},
184 Gate{},
185 ServerSettings{},
186 Labeler{},
187 Label{},
188 BroadcastOrigin{},
189 MetadataConfiguration{},
190 ModerationDelegation{},
191 Recommendation{},
192 } {
193 err = db.AutoMigrate(model)
194 if err != nil {
195 return nil, err
196 }
197 }
198 return &DBModel{DB: db}, nil
199}