Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "time"
9
10 comatproto "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/api/bsky"
12 "gorm.io/driver/sqlite"
13 "gorm.io/gorm"
14 "stream.place/streamplace/pkg/config"
15 "stream.place/streamplace/pkg/log"
16 "stream.place/streamplace/pkg/streamplace"
17)
18
19type DBModel struct {
20 DB *gorm.DB
21 CLI *config.CLI
22}
23
24type Model interface {
25 CreatePlayerEvent(event PlayerEventAPI) error
26 ListPlayerEvents(playerID string) ([]PlayerEvent, error)
27 PlayerReport(playerID string) (map[string]any, error)
28 ClearPlayerEvents() error
29
30 CreateSegment(segment *Segment) error
31 MostRecentSegments() ([]Segment, error)
32 LatestSegmentForUser(user string) (*Segment, error)
33 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error)
34 CreateThumbnail(thumb *Thumbnail) error
35 LatestThumbnailForUser(user string) (*Thumbnail, error)
36 GetSegment(id string) (*Segment, error)
37 StartSegmentCleaner(ctx context.Context) error
38
39 GetIdentity(id string) (*Identity, error)
40 UpdateIdentity(ident *Identity) error
41
42 GetRepo(did string) (*Repo, error)
43 GetRepoByHandle(handle string) (*Repo, error)
44 GetRepoByHandleOrDID(arg string) (*Repo, error)
45 GetRepoBySigningKey(signingKey string) (*Repo, error)
46 GetAllRepos() ([]Repo, error)
47 UpdateRepo(repo *Repo) error
48
49 UpdateSigningKey(key *SigningKey) error
50 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
51 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error)
52 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
53
54 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
55 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
56 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
57 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
58 DeleteFollow(ctx context.Context, userDID, rev string) error
59
60 CreateFeedPost(ctx context.Context, post *FeedPost) error
61 ListFeedPosts() ([]FeedPost, error)
62 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
63 GetFeedPost(uri string) (*FeedPost, error)
64 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
65
66 CreateLivestream(ctx context.Context, ls *Livestream) error
67 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
68 GetLivestreamByPostURI(postURI string) (*Livestream, error)
69 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error)
70
71 CreateBlock(ctx context.Context, block *Block) error
72 GetBlock(ctx context.Context, rkey string) (*Block, error)
73 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
74 DeleteBlock(ctx context.Context, rkey string) error
75
76 CreateChatMessage(ctx context.Context, message *ChatMessage) error
77 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
78 GetChatMessage(uri string) (*ChatMessage, error)
79 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error
80
81 CreateGate(ctx context.Context, gate *Gate) error
82 DeleteGate(ctx context.Context, rkey string) error
83 GetGate(ctx context.Context, rkey string) (*Gate, error)
84 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error)
85
86 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
87 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
88
89 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error
90 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error)
91 DeleteServerSettings(ctx context.Context, server string, repoDID string) error
92
93 CreateLabeler(did string) (*Labeler, error)
94 GetLabeler(did string) (*Labeler, error)
95 UpdateLabelerCursor(did string, cursor int64) error
96
97 CreateLabel(label *Label) error
98 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error)
99}
100
// DBRevision is the revision number embedded in the on-disk SQLite file name
// (index_<DBRevision>.sqlite) that MakeDB opens, so changing it makes MakeDB
// open a differently named file.
var DBRevision = 2
102
103func MakeDB(dbURL string) (Model, error) {
104 sqliteSuffix := dbURL
105 if dbURL != ":memory:" {
106 // Ensure dbURL exists as a directory on the filesystem
107 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil {
108 return nil, fmt.Errorf("error creating database directory: %w", err)
109 }
110 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision))
111 sqliteSuffix = dbPath
112 // if this isn't ":memory:", ensure that directory exists (eg, if db
113 // file is being initialized)
114 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil {
115 return nil, fmt.Errorf("error creating database path: %w", err)
116 }
117 }
118 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix)
119 dial := sqlite.Open(sqliteSuffix)
120
121 db, err := gorm.Open(dial, &gorm.Config{
122 SkipDefaultTransaction: true,
123 TranslateError: true,
124 Logger: config.GormLogger,
125 })
126 if err != nil {
127 return nil, fmt.Errorf("error starting database: %w", err)
128 }
129 err = db.Exec("PRAGMA journal_mode=WAL;").Error
130 if err != nil {
131 return nil, fmt.Errorf("error setting journal mode: %w", err)
132 }
133 sqlDB, err := db.DB()
134 if err != nil {
135 return nil, fmt.Errorf("error getting database: %w", err)
136 }
137 sqlDB.SetMaxOpenConns(1)
138 for _, model := range []any{
139 PlayerEvent{},
140 Segment{},
141 Thumbnail{},
142 Identity{},
143 Repo{},
144 SigningKey{},
145 Follow{},
146 FeedPost{},
147 Livestream{},
148 Block{},
149 ChatMessage{},
150 ChatProfile{},
151 Gate{},
152 ServerSettings{},
153 Labeler{},
154 Label{},
155 } {
156 err = db.AutoMigrate(model)
157 if err != nil {
158 return nil, err
159 }
160 }
161 return &DBModel{DB: db}, nil
162}