Live video on the AT Protocol
1package model
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/lmittmann/tint"
13 slogGorm "github.com/orandin/slog-gorm"
14 "gorm.io/driver/sqlite"
15 "gorm.io/gorm"
16 "stream.place/streamplace/pkg/log"
17 "stream.place/streamplace/pkg/streamplace"
18)
19
20type DBModel struct {
21 DB *gorm.DB
22}
23
24type Model interface {
25 CreateNotification(token, repoDID string) error
26 ListNotifications() ([]Notification, error)
27
28 CreatePlayerEvent(event PlayerEventAPI) error
29 ListPlayerEvents(playerId string) ([]PlayerEvent, error)
30 PlayerReport(playerId string) (map[string]any, error)
31 ClearPlayerEvents() error
32
33 CreateSegment(segment *Segment) error
34 MostRecentSegments() ([]Segment, error)
35 LatestSegmentForUser(user string) (*Segment, error)
36 CreateThumbnail(thumb *Thumbnail) error
37 LatestThumbnailForUser(user string) (*Thumbnail, error)
38 GetSegment(id string) (*Segment, error)
39 StartSegmentCleaner(ctx context.Context) error
40
41 GetIdentity(id string) (*Identity, error)
42 UpdateIdentity(ident *Identity) error
43
44 GetRepo(did string) (*Repo, error)
45 GetRepoByHandle(handle string) (*Repo, error)
46 GetRepoByHandleOrDID(arg string) (*Repo, error)
47 GetRepoBySigningKey(signingKey string) (*Repo, error)
48 UpdateRepo(repo *Repo) error
49
50 UpdateSigningKey(key *SigningKey) error
51 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error)
52 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error)
53
54 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error
55 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error)
56 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error)
57 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error)
58 DeleteFollow(ctx context.Context, userDID, rev string) error
59 GetFollowersNotificationTokens(userDID string) ([]string, error)
60
61 CreateFeedPost(ctx context.Context, post *FeedPost) error
62 ListFeedPosts() ([]FeedPost, error)
63 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error)
64 GetFeedPost(cid string) (*FeedPost, error)
65 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error)
66
67 CreateLivestream(ctx context.Context, ls *Livestream) error
68 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error)
69 GetLivestreamByPostCID(postCID string) (*Livestream, error)
70
71 CreateBlock(ctx context.Context, block *Block) error
72 GetBlock(ctx context.Context, rkey string) (*Block, error)
73 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error)
74 DeleteBlock(ctx context.Context, rkey string) error
75
76 CreateChatMessage(ctx context.Context, message *ChatMessage) error
77 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error)
78 GetChatMessage(cid string) (*ChatMessage, error)
79
80 CreateChatProfile(ctx context.Context, profile *ChatProfile) error
81 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error)
82}
83
84func MakeDB(dbURL string) (Model, error) {
85 log.Log(context.Background(), "starting database", "dbURL", dbURL)
86 sqliteSuffix := dbURL
87 if dbURL != ":memory:" {
88 if !strings.HasPrefix(dbURL, "sqlite://") {
89 dbURL = fmt.Sprintf("sqlite://%s", dbURL)
90 }
91 sqliteSuffix := dbURL[len("sqlite://"):]
92 // if this isn't ":memory:", ensure that directory exists (eg, if db
93 // file is being initialized)
94 if !strings.Contains(sqliteSuffix, ":?") {
95 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm)
96 }
97 }
98 dial := sqlite.Open(sqliteSuffix)
99
100 gormLogger := slogGorm.New(
101 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{
102 TimeFormat: time.RFC3339,
103 })),
104 // slogGorm.WithTraceAll(),
105 )
106
107 db, err := gorm.Open(dial, &gorm.Config{
108 SkipDefaultTransaction: true,
109 TranslateError: true,
110 Logger: gormLogger,
111 })
112 if err != nil {
113 return nil, fmt.Errorf("error starting database: %w", err)
114 }
115 err = db.Exec("PRAGMA journal_mode=WAL;").Error
116 if err != nil {
117 return nil, fmt.Errorf("error setting journal mode: %w", err)
118 }
119 sqlDB, err := db.DB()
120 if err != nil {
121 return nil, fmt.Errorf("error getting database: %w", err)
122 }
123 sqlDB.SetMaxOpenConns(1)
124 for _, model := range []any{
125 Notification{},
126 PlayerEvent{},
127 Segment{},
128 Thumbnail{},
129 Identity{},
130 Repo{},
131 SigningKey{},
132 Follow{},
133 FeedPost{},
134 Livestream{},
135 Block{},
136 ChatMessage{},
137 ChatProfile{},
138 } {
139 err = db.AutoMigrate(model)
140 if err != nil {
141 return nil, err
142 }
143 }
144 return &DBModel{DB: db}, nil
145}