Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "gorm.io/driver/sqlite"
14 "gorm.io/gorm"
15 "gorm.io/plugin/prometheus"
16 "stream.place/streamplace/pkg/config"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/streamplace"
19)
20
21type DBModel struct {
22 DB *gorm.DB
23}
24
25type Model interface {
26 CreatePlayerEvent(event PlayerEventAPI) error
27 ListPlayerEvents(playerID string) ([]PlayerEvent, error)
28 PlayerReport(playerID string) (map[string]any, error)
29 ClearPlayerEvents() error
30
31 GetIdentity(id string) (*Identity, error)
32 UpdateIdentity(ident *Identity) error
33
34 GetRepo(did string) (*Repo, error)
35 GetRepoByHandle(handle string) (*Repo, error)
36 GetRepoByHandleOrDID(arg string) (*Repo, error)
37 GetRepoBySigningKey(signingKey string) (*Repo, error)
38 GetAllRepos() ([]Repo, error)
39 SearchReposByHandle(query string, limit int) ([]Repo, error)
40 UpdateRepo(repo *Repo) error
41
42 UpdateSigningKey(key *SigningKey) error
43 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
44 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error)
45 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
46
47 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
48 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
49 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
50 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
51 DeleteFollow(ctx context.Context, userDID, rev string) error
52
53 CreateFeedPost(ctx context.Context, post *FeedPost) error
54 ListFeedPosts() ([]FeedPost, error)
55 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
56 GetFeedPost(uri string) (*FeedPost, error)
57 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
58
59 CreateLivestream(ctx context.Context, ls *Livestream) error
60 GetLivestream(uri string) (*Livestream, error)
61 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
62 GetLivestreamByPostURI(postURI string) (*Livestream, error)
63 GetLatestLivestreams(limit int, before *time.Time, dids []string) ([]Livestream, error)
64
65 CreateTeleport(ctx context.Context, tp *Teleport) error
66 GetLatestTeleportForRepo(repoDID string) (*Teleport, error)
67 GetActiveTeleportsForRepo(repoDID string) ([]Teleport, error)
68 GetActiveTeleportsToRepo(targetDID string) ([]Teleport, error)
69 GetTeleportByURI(uri string) (*Teleport, error)
70 DeleteTeleport(ctx context.Context, uri string) error
71 DenyTeleport(ctx context.Context, uri string) error
72
73 CreateBlock(ctx context.Context, block *Block) error
74 GetBlock(ctx context.Context, rkey string) (*Block, error)
75 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
76 DeleteBlock(ctx context.Context, rkey string) error
77
78 CreateChatMessage(ctx context.Context, message *ChatMessage) error
79 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
80 GetChatMessage(uri string) (*ChatMessage, error)
81 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error
82
83 CreateGate(ctx context.Context, gate *Gate) error
84 DeleteGate(ctx context.Context, rkey string) error
85 GetGate(ctx context.Context, rkey string) (*Gate, error)
86 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error)
87
88 CreatePinnedRecord(ctx context.Context, pin *PinnedRecord) error
89 DeletePinnedRecord(ctx context.Context, uri string) error
90 DeleteAllPinnedRecords(ctx context.Context, streamerDID string) error
91 GetPinnedRecord(ctx context.Context, uri string) (*PinnedRecord, error)
92 GetActivePinnedRecord(ctx context.Context, streamerDID string) (*PinnedRecord, error)
93
94 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
95 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
96
97 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error
98 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error)
99 DeleteServerSettings(ctx context.Context, server string, repoDID string) error
100
101 CreateLabeler(did string) (*Labeler, error)
102 GetLabeler(did string) (*Labeler, error)
103 UpdateLabelerCursor(did string, cursor int64) error
104
105 CreateLabel(label *Label) error
106 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error)
107
108 UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error
109 GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error)
110
111 CreateMetadataConfiguration(ctx context.Context, metadata *MetadataConfiguration) error
112 GetMetadataConfiguration(ctx context.Context, repoDID string) (*MetadataConfiguration, error)
113 DeleteMetadataConfiguration(ctx context.Context, repoDID string) error
114
115 CreateModerationDelegation(ctx context.Context, rec *streamplace.ModerationPermission, aturi syntax.ATURI) error
116 DeleteModerationDelegation(ctx context.Context, rkey string) error
117 GetModerationDelegation(ctx context.Context, streamerDID, moderatorDID string) (*streamplace.ModerationDefs_PermissionView, error)
118 GetModerationDelegations(ctx context.Context, streamerDID, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error)
119 GetModeratorDelegations(ctx context.Context, moderatorDID string) ([]*streamplace.ModerationDefs_PermissionView, error)
120 GetStreamerModerators(ctx context.Context, streamerDID string) ([]*streamplace.ModerationDefs_PermissionView, error)
121
122 GetRecommendation(userDID string) (*Recommendation, error)
123 UpsertRecommendation(rec *Recommendation) error
124
125 UpsertBskyProfile(ctx context.Context, aturi syntax.ATURI, profileBs []byte, wasStreamplace bool) error
126 GetBskyProfile(ctx context.Context, did string, wasStreamplace bool) (*bsky.ActorProfile, error)
127}
128
// DBRevision is the revision number that MakeDB embeds in the SQLite file
// name (index_<DBRevision>.sqlite).
// NOTE(review): presumably bumped on incompatible schema changes so that a
// fresh database file is used — confirm.
var DBRevision = 2
130
131func MakeDB(dbURL string) (Model, error) {
132 sqliteSuffix := dbURL
133 if dbURL != ":memory:" {
134 // Ensure dbURL exists as a directory on the filesystem
135 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil {
136 return nil, fmt.Errorf("error creating database directory: %w", err)
137 }
138 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision))
139 sqliteSuffix = dbPath
140 // if this isn't ":memory:", ensure that directory exists (eg, if db
141 // file is being initialized)
142 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil {
143 return nil, fmt.Errorf("error creating database path: %w", err)
144 }
145 }
146 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix)
147 dial := sqlite.Open(sqliteSuffix)
148
149 db, err := gorm.Open(dial, &gorm.Config{
150 SkipDefaultTransaction: true,
151 TranslateError: true,
152 Logger: config.GormLogger,
153 })
154 if err != nil {
155 return nil, fmt.Errorf("error starting database: %w", err)
156 }
157 err = db.Exec("PRAGMA journal_mode=WAL;").Error
158 if err != nil {
159 return nil, fmt.Errorf("error setting journal mode: %w", err)
160 }
161
162 err = db.Use(prometheus.New(prometheus.Config{
163 DBName: "index",
164 RefreshInterval: 10,
165 StartServer: false,
166 }))
167 if err != nil {
168 return nil, fmt.Errorf("error using prometheus plugin: %w", err)
169 }
170
171 sqlDB, err := db.DB()
172 if err != nil {
173 return nil, fmt.Errorf("error getting database: %w", err)
174 }
175 sqlDB.SetMaxOpenConns(1)
176 for _, model := range []any{
177 PlayerEvent{},
178 Identity{},
179 Repo{},
180 SigningKey{},
181 Follow{},
182 FeedPost{},
183 Livestream{},
184 Block{},
185 ChatMessage{},
186 ChatProfile{},
187 Gate{},
188 PinnedRecord{},
189 ServerSettings{},
190 Labeler{},
191 Label{},
192 BroadcastOrigin{},
193 MetadataConfiguration{},
194 Teleport{},
195 ModerationDelegation{},
196 Recommendation{},
197 BskyProfile{},
198 } {
199 err = db.AutoMigrate(model)
200 if err != nil {
201 return nil, err
202 }
203 }
204 return &DBModel{DB: db}, nil
205}