Live video on the AT Protocol
at natb/update-docs-rtmp 154 lines 4.9 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/lmittmann/tint" 13 slogGorm "github.com/orandin/slog-gorm" 14 "gorm.io/driver/sqlite" 15 "gorm.io/gorm" 16 "stream.place/streamplace/pkg/config" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/oproxy" 19 "stream.place/streamplace/pkg/streamplace" 20) 21 22type DBModel struct { 23 DB *gorm.DB 24 CLI *config.CLI 25} 26 27type Model interface { 28 CreateNotification(token, repoDID string) error 29 ListNotifications() ([]Notification, error) 30 31 CreatePlayerEvent(event PlayerEventAPI) error 32 ListPlayerEvents(playerId string) ([]PlayerEvent, error) 33 PlayerReport(playerId string) (map[string]any, error) 34 ClearPlayerEvents() error 35 36 CreateSegment(segment *Segment) error 37 MostRecentSegments() ([]Segment, error) 38 LatestSegmentForUser(user string) (*Segment, error) 39 CreateThumbnail(thumb *Thumbnail) error 40 LatestThumbnailForUser(user string) (*Thumbnail, error) 41 GetSegment(id string) (*Segment, error) 42 StartSegmentCleaner(ctx context.Context) error 43 44 GetIdentity(id string) (*Identity, error) 45 UpdateIdentity(ident *Identity) error 46 47 GetRepo(did string) (*Repo, error) 48 GetRepoByHandle(handle string) (*Repo, error) 49 GetRepoByHandleOrDID(arg string) (*Repo, error) 50 GetRepoBySigningKey(signingKey string) (*Repo, error) 51 UpdateRepo(repo *Repo) error 52 53 UpdateSigningKey(key *SigningKey) error 54 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 55 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 56 57 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 58 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 59 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 60 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 61 DeleteFollow(ctx context.Context, userDID, rev string) error 62 GetFollowersNotificationTokens(userDID string) ([]string, error) 63 64 CreateFeedPost(ctx context.Context, post *FeedPost) error 65 ListFeedPosts() ([]FeedPost, error) 66 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 67 GetFeedPost(cid string) (*FeedPost, error) 68 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 69 70 CreateLivestream(ctx context.Context, ls *Livestream) error 71 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 72 GetLivestreamByPostCID(postCID string) (*Livestream, error) 73 74 CreateBlock(ctx context.Context, block *Block) error 75 GetBlock(ctx context.Context, rkey string) (*Block, error) 76 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 77 DeleteBlock(ctx context.Context, rkey string) error 78 79 CreateChatMessage(ctx context.Context, message *ChatMessage) error 80 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 81 GetChatMessage(cid string) (*ChatMessage, error) 82 83 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 84 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 85 86 CreateOAuthSession(id string, session *oproxy.OAuthSession) error 87 LoadOAuthSession(id string) (*oproxy.OAuthSession, error) 88 UpdateOAuthSession(id string, session *oproxy.OAuthSession) error 89 ListOAuthSessions() ([]oproxy.OAuthSession, error) 90} 91 92func MakeDB(dbURL string) (Model, error) { 93 log.Log(context.Background(), "starting database", "dbURL", dbURL) 94 sqliteSuffix := dbURL 95 if dbURL != ":memory:" { 96 if !strings.HasPrefix(dbURL, "sqlite://") { 97 dbURL = fmt.Sprintf("sqlite://%s", dbURL) 98 } 99 sqliteSuffix := dbURL[len("sqlite://"):] 100 // if this isn't ":memory:", ensure that directory exists (eg, if db 101 // file is being initialized) 102 if !strings.Contains(sqliteSuffix, ":?") { 103 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm) 104 } 105 } 106 dial := sqlite.Open(sqliteSuffix) 107 108 gormLogger := slogGorm.New( 109 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 110 TimeFormat: time.RFC3339, 111 })), 112 // slogGorm.WithTraceAll(), 113 ) 114 115 db, err := gorm.Open(dial, &gorm.Config{ 116 SkipDefaultTransaction: true, 117 TranslateError: true, 118 Logger: gormLogger, 119 }) 120 if err != nil { 121 return nil, fmt.Errorf("error starting database: %w", err) 122 } 123 err = db.Exec("PRAGMA journal_mode=WAL;").Error 124 if err != nil { 125 return nil, fmt.Errorf("error setting journal mode: %w", err) 126 } 127 sqlDB, err := db.DB() 128 if err != nil { 129 return nil, fmt.Errorf("error getting database: %w", err) 130 } 131 sqlDB.SetMaxOpenConns(1) 132 for _, model := range []any{ 133 Notification{}, 134 PlayerEvent{}, 135 Segment{}, 136 Thumbnail{}, 137 Identity{}, 138 Repo{}, 139 SigningKey{}, 140 Follow{}, 141 FeedPost{}, 142 Livestream{}, 143 Block{}, 144 ChatMessage{}, 145 ChatProfile{}, 146 oproxy.OAuthSession{}, 147 } { 148 err = db.AutoMigrate(model) 149 if err != nil { 150 return nil, err 151 } 152 } 153 return &DBModel{DB: db}, nil 154}