Live video on the AT Protocol
at feat-iroh-replicator 188 lines 6.5 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "strings" 9 "time" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/lmittmann/tint" 14 slogGorm "github.com/orandin/slog-gorm" 15 "github.com/streamplace/oatproxy/pkg/oatproxy" 16 "gorm.io/driver/sqlite" 17 "gorm.io/gorm" 18 "stream.place/streamplace/pkg/config" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/streamplace" 21) 22 23type DBModel struct { 24 DB *gorm.DB 25 CLI *config.CLI 26} 27 28type Model interface { 29 CreateNotification(token, repoDID string) error 30 ListNotifications() ([]Notification, error) 31 32 CreatePlayerEvent(event PlayerEventAPI) error 33 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 34 PlayerReport(playerID string) (map[string]any, error) 35 ClearPlayerEvents() error 36 37 CreateSegment(segment *Segment) error 38 MostRecentSegments() ([]Segment, error) 39 LatestSegmentForUser(user string) (*Segment, error) 40 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) 41 CreateThumbnail(thumb *Thumbnail) error 42 LatestThumbnailForUser(user string) (*Thumbnail, error) 43 GetSegment(id string) (*Segment, error) 44 StartSegmentCleaner(ctx context.Context) error 45 46 GetIdentity(id string) (*Identity, error) 47 UpdateIdentity(ident *Identity) error 48 49 GetRepo(did string) (*Repo, error) 50 GetRepoByHandle(handle string) (*Repo, error) 51 GetRepoByHandleOrDID(arg string) (*Repo, error) 52 GetRepoBySigningKey(signingKey string) (*Repo, error) 53 GetAllRepos() ([]Repo, error) 54 UpdateRepo(repo *Repo) error 55 56 UpdateSigningKey(key *SigningKey) error 57 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 58 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 59 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 60 61 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 62 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 63 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 64 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 65 DeleteFollow(ctx context.Context, userDID, rev string) error 66 GetFollowersNotificationTokens(userDID string) ([]string, error) 67 68 CreateFeedPost(ctx context.Context, post *FeedPost) error 69 ListFeedPosts() ([]FeedPost, error) 70 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 71 GetFeedPost(cid string) (*FeedPost, error) 72 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 73 74 CreateLivestream(ctx context.Context, ls *Livestream) error 75 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 76 GetLivestreamByPostCID(postCID string) (*Livestream, error) 77 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) 78 79 CreateBlock(ctx context.Context, block *Block) error 80 GetBlock(ctx context.Context, rkey string) (*Block, error) 81 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 82 DeleteBlock(ctx context.Context, rkey string) error 83 84 CreateChatMessage(ctx context.Context, message *ChatMessage) error 85 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 86 GetChatMessage(cid string) (*ChatMessage, error) 87 88 CreateGate(ctx context.Context, gate *Gate) error 89 DeleteGate(ctx context.Context, rkey string) error 90 GetGate(ctx context.Context, rkey string) (*Gate, error) 91 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error) 92 93 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 94 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 95 96 CreateOAuthSession(id string, session *oatproxy.OAuthSession) error 97 LoadOAuthSession(id string) (*oatproxy.OAuthSession, error) 98 UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error 99 ListOAuthSessions() ([]oatproxy.OAuthSession, error) 100 GetSessionByDID(did string) (*oatproxy.OAuthSession, error) 101 102 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 103 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 104 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 105 106 CreateCommitEvent(commit *comatproto.SyncSubscribeRepos_Commit, signedData string) error 107 GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) 108 GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) 109 GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) 110 111 CreateLabeler(did string) (*Labeler, error) 112 GetLabeler(did string) (*Labeler, error) 113 UpdateLabelerCursor(did string, cursor int64) error 114 115 CreateLabel(label *Label) error 116 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 117} 118 119func MakeDB(dbURL string) (Model, error) { 120 log.Log(context.Background(), "starting database", "dbURL", dbURL) 121 sqliteSuffix := dbURL 122 if dbURL != ":memory:" { 123 if !strings.HasPrefix(dbURL, "sqlite://") { 124 dbURL = fmt.Sprintf("sqlite://%s", dbURL) 125 } 126 sqliteSuffix := dbURL[len("sqlite://"):] 127 // if this isn't ":memory:", ensure that directory exists (eg, if db 128 // file is being initialized) 129 if !strings.Contains(sqliteSuffix, ":?") { 130 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 131 return nil, fmt.Errorf("error creating database path: %w", err) 132 } 133 } 134 } 135 dial := sqlite.Open(sqliteSuffix) 136 137 gormLogger := slogGorm.New( 138 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 139 TimeFormat: time.RFC3339, 140 })), 141 // slogGorm.WithTraceAll(), 142 ) 143 144 db, err := gorm.Open(dial, &gorm.Config{ 145 SkipDefaultTransaction: true, 146 TranslateError: true, 147 Logger: gormLogger, 148 }) 149 if err != nil { 150 return nil, fmt.Errorf("error starting database: %w", err) 151 } 152 err = db.Exec("PRAGMA journal_mode=WAL;").Error 153 if err != nil { 154 return nil, fmt.Errorf("error setting journal mode: %w", err) 155 } 156 sqlDB, err := db.DB() 157 if err != nil { 158 return nil, fmt.Errorf("error getting database: %w", err) 159 } 160 sqlDB.SetMaxOpenConns(1) 161 for _, model := range []any{ 162 Notification{}, 163 PlayerEvent{}, 164 Segment{}, 165 Thumbnail{}, 166 Identity{}, 167 Repo{}, 168 SigningKey{}, 169 Follow{}, 170 FeedPost{}, 171 Livestream{}, 172 Block{}, 173 ChatMessage{}, 174 ChatProfile{}, 175 Gate{}, 176 oatproxy.OAuthSession{}, 177 ServerSettings{}, 178 XrpcStreamEvent{}, 179 Labeler{}, 180 Label{}, 181 } { 182 err = db.AutoMigrate(model) 183 if err != nil { 184 return nil, err 185 } 186 } 187 return &DBModel{DB: db}, nil 188}