Live video on the AT Protocol
at restructure 166 lines 5.8 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "gorm.io/driver/sqlite" 14 "gorm.io/gorm" 15 "stream.place/streamplace/pkg/config" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/streamplace" 18) 19 20type DBModel struct { 21 DB *gorm.DB 22 CLI *config.CLI 23} 24 25type Model interface { 26 CreatePlayerEvent(event PlayerEventAPI) error 27 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 28 PlayerReport(playerID string) (map[string]any, error) 29 ClearPlayerEvents() error 30 31 CreateSegment(segment *Segment) error 32 MostRecentSegments() ([]Segment, error) 33 LatestSegmentForUser(user string) (*Segment, error) 34 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) 35 CreateThumbnail(thumb *Thumbnail) error 36 LatestThumbnailForUser(user string) (*Thumbnail, error) 37 GetSegment(id string) (*Segment, error) 38 StartSegmentCleaner(ctx context.Context) error 39 40 GetIdentity(id string) (*Identity, error) 41 UpdateIdentity(ident *Identity) error 42 43 GetRepo(did string) (*Repo, error) 44 GetRepoByHandle(handle string) (*Repo, error) 45 GetRepoByHandleOrDID(arg string) (*Repo, error) 46 GetRepoBySigningKey(signingKey string) (*Repo, error) 47 GetAllRepos() ([]Repo, error) 48 UpdateRepo(repo *Repo) error 49 50 UpdateSigningKey(key *SigningKey) error 51 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 52 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 53 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 54 55 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 56 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 57 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 58 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 59 DeleteFollow(ctx context.Context, userDID, rev string) error 60 61 CreateFeedPost(ctx context.Context, post *FeedPost) error 62 ListFeedPosts() ([]FeedPost, error) 63 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 64 GetFeedPost(uri string) (*FeedPost, error) 65 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 66 67 CreateLivestream(ctx context.Context, ls *Livestream) error 68 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 69 GetLivestreamByPostURI(postURI string) (*Livestream, error) 70 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) 71 72 CreateBlock(ctx context.Context, block *Block) error 73 GetBlock(ctx context.Context, rkey string) (*Block, error) 74 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 75 DeleteBlock(ctx context.Context, rkey string) error 76 77 CreateChatMessage(ctx context.Context, message *ChatMessage) error 78 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 79 GetChatMessage(uri string) (*ChatMessage, error) 80 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error 81 82 CreateGate(ctx context.Context, gate *Gate) error 83 DeleteGate(ctx context.Context, rkey string) error 84 GetGate(ctx context.Context, rkey string) (*Gate, error) 85 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error) 86 87 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 88 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 89 90 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 91 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 92 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 93 94 CreateLabeler(did string) (*Labeler, error) 95 GetLabeler(did string) (*Labeler, error) 96 UpdateLabelerCursor(did string, cursor int64) error 97 98 CreateLabel(label *Label) error 99 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 100 101 UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error 102} 103 104var DBRevision = 2 105 106func MakeDB(dbURL string) (Model, error) { 107 sqliteSuffix := dbURL 108 if dbURL != ":memory:" { 109 // Ensure dbURL exists as a directory on the filesystem 110 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil { 111 return nil, fmt.Errorf("error creating database directory: %w", err) 112 } 113 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision)) 114 sqliteSuffix = dbPath 115 // if this isn't ":memory:", ensure that directory exists (eg, if db 116 // file is being initialized) 117 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 118 return nil, fmt.Errorf("error creating database path: %w", err) 119 } 120 } 121 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix) 122 dial := sqlite.Open(sqliteSuffix) 123 124 db, err := gorm.Open(dial, &gorm.Config{ 125 SkipDefaultTransaction: true, 126 TranslateError: true, 127 Logger: config.GormLogger, 128 }) 129 if err != nil { 130 return nil, fmt.Errorf("error starting database: %w", err) 131 } 132 err = db.Exec("PRAGMA journal_mode=WAL;").Error 133 if err != nil { 134 return nil, fmt.Errorf("error setting journal mode: %w", err) 135 } 136 sqlDB, err := db.DB() 137 if err != nil { 138 return nil, fmt.Errorf("error getting database: %w", err) 139 } 140 sqlDB.SetMaxOpenConns(1) 141 for _, model := range []any{ 142 PlayerEvent{}, 143 Segment{}, 144 Thumbnail{}, 145 Identity{}, 146 Repo{}, 147 SigningKey{}, 148 Follow{}, 149 FeedPost{}, 150 Livestream{}, 151 Block{}, 152 ChatMessage{}, 153 ChatProfile{}, 154 Gate{}, 155 ServerSettings{}, 156 Labeler{}, 157 Label{}, 158 BroadcastOrigin{}, 159 } { 160 err = db.AutoMigrate(model) 161 if err != nil { 162 return nil, err 163 } 164 } 165 return &DBModel{DB: db}, nil 166}