Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/lmittmann/tint"
14 slogGorm "github.com/orandin/slog-gorm"
15 "github.com/streamplace/oatproxy/pkg/oatproxy"
16 "gorm.io/driver/sqlite"
17 "gorm.io/gorm"
18 "stream.place/streamplace/pkg/config"
19 "stream.place/streamplace/pkg/log"
20 "stream.place/streamplace/pkg/streamplace"
21)
22
23type DBModel struct {
24 DB *gorm.DB
25 CLI *config.CLI
26}
27
28type Model interface {
29 CreateNotification(token, repoDID string) error
30 ListNotifications() ([]Notification, error)
31
32 CreatePlayerEvent(event PlayerEventAPI) error
33 ListPlayerEvents(playerID string) ([]PlayerEvent, error)
34 PlayerReport(playerID string) (map[string]any, error)
35 ClearPlayerEvents() error
36
37 CreateSegment(segment *Segment) error
38 MostRecentSegments() ([]Segment, error)
39 LatestSegmentForUser(user string) (*Segment, error)
40 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error)
41 CreateThumbnail(thumb *Thumbnail) error
42 LatestThumbnailForUser(user string) (*Thumbnail, error)
43 GetSegment(id string) (*Segment, error)
44 StartSegmentCleaner(ctx context.Context) error
45
46 GetIdentity(id string) (*Identity, error)
47 UpdateIdentity(ident *Identity) error
48
49 GetRepo(did string) (*Repo, error)
50 GetRepoByHandle(handle string) (*Repo, error)
51 GetRepoByHandleOrDID(arg string) (*Repo, error)
52 GetRepoBySigningKey(signingKey string) (*Repo, error)
53 GetAllRepos() ([]Repo, error)
54 UpdateRepo(repo *Repo) error
55
56 UpdateSigningKey(key *SigningKey) error
57 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
58 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error)
59 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
60
61 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
62 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
63 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
64 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
65 DeleteFollow(ctx context.Context, userDID, rev string) error
66 GetFollowersNotificationTokens(userDID string) ([]string, error)
67
68 CreateFeedPost(ctx context.Context, post *FeedPost) error
69 ListFeedPosts() ([]FeedPost, error)
70 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
71 GetFeedPost(cid string) (*FeedPost, error)
72 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
73
74 CreateLivestream(ctx context.Context, ls *Livestream) error
75 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
76 GetLivestreamByPostCID(postCID string) (*Livestream, error)
77 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error)
78
79 CreateBlock(ctx context.Context, block *Block) error
80 GetBlock(ctx context.Context, rkey string) (*Block, error)
81 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
82 DeleteBlock(ctx context.Context, rkey string) error
83
84 CreateChatMessage(ctx context.Context, message *ChatMessage) error
85 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
86 GetChatMessage(cid string) (*ChatMessage, error)
87
88 CreateGate(ctx context.Context, gate *Gate) error
89 DeleteGate(ctx context.Context, rkey string) error
90 GetGate(ctx context.Context, rkey string) (*Gate, error)
91 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error)
92
93 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
94 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
95
96 CreateOAuthSession(id string, session *oatproxy.OAuthSession) error
97 LoadOAuthSession(id string) (*oatproxy.OAuthSession, error)
98 UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error
99 ListOAuthSessions() ([]oatproxy.OAuthSession, error)
100 GetSessionByDID(did string) (*oatproxy.OAuthSession, error)
101
102 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error
103 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error)
104 DeleteServerSettings(ctx context.Context, server string, repoDID string) error
105
106 CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error
107 GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error)
108 GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error)
109 GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error)
110
111 CreateLabeler(did string) (*Labeler, error)
112 GetLabeler(did string) (*Labeler, error)
113 UpdateLabelerCursor(did string, cursor int64) error
114
115 CreateLabel(label *Label) error
116 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error)
117}
118
119func MakeDB(dbURL string) (Model, error) {
120 log.Log(context.Background(), "starting database", "dbURL", dbURL)
121 sqliteSuffix := dbURL
122 if dbURL != ":memory:" {
123 if !strings.HasPrefix(dbURL, "sqlite://") {
124 dbURL = fmt.Sprintf("sqlite://%s", dbURL)
125 }
126 sqliteSuffix := dbURL[len("sqlite://"):]
127 // if this isn't ":memory:", ensure that directory exists (eg, if db
128 // file is being initialized)
129 if !strings.Contains(sqliteSuffix, ":?") {
130 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil {
131 return nil, fmt.Errorf("error creating database path: %w", err)
132 }
133 }
134 }
135 dial := sqlite.Open(sqliteSuffix)
136
137 gormLogger := slogGorm.New(
138 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
139 TimeFormat: time.RFC3339,
140 })),
141 // slogGorm.WithTraceAll(),
142 )
143
144 db, err := gorm.Open(dial, &gorm.Config{
145 SkipDefaultTransaction: true,
146 TranslateError: true,
147 Logger: gormLogger,
148 })
149 if err != nil {
150 return nil, fmt.Errorf("error starting database: %w", err)
151 }
152 err = db.Exec("PRAGMA journal_mode=WAL;").Error
153 if err != nil {
154 return nil, fmt.Errorf("error setting journal mode: %w", err)
155 }
156 sqlDB, err := db.DB()
157 if err != nil {
158 return nil, fmt.Errorf("error getting database: %w", err)
159 }
160 sqlDB.SetMaxOpenConns(1)
161 for _, model := range []any{
162 Notification{},
163 PlayerEvent{},
164 Segment{},
165 Thumbnail{},
166 Identity{},
167 Repo{},
168 SigningKey{},
169 Follow{},
170 FeedPost{},
171 Livestream{},
172 Block{},
173 ChatMessage{},
174 ChatProfile{},
175 Gate{},
176 oatproxy.OAuthSession{},
177 ServerSettings{},
178 XrpcStreamEvent{},
179 Labeler{},
180 Label{},
181 } {
182 err = db.AutoMigrate(model)
183 if err != nil {
184 return nil, err
185 }
186 }
187 return &DBModel{DB: db}, nil
188}