Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/lmittmann/tint"
13 slogGorm "github.com/orandin/slog-gorm"
14 "gorm.io/driver/sqlite"
15 "gorm.io/gorm"
16 "stream.place/streamplace/pkg/config"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/oproxy"
19 "stream.place/streamplace/pkg/streamplace"
20)
21
22type DBModel struct {
23 DB *gorm.DB
24 CLI *config.CLI
25}
26
27type Model interface {
28 CreateNotification(token, repoDID string) error
29 ListNotifications() ([]Notification, error)
30
31 CreatePlayerEvent(event PlayerEventAPI) error
32 ListPlayerEvents(playerId string) ([]PlayerEvent, error)
33 PlayerReport(playerId string) (map[string]any, error)
34 ClearPlayerEvents() error
35
36 CreateSegment(segment *Segment) error
37 MostRecentSegments() ([]Segment, error)
38 LatestSegmentForUser(user string) (*Segment, error)
39 LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error)
40 CreateThumbnail(thumb *Thumbnail) error
41 LatestThumbnailForUser(user string) (*Thumbnail, error)
42 GetSegment(id string) (*Segment, error)
43 StartSegmentCleaner(ctx context.Context) error
44
45 GetIdentity(id string) (*Identity, error)
46 UpdateIdentity(ident *Identity) error
47
48 GetRepo(did string) (*Repo, error)
49 GetRepoByHandle(handle string) (*Repo, error)
50 GetRepoByHandleOrDID(arg string) (*Repo, error)
51 GetRepoBySigningKey(signingKey string) (*Repo, error)
52 UpdateRepo(repo *Repo) error
53
54 UpdateSigningKey(key *SigningKey) error
55 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
56 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
57
58 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
59 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
60 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
61 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
62 DeleteFollow(ctx context.Context, userDID, rev string) error
63 GetFollowersNotificationTokens(userDID string) ([]string, error)
64
65 CreateFeedPost(ctx context.Context, post *FeedPost) error
66 ListFeedPosts() ([]FeedPost, error)
67 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
68 GetFeedPost(cid string) (*FeedPost, error)
69 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
70
71 CreateLivestream(ctx context.Context, ls *Livestream) error
72 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
73 GetLivestreamByPostCID(postCID string) (*Livestream, error)
74
75 CreateBlock(ctx context.Context, block *Block) error
76 GetBlock(ctx context.Context, rkey string) (*Block, error)
77 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
78 DeleteBlock(ctx context.Context, rkey string) error
79
80 CreateChatMessage(ctx context.Context, message *ChatMessage) error
81 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
82 GetChatMessage(cid string) (*ChatMessage, error)
83
84 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
85 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
86
87 CreateOAuthSession(id string, session *oproxy.OAuthSession) error
88 LoadOAuthSession(id string) (*oproxy.OAuthSession, error)
89 UpdateOAuthSession(id string, session *oproxy.OAuthSession) error
90 ListOAuthSessions() ([]oproxy.OAuthSession, error)
91 GetSessionByDID(did string) (*oproxy.OAuthSession, error)
92}
93
94func MakeDB(dbURL string) (Model, error) {
95 log.Log(context.Background(), "starting database", "dbURL", dbURL)
96 sqliteSuffix := dbURL
97 if dbURL != ":memory:" {
98 if !strings.HasPrefix(dbURL, "sqlite://") {
99 dbURL = fmt.Sprintf("sqlite://%s", dbURL)
100 }
101 sqliteSuffix := dbURL[len("sqlite://"):]
102 // if this isn't ":memory:", ensure that directory exists (eg, if db
103 // file is being initialized)
104 if !strings.Contains(sqliteSuffix, ":?") {
105 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm)
106 }
107 }
108 dial := sqlite.Open(sqliteSuffix)
109
110 gormLogger := slogGorm.New(
111 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
112 TimeFormat: time.RFC3339,
113 })),
114 // slogGorm.WithTraceAll(),
115 )
116
117 db, err := gorm.Open(dial, &gorm.Config{
118 SkipDefaultTransaction: true,
119 TranslateError: true,
120 Logger: gormLogger,
121 })
122 if err != nil {
123 return nil, fmt.Errorf("error starting database: %w", err)
124 }
125 err = db.Exec("PRAGMA journal_mode=WAL;").Error
126 if err != nil {
127 return nil, fmt.Errorf("error setting journal mode: %w", err)
128 }
129 sqlDB, err := db.DB()
130 if err != nil {
131 return nil, fmt.Errorf("error getting database: %w", err)
132 }
133 sqlDB.SetMaxOpenConns(1)
134 for _, model := range []any{
135 Notification{},
136 PlayerEvent{},
137 Segment{},
138 Thumbnail{},
139 Identity{},
140 Repo{},
141 SigningKey{},
142 Follow{},
143 FeedPost{},
144 Livestream{},
145 Block{},
146 ChatMessage{},
147 ChatProfile{},
148 oproxy.OAuthSession{},
149 } {
150 err = db.AutoMigrate(model)
151 if err != nil {
152 return nil, err
153 }
154 }
155 return &DBModel{DB: db}, nil
156}