Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/lmittmann/tint"
13 slogGorm "github.com/orandin/slog-gorm"
14 "gorm.io/driver/sqlite"
15 "gorm.io/gorm"
16 "stream.place/streamplace/pkg/config"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/streamplace"
19)
20
// DBModel is the gorm-backed value that MakeDB returns as a Model.
type DBModel struct {
	// DB is the gorm handle. MakeDB opens it with the SQLite driver.
	DB *gorm.DB

	// CLI holds the command-line configuration. MakeDB does not set it, so it
	// is presumably assigned by other code — TODO confirm against callers.
	CLI *config.CLI
}
25
26type Model interface {
27 CreatePlayerEvent(event PlayerEventAPI) error
28 ListPlayerEvents(playerID string) ([]PlayerEvent, error)
29 PlayerReport(playerID string) (map[string]any, error)
30 ClearPlayerEvents() error
31
32 CreateSegment(segment *Segment) error
33 MostRecentSegments() ([]Segment, error)
34 LatestSegmentForUser(user string) (*Segment, error)
35 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error)
36 CreateThumbnail(thumb *Thumbnail) error
37 LatestThumbnailForUser(user string) (*Thumbnail, error)
38 GetSegment(id string) (*Segment, error)
39 StartSegmentCleaner(ctx context.Context) error
40
41 GetIdentity(id string) (*Identity, error)
42 UpdateIdentity(ident *Identity) error
43
44 GetRepo(did string) (*Repo, error)
45 GetRepoByHandle(handle string) (*Repo, error)
46 GetRepoByHandleOrDID(arg string) (*Repo, error)
47 GetRepoBySigningKey(signingKey string) (*Repo, error)
48 GetAllRepos() ([]Repo, error)
49 UpdateRepo(repo *Repo) error
50
51 UpdateSigningKey(key *SigningKey) error
52 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
53 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error)
54 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
55
56 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
57 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
58 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
59 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
60 DeleteFollow(ctx context.Context, userDID, rev string) error
61
62 CreateFeedPost(ctx context.Context, post *FeedPost) error
63 ListFeedPosts() ([]FeedPost, error)
64 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
65 GetFeedPost(cid string) (*FeedPost, error)
66 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
67
68 CreateLivestream(ctx context.Context, ls *Livestream) error
69 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
70 GetLivestreamByPostURI(postURI string) (*Livestream, error)
71 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error)
72
73 CreateBlock(ctx context.Context, block *Block) error
74 GetBlock(ctx context.Context, rkey string) (*Block, error)
75 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
76 DeleteBlock(ctx context.Context, rkey string) error
77
78 CreateChatMessage(ctx context.Context, message *ChatMessage) error
79 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
80 GetChatMessage(uri string) (*ChatMessage, error)
81 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error
82
83 CreateGate(ctx context.Context, gate *Gate) error
84 DeleteGate(ctx context.Context, rkey string) error
85 GetGate(ctx context.Context, rkey string) (*Gate, error)
86 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error)
87
88 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
89 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
90
91 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error
92 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error)
93 DeleteServerSettings(ctx context.Context, server string, repoDID string) error
94
95 CreateLabeler(did string) (*Labeler, error)
96 GetLabeler(did string) (*Labeler, error)
97 UpdateLabelerCursor(did string, cursor int64) error
98
99 CreateLabel(label *Label) error
100 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error)
101}
102
// DBRevision is the revision number that MakeDB embeds in the SQLite file
// name (index_<DBRevision>.sqlite), so a different value makes MakeDB open a
// different file.
var DBRevision = 2
104
105func MakeDB(dbURL string) (Model, error) {
106 sqliteSuffix := dbURL
107 if dbURL != ":memory:" {
108 // Ensure dbURL exists as a directory on the filesystem
109 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil {
110 return nil, fmt.Errorf("error creating database directory: %w", err)
111 }
112 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision))
113 sqliteSuffix = dbPath
114 // if this isn't ":memory:", ensure that directory exists (eg, if db
115 // file is being initialized)
116 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil {
117 return nil, fmt.Errorf("error creating database path: %w", err)
118 }
119 }
120 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix)
121 dial := sqlite.Open(sqliteSuffix)
122
123 gormLogger := slogGorm.New(
124 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
125 TimeFormat: time.RFC3339,
126 })),
127 // slogGorm.WithTraceAll(),
128 )
129
130 db, err := gorm.Open(dial, &gorm.Config{
131 SkipDefaultTransaction: true,
132 TranslateError: true,
133 Logger: gormLogger,
134 })
135 if err != nil {
136 return nil, fmt.Errorf("error starting database: %w", err)
137 }
138 err = db.Exec("PRAGMA journal_mode=WAL;").Error
139 if err != nil {
140 return nil, fmt.Errorf("error setting journal mode: %w", err)
141 }
142 sqlDB, err := db.DB()
143 if err != nil {
144 return nil, fmt.Errorf("error getting database: %w", err)
145 }
146 sqlDB.SetMaxOpenConns(1)
147 for _, model := range []any{
148 PlayerEvent{},
149 Segment{},
150 Thumbnail{},
151 Identity{},
152 Repo{},
153 SigningKey{},
154 Follow{},
155 FeedPost{},
156 Livestream{},
157 Block{},
158 ChatMessage{},
159 ChatProfile{},
160 Gate{},
161 ServerSettings{},
162
163 Labeler{},
164 Label{},
165 } {
166 err = db.AutoMigrate(model)
167 if err != nil {
168 return nil, err
169 }
170 }
171 return &DBModel{DB: db}, nil
172}