Live video on the AT Protocol
at eli/podman-cache 166 lines 5.6 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/lmittmann/tint" 13 slogGorm "github.com/orandin/slog-gorm" 14 "github.com/streamplace/oatproxy/pkg/oatproxy" 15 "gorm.io/driver/sqlite" 16 "gorm.io/gorm" 17 "stream.place/streamplace/pkg/config" 18 "stream.place/streamplace/pkg/log" 19 "stream.place/streamplace/pkg/streamplace" 20) 21 22type DBModel struct { 23 DB *gorm.DB 24 CLI *config.CLI 25} 26 27type Model interface { 28 CreateNotification(token, repoDID string) error 29 ListNotifications() ([]Notification, error) 30 31 CreatePlayerEvent(event PlayerEventAPI) error 32 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 33 PlayerReport(playerID string) (map[string]any, error) 34 ClearPlayerEvents() error 35 36 CreateSegment(segment *Segment) error 37 MostRecentSegments() ([]Segment, error) 38 LatestSegmentForUser(user string) (*Segment, error) 39 LatestSegmentsForUser(user string, limit int, before *time.Time) ([]Segment, error) 40 CreateThumbnail(thumb *Thumbnail) error 41 LatestThumbnailForUser(user string) (*Thumbnail, error) 42 GetSegment(id string) (*Segment, error) 43 StartSegmentCleaner(ctx context.Context) error 44 45 GetIdentity(id string) (*Identity, error) 46 UpdateIdentity(ident *Identity) error 47 48 GetRepo(did string) (*Repo, error) 49 GetRepoByHandle(handle string) (*Repo, error) 50 GetRepoByHandleOrDID(arg string) (*Repo, error) 51 GetRepoBySigningKey(signingKey string) (*Repo, error) 52 GetAllRepos() ([]Repo, error) 53 UpdateRepo(repo *Repo) error 54 55 UpdateSigningKey(key *SigningKey) error 56 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 57 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 58 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 59 60 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 61 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 62 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 63 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 64 DeleteFollow(ctx context.Context, userDID, rev string) error 65 GetFollowersNotificationTokens(userDID string) ([]string, error) 66 67 CreateFeedPost(ctx context.Context, post *FeedPost) error 68 ListFeedPosts() ([]FeedPost, error) 69 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 70 GetFeedPost(cid string) (*FeedPost, error) 71 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 72 73 CreateLivestream(ctx context.Context, ls *Livestream) error 74 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 75 GetLivestreamByPostCID(postCID string) (*Livestream, error) 76 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) 77 78 CreateBlock(ctx context.Context, block *Block) error 79 GetBlock(ctx context.Context, rkey string) (*Block, error) 80 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 81 DeleteBlock(ctx context.Context, rkey string) error 82 83 CreateChatMessage(ctx context.Context, message *ChatMessage) error 84 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 85 GetChatMessage(cid string) (*ChatMessage, error) 86 87 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 88 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 89 90 CreateOAuthSession(id string, session *oatproxy.OAuthSession) error 91 LoadOAuthSession(id string) (*oatproxy.OAuthSession, error) 92 UpdateOAuthSession(id string, session *oatproxy.OAuthSession) error 93 ListOAuthSessions() ([]oatproxy.OAuthSession, error) 94 GetSessionByDID(did string) (*oatproxy.OAuthSession, error) 95 96 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 97 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 98 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 99} 100 101func MakeDB(dbURL string) (Model, error) { 102 log.Log(context.Background(), "starting database", "dbURL", dbURL) 103 sqliteSuffix := dbURL 104 if dbURL != ":memory:" { 105 if !strings.HasPrefix(dbURL, "sqlite://") { 106 dbURL = fmt.Sprintf("sqlite://%s", dbURL) 107 } 108 sqliteSuffix := dbURL[len("sqlite://"):] 109 // if this isn't ":memory:", ensure that directory exists (eg, if db 110 // file is being initialized) 111 if !strings.Contains(sqliteSuffix, ":?") { 112 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 113 return nil, fmt.Errorf("error creating database path: %w", err) 114 } 115 } 116 } 117 dial := sqlite.Open(sqliteSuffix) 118 119 gormLogger := slogGorm.New( 120 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 121 TimeFormat: time.RFC3339, 122 })), 123 // slogGorm.WithTraceAll(), 124 ) 125 126 db, err := gorm.Open(dial, &gorm.Config{ 127 SkipDefaultTransaction: true, 128 TranslateError: true, 129 Logger: gormLogger, 130 }) 131 if err != nil { 132 return nil, fmt.Errorf("error starting database: %w", err) 133 } 134 err = db.Exec("PRAGMA journal_mode=WAL;").Error 135 if err != nil { 136 return nil, fmt.Errorf("error setting journal mode: %w", err) 137 } 138 sqlDB, err := db.DB() 139 if err != nil { 140 return nil, fmt.Errorf("error getting database: %w", err) 141 } 142 sqlDB.SetMaxOpenConns(1) 143 for _, model := range []any{ 144 Notification{}, 145 PlayerEvent{}, 146 Segment{}, 147 Thumbnail{}, 148 Identity{}, 149 Repo{}, 150 SigningKey{}, 151 Follow{}, 152 FeedPost{}, 153 Livestream{}, 154 Block{}, 155 ChatMessage{}, 156 ChatProfile{}, 157 oatproxy.OAuthSession{}, 158 ServerSettings{}, 159 } { 160 err = db.AutoMigrate(model) 161 if err != nil { 162 return nil, err 163 } 164 } 165 return &DBModel{DB: db}, nil 166}