Live video on the AT Protocol
at eli/chat-mod-fixes 162 lines 5.6 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "gorm.io/driver/sqlite" 13 "gorm.io/gorm" 14 "stream.place/streamplace/pkg/config" 15 "stream.place/streamplace/pkg/log" 16 "stream.place/streamplace/pkg/streamplace" 17) 18 19type DBModel struct { 20 DB *gorm.DB 21 CLI *config.CLI 22} 23 24type Model interface { 25 CreatePlayerEvent(event PlayerEventAPI) error 26 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 27 PlayerReport(playerID string) (map[string]any, error) 28 ClearPlayerEvents() error 29 30 CreateSegment(segment *Segment) error 31 MostRecentSegments() ([]Segment, error) 32 LatestSegmentForUser(user string) (*Segment, error) 33 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) 34 CreateThumbnail(thumb *Thumbnail) error 35 LatestThumbnailForUser(user string) (*Thumbnail, error) 36 GetSegment(id string) (*Segment, error) 37 StartSegmentCleaner(ctx context.Context) error 38 39 GetIdentity(id string) (*Identity, error) 40 UpdateIdentity(ident *Identity) error 41 42 GetRepo(did string) (*Repo, error) 43 GetRepoByHandle(handle string) (*Repo, error) 44 GetRepoByHandleOrDID(arg string) (*Repo, error) 45 GetRepoBySigningKey(signingKey string) (*Repo, error) 46 GetAllRepos() ([]Repo, error) 47 UpdateRepo(repo *Repo) error 48 49 UpdateSigningKey(key *SigningKey) error 50 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 51 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 52 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 53 54 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 55 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 56 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 57 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 58 DeleteFollow(ctx context.Context, userDID, rev string) error 59 60 CreateFeedPost(ctx context.Context, post *FeedPost) error 61 ListFeedPosts() ([]FeedPost, error) 62 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 63 GetFeedPost(uri string) (*FeedPost, error) 64 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 65 66 CreateLivestream(ctx context.Context, ls *Livestream) error 67 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 68 GetLivestreamByPostURI(postURI string) (*Livestream, error) 69 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) 70 71 CreateBlock(ctx context.Context, block *Block) error 72 GetBlock(ctx context.Context, rkey string) (*Block, error) 73 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 74 DeleteBlock(ctx context.Context, rkey string) error 75 76 CreateChatMessage(ctx context.Context, message *ChatMessage) error 77 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 78 GetChatMessage(uri string) (*ChatMessage, error) 79 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error 80 81 CreateGate(ctx context.Context, gate *Gate) error 82 DeleteGate(ctx context.Context, rkey string) error 83 GetGate(ctx context.Context, rkey string) (*Gate, error) 84 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error) 85 86 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 87 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 88 89 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 90 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 91 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 92 93 CreateLabeler(did string) (*Labeler, error) 94 GetLabeler(did string) (*Labeler, error) 95 UpdateLabelerCursor(did string, cursor int64) error 96 97 CreateLabel(label *Label) error 98 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 99} 100 101var DBRevision = 2 102 103func MakeDB(dbURL string) (Model, error) { 104 sqliteSuffix := dbURL 105 if dbURL != ":memory:" { 106 // Ensure dbURL exists as a directory on the filesystem 107 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil { 108 return nil, fmt.Errorf("error creating database directory: %w", err) 109 } 110 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision)) 111 sqliteSuffix = dbPath 112 // if this isn't ":memory:", ensure that directory exists (eg, if db 113 // file is being initialized) 114 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 115 return nil, fmt.Errorf("error creating database path: %w", err) 116 } 117 } 118 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix) 119 dial := sqlite.Open(sqliteSuffix) 120 121 db, err := gorm.Open(dial, &gorm.Config{ 122 SkipDefaultTransaction: true, 123 TranslateError: true, 124 Logger: config.GormLogger, 125 }) 126 if err != nil { 127 return nil, fmt.Errorf("error starting database: %w", err) 128 } 129 err = db.Exec("PRAGMA journal_mode=WAL;").Error 130 if err != nil { 131 return nil, fmt.Errorf("error setting journal mode: %w", err) 132 } 133 sqlDB, err := db.DB() 134 if err != nil { 135 return nil, fmt.Errorf("error getting database: %w", err) 136 } 137 sqlDB.SetMaxOpenConns(1) 138 for _, model := range []any{ 139 PlayerEvent{}, 140 Segment{}, 141 Thumbnail{}, 142 Identity{}, 143 Repo{}, 144 SigningKey{}, 145 Follow{}, 146 FeedPost{}, 147 Livestream{}, 148 Block{}, 149 ChatMessage{}, 150 ChatProfile{}, 151 Gate{}, 152 ServerSettings{}, 153 Labeler{}, 154 Label{}, 155 } { 156 err = db.AutoMigrate(model) 157 if err != nil { 158 return nil, err 159 } 160 } 161 return &DBModel{DB: db}, nil 162}