Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/lmittmann/tint"
13 slogGorm "github.com/orandin/slog-gorm"
14 "github.com/streamplace/oatproxy/pkg/oatproxy"
15 "gorm.io/driver/sqlite"
16 "gorm.io/gorm"
17 "stream.place/streamplace/pkg/config"
18 "stream.place/streamplace/pkg/log"
19 "stream.place/streamplace/pkg/streamplace"
20)
21
22type DBModel struct {
23 DB *gorm.DB
24 CLI *config.CLI
25}
26
27type Model interface {
28 CreateNotification(token, repoDID string) error
29 ListNotifications() ([]Notification, error)
30
31 CreatePlayerEvent(event PlayerEventAPI) error
32 ListPlayerEvents(playerID string) ([]PlayerEvent, error)
33 PlayerReport(playerID string) (map[string]any, error)
34 ClearPlayerEvents() error
35
36 CreateSegment(segment *Segment) error
37 MostRecentSegments() ([]Segment, error)
38 LatestSegmentForUser(user string) (*Segment, error)
39 LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error)
40 CreateThumbnail(thumb *Thumbnail) error
41 LatestThumbnailForUser(user string) (*Thumbnail, error)
42 GetSegment(id string) (*Segment, error)
43 StartSegmentCleaner(ctx context.Context) error
44
45 GetIdentity(id string) (*Identity, error)
46 UpdateIdentity(ident *Identity) error
47
48 GetRepo(did string) (*Repo, error)
49 GetRepoByHandle(handle string) (*Repo, error)
50 GetRepoByHandleOrDID(arg string) (*Repo, error)
51 GetRepoBySigningKey(signingKey string) (*Repo, error)
52 GetAllRepos() ([]Repo, error)
53 UpdateRepo(repo *Repo) error
54
55 UpdateSigningKey(key *SigningKey) error
56 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
57 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error)
58 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
59
60 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
61 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
62 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
63 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
64 DeleteFollow(ctx context.Context, userDID, rev string) error
65 GetFollowersNotificationTokens(userDID string) ([]string, error)
66
67 CreateFeedPost(ctx context.Context, post *FeedPost) error
68 ListFeedPosts() ([]FeedPost, error)
69 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
70 GetFeedPost(cid string) (*FeedPost, error)
71 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
72
73 CreateLivestream(ctx context.Context, ls *Livestream) error
74 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
75 GetLivestreamByPostCID(postCID string) (*Livestream, error)
76 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error)
77
78 CreateBlock(ctx context.Context, block *Block) error
79 GetBlock(ctx context.Context, rkey string) (*Block, error)
80 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
81 DeleteBlock(ctx context.Context, rkey string) error
82
83 CreateChatMessage(ctx context.Context, message *ChatMessage) error
84 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
85 GetChatMessage(cid string) (*ChatMessage, error)
86
87 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
88 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
89
90 CreateOAuthSession(id string, session *oatproxy.OAuthSession) error
91 LoadOAuthSession(id string) (*oatproxy.OAuthSession, error)
92 UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error
93 ListOAuthSessions() ([]oatproxy.OAuthSession, error)
94 GetSessionByDID(did string) (*oatproxy.OAuthSession, error)
95
96 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error
97 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error)
98 DeleteServerSettings(ctx context.Context, server string, repoDID string) error
99}
100
101func MakeDB(dbURL string) (Model, error) {
102 log.Log(context.Background(), "starting database", "dbURL", dbURL)
103 sqliteSuffix := dbURL
104 if dbURL != ":memory:" {
105 if !strings.HasPrefix(dbURL, "sqlite://") {
106 dbURL = fmt.Sprintf("sqlite://%s", dbURL)
107 }
108 sqliteSuffix := dbURL[len("sqlite://"):]
109 // if this isn't ":memory:", ensure that directory exists (eg, if db
110 // file is being initialized)
111 if !strings.Contains(sqliteSuffix, ":?") {
112 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil {
113 return nil, fmt.Errorf("error creating database path: %w", err)
114 }
115 }
116 }
117 dial := sqlite.Open(sqliteSuffix)
118
119 gormLogger := slogGorm.New(
120 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
121 TimeFormat: time.RFC3339,
122 })),
123 // slogGorm.WithTraceAll(),
124 )
125
126 db, err := gorm.Open(dial, &gorm.Config{
127 SkipDefaultTransaction: true,
128 TranslateError: true,
129 Logger: gormLogger,
130 })
131 if err != nil {
132 return nil, fmt.Errorf("error starting database: %w", err)
133 }
134 err = db.Exec("PRAGMA journal_mode=WAL;").Error
135 if err != nil {
136 return nil, fmt.Errorf("error setting journal mode: %w", err)
137 }
138 sqlDB, err := db.DB()
139 if err != nil {
140 return nil, fmt.Errorf("error getting database: %w", err)
141 }
142 sqlDB.SetMaxOpenConns(1)
143 for _, model := range []any{
144 Notification{},
145 PlayerEvent{},
146 Segment{},
147 Thumbnail{},
148 Identity{},
149 Repo{},
150 SigningKey{},
151 Follow{},
152 FeedPost{},
153 Livestream{},
154 Block{},
155 ChatMessage{},
156 ChatProfile{},
157 oatproxy.OAuthSession{},
158 ServerSettings{},
159 } {
160 err = db.AutoMigrate(model)
161 if err != nil {
162 return nil, err
163 }
164 }
165 return &DBModel{DB: db}, nil
166}