Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "gorm.io/driver/sqlite"
14 "gorm.io/gorm"
15 "gorm.io/plugin/prometheus"
16 "stream.place/streamplace/pkg/config"
17 "stream.place/streamplace/pkg/log"
18 "stream.place/streamplace/pkg/streamplace"
19)
20
21type DBModel struct {
22 DB *gorm.DB
23}
24
25type Model interface {
26 CreatePlayerEvent(event PlayerEventAPI) error
27 ListPlayerEvents(playerID string) ([]PlayerEvent, error)
28 PlayerReport(playerID string) (map[string]any, error)
29 ClearPlayerEvents() error
30
31 CreateSegment(segment *Segment) error
32 MostRecentSegments() ([]Segment, error)
33 LatestSegmentForUser(user string) (*Segment, error)
34 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error)
35 CreateThumbnail(thumb *Thumbnail) error
36 LatestThumbnailForUser(user string) (*Thumbnail, error)
37 GetSegment(id string) (*Segment, error)
38 GetExpiredSegments(ctx context.Context) ([]Segment, error)
39 DeleteSegment(ctx context.Context, id string) error
40 StartSegmentCleaner(ctx context.Context) error
41 SegmentCleaner(ctx context.Context) error
42
43 GetIdentity(id string) (*Identity, error)
44 UpdateIdentity(ident *Identity) error
45
46 GetRepo(did string) (*Repo, error)
47 GetRepoByHandle(handle string) (*Repo, error)
48 GetRepoByHandleOrDID(arg string) (*Repo, error)
49 GetRepoBySigningKey(signingKey string) (*Repo, error)
50 GetAllRepos() ([]Repo, error)
51 UpdateRepo(repo *Repo) error
52
53 UpdateSigningKey(key *SigningKey) error
54 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
55 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error)
56 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
57
58 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
59 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
60 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
61 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
62 DeleteFollow(ctx context.Context, userDID, rev string) error
63
64 CreateFeedPost(ctx context.Context, post *FeedPost) error
65 ListFeedPosts() ([]FeedPost, error)
66 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
67 GetFeedPost(uri string) (*FeedPost, error)
68 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
69
70 CreateLivestream(ctx context.Context, ls *Livestream) error
71 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
72 GetLivestreamByPostURI(postURI string) (*Livestream, error)
73 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error)
74
75 CreateBlock(ctx context.Context, block *Block) error
76 GetBlock(ctx context.Context, rkey string) (*Block, error)
77 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
78 DeleteBlock(ctx context.Context, rkey string) error
79
80 CreateChatMessage(ctx context.Context, message *ChatMessage) error
81 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
82 GetChatMessage(uri string) (*ChatMessage, error)
83 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error
84
85 CreateGate(ctx context.Context, gate *Gate) error
86 DeleteGate(ctx context.Context, rkey string) error
87 GetGate(ctx context.Context, rkey string) (*Gate, error)
88 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error)
89
90 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
91 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
92
93 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error
94 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error)
95 DeleteServerSettings(ctx context.Context, server string, repoDID string) error
96
97 CreateLabeler(did string) (*Labeler, error)
98 GetLabeler(did string) (*Labeler, error)
99 UpdateLabelerCursor(did string, cursor int64) error
100
101 CreateLabel(label *Label) error
102 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error)
103
104 UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error
105 GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error)
106
107 CreateMetadataConfiguration(ctx context.Context, metadata *MetadataConfiguration) error
108 GetMetadataConfiguration(ctx context.Context, repoDID string) (*MetadataConfiguration, error)
109 DeleteMetadataConfiguration(ctx context.Context, repoDID string) error
110}
111
// DBRevision is the revision number embedded in the SQLite file name that
// MakeDB builds (index_<DBRevision>.sqlite).
// NOTE(review): presumably bumped when the schema changes incompatibly so a
// fresh file is created; confirm.
var DBRevision = 2
113
114func MakeDB(dbURL string) (Model, error) {
115 sqliteSuffix := dbURL
116 if dbURL != ":memory:" {
117 // Ensure dbURL exists as a directory on the filesystem
118 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil {
119 return nil, fmt.Errorf("error creating database directory: %w", err)
120 }
121 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision))
122 sqliteSuffix = dbPath
123 // if this isn't ":memory:", ensure that directory exists (eg, if db
124 // file is being initialized)
125 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil {
126 return nil, fmt.Errorf("error creating database path: %w", err)
127 }
128 }
129 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix)
130 dial := sqlite.Open(sqliteSuffix)
131
132 db, err := gorm.Open(dial, &gorm.Config{
133 SkipDefaultTransaction: true,
134 TranslateError: true,
135 Logger: config.GormLogger,
136 })
137 if err != nil {
138 return nil, fmt.Errorf("error starting database: %w", err)
139 }
140 err = db.Exec("PRAGMA journal_mode=WAL;").Error
141 if err != nil {
142 return nil, fmt.Errorf("error setting journal mode: %w", err)
143 }
144
145 err = db.Use(prometheus.New(prometheus.Config{
146 DBName: "index",
147 RefreshInterval: 10,
148 StartServer: false,
149 }))
150 if err != nil {
151 return nil, fmt.Errorf("error using prometheus plugin: %w", err)
152 }
153
154 sqlDB, err := db.DB()
155 if err != nil {
156 return nil, fmt.Errorf("error getting database: %w", err)
157 }
158 sqlDB.SetMaxOpenConns(1)
159 for _, model := range []any{
160 PlayerEvent{},
161 Segment{},
162 Thumbnail{},
163 Identity{},
164 Repo{},
165 SigningKey{},
166 Follow{},
167 FeedPost{},
168 Livestream{},
169 Block{},
170 ChatMessage{},
171 ChatProfile{},
172 Gate{},
173 ServerSettings{},
174 Labeler{},
175 Label{},
176 BroadcastOrigin{},
177 MetadataConfiguration{},
178 } {
179 err = db.AutoMigrate(model)
180 if err != nil {
181 return nil, err
182 }
183 }
184 return &DBModel{DB: db}, nil
185}