Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/lmittmann/tint"
13 slogGorm "github.com/orandin/slog-gorm"
14 "gorm.io/driver/sqlite"
15 "gorm.io/gorm"
16 "stream.place/streamplace/pkg/config"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/oproxy"
19 "stream.place/streamplace/pkg/streamplace"
20)
21
22type DBModel struct {
23 DB *gorm.DB
24 CLI *config.CLI
25}
26
27type Model interface {
28 CreateNotification(token, repoDID string) error
29 ListNotifications() ([]Notification, error)
30
31 CreatePlayerEvent(event PlayerEventAPI) error
32 ListPlayerEvents(playerId string) ([]PlayerEvent, error)
33 PlayerReport(playerId string) (map[string]any, error)
34 ClearPlayerEvents() error
35
36 CreateSegment(segment *Segment) error
37 MostRecentSegments() ([]Segment, error)
38 LatestSegmentForUser(user string) (*Segment, error)
39 CreateThumbnail(thumb *Thumbnail) error
40 LatestThumbnailForUser(user string) (*Thumbnail, error)
41 GetSegment(id string) (*Segment, error)
42 StartSegmentCleaner(ctx context.Context) error
43
44 GetIdentity(id string) (*Identity, error)
45 UpdateIdentity(ident *Identity) error
46
47 GetRepo(did string) (*Repo, error)
48 GetRepoByHandle(handle string) (*Repo, error)
49 GetRepoByHandleOrDID(arg string) (*Repo, error)
50 GetRepoBySigningKey(signingKey string) (*Repo, error)
51 UpdateRepo(repo *Repo) error
52
53 UpdateSigningKey(key *SigningKey) error
54 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
55 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
56
57 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
58 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
59 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
60 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
61 DeleteFollow(ctx context.Context, userDID, rev string) error
62 GetFollowersNotificationTokens(userDID string) ([]string, error)
63
64 CreateFeedPost(ctx context.Context, post *FeedPost) error
65 ListFeedPosts() ([]FeedPost, error)
66 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
67 GetFeedPost(cid string) (*FeedPost, error)
68 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
69
70 CreateLivestream(ctx context.Context, ls *Livestream) error
71 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
72 GetLivestreamByPostCID(postCID string) (*Livestream, error)
73
74 CreateBlock(ctx context.Context, block *Block) error
75 GetBlock(ctx context.Context, rkey string) (*Block, error)
76 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
77 DeleteBlock(ctx context.Context, rkey string) error
78
79 CreateChatMessage(ctx context.Context, message *ChatMessage) error
80 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
81 GetChatMessage(cid string) (*ChatMessage, error)
82
83 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
84 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
85
86 CreateOAuthSession(id string, session *oproxy.OAuthSession) error
87 LoadOAuthSession(id string) (*oproxy.OAuthSession, error)
88 UpdateOAuthSession(id string, session *oproxy.OAuthSession) error
89 ListOAuthSessions() ([]oproxy.OAuthSession, error)
90}
91
92func MakeDB(dbURL string) (Model, error) {
93 log.Log(context.Background(), "starting database", "dbURL", dbURL)
94 sqliteSuffix := dbURL
95 if dbURL != ":memory:" {
96 if !strings.HasPrefix(dbURL, "sqlite://") {
97 dbURL = fmt.Sprintf("sqlite://%s", dbURL)
98 }
99 sqliteSuffix := dbURL[len("sqlite://"):]
100 // if this isn't ":memory:", ensure that directory exists (eg, if db
101 // file is being initialized)
102 if !strings.Contains(sqliteSuffix, ":?") {
103 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm)
104 }
105 }
106 dial := sqlite.Open(sqliteSuffix)
107
108 gormLogger := slogGorm.New(
109 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
110 TimeFormat: time.RFC3339,
111 })),
112 // slogGorm.WithTraceAll(),
113 )
114
115 db, err := gorm.Open(dial, &gorm.Config{
116 SkipDefaultTransaction: true,
117 TranslateError: true,
118 Logger: gormLogger,
119 })
120 if err != nil {
121 return nil, fmt.Errorf("error starting database: %w", err)
122 }
123 err = db.Exec("PRAGMA journal_mode=WAL;").Error
124 if err != nil {
125 return nil, fmt.Errorf("error setting journal mode: %w", err)
126 }
127 sqlDB, err := db.DB()
128 if err != nil {
129 return nil, fmt.Errorf("error getting database: %w", err)
130 }
131 sqlDB.SetMaxOpenConns(1)
132 for _, model := range []any{
133 Notification{},
134 PlayerEvent{},
135 Segment{},
136 Thumbnail{},
137 Identity{},
138 Repo{},
139 SigningKey{},
140 Follow{},
141 FeedPost{},
142 Livestream{},
143 Block{},
144 ChatMessage{},
145 ChatProfile{},
146 oproxy.OAuthSession{},
147 } {
148 err = db.AutoMigrate(model)
149 if err != nil {
150 return nil, err
151 }
152 }
153 return &DBModel{DB: db}, nil
154}