Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/lmittmann/tint"
13 slogGorm "github.com/orandin/slog-gorm"
14 "gorm.io/driver/sqlite"
15 "gorm.io/gorm"
16 "stream.place/streamplace/pkg/config"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/oproxy"
19 "stream.place/streamplace/pkg/streamplace"
20)
21
22type DBModel struct {
23 DB *gorm.DB
24 CLI *config.CLI
25}
26
27type Model interface {
28 CreateNotification(token, repoDID string) error
29 ListNotifications() ([]Notification, error)
30
31 CreatePlayerEvent(event PlayerEventAPI) error
32 ListPlayerEvents(playerId string) ([]PlayerEvent, error)
33 PlayerReport(playerId string) (map[string]any, error)
34 ClearPlayerEvents() error
35
36 CreateSegment(segment *Segment) error
37 MostRecentSegments() ([]Segment, error)
38 MostRecentSegmentsWithStreamInfo() ([]SegmentWithStreamInfo, error)
39 LatestSegmentForUser(user string) (*Segment, error)
40 LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error)
41 CreateThumbnail(thumb *Thumbnail) error
42 LatestThumbnailForUser(user string) (*Thumbnail, error)
43 GetSegment(id string) (*Segment, error)
44 StartSegmentCleaner(ctx context.Context) error
45
46 GetIdentity(id string) (*Identity, error)
47 UpdateIdentity(ident *Identity) error
48
49 GetRepo(did string) (*Repo, error)
50 GetRepoByHandle(handle string) (*Repo, error)
51 GetRepoByHandleOrDID(arg string) (*Repo, error)
52 GetRepoBySigningKey(signingKey string) (*Repo, error)
53 UpdateRepo(repo *Repo) error
54
55 UpdateSigningKey(key *SigningKey) error
56 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
57 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
58
59 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
60 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
61 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
62 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
63 DeleteFollow(ctx context.Context, userDID, rev string) error
64 GetFollowersNotificationTokens(userDID string) ([]string, error)
65
66 CreateFeedPost(ctx context.Context, post *FeedPost) error
67 ListFeedPosts() ([]FeedPost, error)
68 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
69 GetFeedPost(cid string) (*FeedPost, error)
70 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
71
72 CreateLivestream(ctx context.Context, ls *Livestream) error
73 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
74 GetLivestreamByPostCID(postCID string) (*Livestream, error)
75
76 CreateBlock(ctx context.Context, block *Block) error
77 GetBlock(ctx context.Context, rkey string) (*Block, error)
78 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
79 DeleteBlock(ctx context.Context, rkey string) error
80
81 CreateChatMessage(ctx context.Context, message *ChatMessage) error
82 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
83 GetChatMessage(cid string) (*ChatMessage, error)
84
85 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
86 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
87
88 CreateOAuthSession(id string, session *oproxy.OAuthSession) error
89 LoadOAuthSession(id string) (*oproxy.OAuthSession, error)
90 UpdateOAuthSession(id string, session *oproxy.OAuthSession) error
91 ListOAuthSessions() ([]oproxy.OAuthSession, error)
92}
93
94func MakeDB(dbURL string) (Model, error) {
95 log.Log(context.Background(), "starting database", "dbURL", dbURL)
96 sqliteSuffix := dbURL
97 if dbURL != ":memory:" {
98 if !strings.HasPrefix(dbURL, "sqlite://") {
99 dbURL = fmt.Sprintf("sqlite://%s", dbURL)
100 }
101 sqliteSuffix := dbURL[len("sqlite://"):]
102 // if this isn't ":memory:", ensure that directory exists (eg, if db
103 // file is being initialized)
104 if !strings.Contains(sqliteSuffix, ":?") {
105 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm)
106 }
107 }
108 dial := sqlite.Open(sqliteSuffix)
109
110 gormLogger := slogGorm.New(
111 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
112 TimeFormat: time.RFC3339,
113 })),
114 // slogGorm.WithTraceAll(),
115 )
116
117 db, err := gorm.Open(dial, &gorm.Config{
118 SkipDefaultTransaction: true,
119 TranslateError: true,
120 Logger: gormLogger,
121 })
122 if err != nil {
123 return nil, fmt.Errorf("error starting database: %w", err)
124 }
125 err = db.Exec("PRAGMA journal_mode=WAL;").Error
126 if err != nil {
127 return nil, fmt.Errorf("error setting journal mode: %w", err)
128 }
129 sqlDB, err := db.DB()
130 if err != nil {
131 return nil, fmt.Errorf("error getting database: %w", err)
132 }
133 sqlDB.SetMaxOpenConns(1)
134 for _, model := range []any{
135 Notification{},
136 PlayerEvent{},
137 Segment{},
138 Thumbnail{},
139 Identity{},
140 Repo{},
141 SigningKey{},
142 Follow{},
143 FeedPost{},
144 Livestream{},
145 Block{},
146 ChatMessage{},
147 ChatProfile{},
148 oproxy.OAuthSession{},
149 } {
150 err = db.AutoMigrate(model)
151 if err != nil {
152 return nil, err
153 }
154 }
155 return &DBModel{DB: db}, nil
156}