Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/lmittmann/tint"
14 slogGorm "github.com/orandin/slog-gorm"
15 "gorm.io/driver/sqlite"
16 "gorm.io/gorm"
17 "stream.place/streamplace/pkg/config"
18 "stream.place/streamplace/pkg/log"
19 "stream.place/streamplace/pkg/streamplace"
20)
21
22type DBModel struct {
23 DB *gorm.DB
24 CLI *config.CLI
25}
26
27type Model interface {
28 CreatePlayerEvent(event PlayerEventAPI) error
29 ListPlayerEvents(playerID string) ([]PlayerEvent, error)
30 PlayerReport(playerID string) (map[string]any, error)
31 ClearPlayerEvents() error
32
33 CreateSegment(segment *Segment) error
34 MostRecentSegments() ([]Segment, error)
35 LatestSegmentForUser(user string) (*Segment, error)
36 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error)
37 CreateThumbnail(thumb *Thumbnail) error
38 LatestThumbnailForUser(user string) (*Thumbnail, error)
39 GetSegment(id string) (*Segment, error)
40 StartSegmentCleaner(ctx context.Context) error
41
42 GetIdentity(id string) (*Identity, error)
43 UpdateIdentity(ident *Identity) error
44
45 GetRepo(did string) (*Repo, error)
46 GetRepoByHandle(handle string) (*Repo, error)
47 GetRepoByHandleOrDID(arg string) (*Repo, error)
48 GetRepoBySigningKey(signingKey string) (*Repo, error)
49 GetAllRepos() ([]Repo, error)
50 UpdateRepo(repo *Repo) error
51
52 UpdateSigningKey(key *SigningKey) error
53 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
54 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error)
55 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
56
57 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
58 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
59 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
60 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
61 DeleteFollow(ctx context.Context, userDID, rev string) error
62
63 CreateFeedPost(ctx context.Context, post *FeedPost) error
64 ListFeedPosts() ([]FeedPost, error)
65 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
66 GetFeedPost(cid string) (*FeedPost, error)
67 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
68
69 CreateLivestream(ctx context.Context, ls *Livestream) error
70 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
71 GetLivestreamByPostCID(postCID string) (*Livestream, error)
72 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error)
73
74 CreateBlock(ctx context.Context, block *Block) error
75 GetBlock(ctx context.Context, rkey string) (*Block, error)
76 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
77 DeleteBlock(ctx context.Context, rkey string) error
78
79 CreateChatMessage(ctx context.Context, message *ChatMessage) error
80 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
81 GetChatMessage(cid string) (*ChatMessage, error)
82
83 CreateGate(ctx context.Context, gate *Gate) error
84 DeleteGate(ctx context.Context, rkey string) error
85 GetGate(ctx context.Context, rkey string) (*Gate, error)
86 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error)
87
88 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
89 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
90
91 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error
92 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error)
93 DeleteServerSettings(ctx context.Context, server string, repoDID string) error
94
95 CreateLabeler(did string) (*Labeler, error)
96 GetLabeler(did string) (*Labeler, error)
97 UpdateLabelerCursor(did string, cursor int64) error
98
99 CreateLabel(label *Label) error
100 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error)
101}
102
103func MakeDB(dbURL string) (Model, error) {
104 log.Log(context.Background(), "starting database", "dbURL", dbURL)
105 sqliteSuffix := dbURL
106 if dbURL != ":memory:" {
107 if !strings.HasPrefix(dbURL, "sqlite://") {
108 dbURL = fmt.Sprintf("sqlite://%s", dbURL)
109 }
110 sqliteSuffix := dbURL[len("sqlite://"):]
111 // if this isn't ":memory:", ensure that directory exists (eg, if db
112 // file is being initialized)
113 if !strings.Contains(sqliteSuffix, ":?") {
114 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil {
115 return nil, fmt.Errorf("error creating database path: %w", err)
116 }
117 }
118 }
119 dial := sqlite.Open(sqliteSuffix)
120
121 gormLogger := slogGorm.New(
122 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
123 TimeFormat: time.RFC3339,
124 })),
125 // slogGorm.WithTraceAll(),
126 )
127
128 db, err := gorm.Open(dial, &gorm.Config{
129 SkipDefaultTransaction: true,
130 TranslateError: true,
131 Logger: gormLogger,
132 })
133 if err != nil {
134 return nil, fmt.Errorf("error starting database: %w", err)
135 }
136 err = db.Exec("PRAGMA journal_mode=WAL;").Error
137 if err != nil {
138 return nil, fmt.Errorf("error setting journal mode: %w", err)
139 }
140 sqlDB, err := db.DB()
141 if err != nil {
142 return nil, fmt.Errorf("error getting database: %w", err)
143 }
144 sqlDB.SetMaxOpenConns(1)
145 for _, model := range []any{
146 PlayerEvent{},
147 Segment{},
148 Thumbnail{},
149 Identity{},
150 Repo{},
151 SigningKey{},
152 Follow{},
153 FeedPost{},
154 Livestream{},
155 Block{},
156 ChatMessage{},
157 ChatProfile{},
158 Gate{},
159 ServerSettings{},
160
161 Labeler{},
162 Label{},
163 } {
164 err = db.AutoMigrate(model)
165 if err != nil {
166 return nil, err
167 }
168 }
169 return &DBModel{DB: db}, nil
170}