Live video on the AT Protocol
at eli/postgres 170 lines 5.6 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "strings" 9 "time" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/lmittmann/tint" 14 slogGorm "github.com/orandin/slog-gorm" 15 "gorm.io/driver/sqlite" 16 "gorm.io/gorm" 17 "stream.place/streamplace/pkg/config" 18 "stream.place/streamplace/pkg/log" 19 "stream.place/streamplace/pkg/streamplace" 20) 21 22type DBModel struct { 23 DB *gorm.DB 24 CLI *config.CLI 25} 26 27type Model interface { 28 CreatePlayerEvent(event PlayerEventAPI) error 29 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 30 PlayerReport(playerID string) (map[string]any, error) 31 ClearPlayerEvents() error 32 33 CreateSegment(segment *Segment) error 34 MostRecentSegments() ([]Segment, error) 35 LatestSegmentForUser(user string) (*Segment, error) 36 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) 37 CreateThumbnail(thumb *Thumbnail) error 38 LatestThumbnailForUser(user string) (*Thumbnail, error) 39 GetSegment(id string) (*Segment, error) 40 StartSegmentCleaner(ctx context.Context) error 41 42 GetIdentity(id string) (*Identity, error) 43 UpdateIdentity(ident *Identity) error 44 45 GetRepo(did string) (*Repo, error) 46 GetRepoByHandle(handle string) (*Repo, error) 47 GetRepoByHandleOrDID(arg string) (*Repo, error) 48 GetRepoBySigningKey(signingKey string) (*Repo, error) 49 GetAllRepos() ([]Repo, error) 50 UpdateRepo(repo *Repo) error 51 52 UpdateSigningKey(key *SigningKey) error 53 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 54 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 55 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 56 57 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 58 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 59 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 60 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 61 DeleteFollow(ctx context.Context, userDID, rev string) error 62 63 CreateFeedPost(ctx context.Context, post *FeedPost) error 64 ListFeedPosts() ([]FeedPost, error) 65 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 66 GetFeedPost(cid string) (*FeedPost, error) 67 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 68 69 CreateLivestream(ctx context.Context, ls *Livestream) error 70 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 71 GetLivestreamByPostCID(postCID string) (*Livestream, error) 72 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) 73 74 CreateBlock(ctx context.Context, block *Block) error 75 GetBlock(ctx context.Context, rkey string) (*Block, error) 76 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 77 DeleteBlock(ctx context.Context, rkey string) error 78 79 CreateChatMessage(ctx context.Context, message *ChatMessage) error 80 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 81 GetChatMessage(cid string) (*ChatMessage, error) 82 83 CreateGate(ctx context.Context, gate *Gate) error 84 DeleteGate(ctx context.Context, rkey string) error 85 GetGate(ctx context.Context, rkey string) (*Gate, error) 86 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error) 87 88 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 89 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 90 91 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 92 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 93 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 94 95 CreateLabeler(did string) (*Labeler, error) 96 GetLabeler(did string) (*Labeler, error) 97 UpdateLabelerCursor(did string, cursor int64) error 98 99 CreateLabel(label *Label) error 100 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 101} 102 103func MakeDB(dbURL string) (Model, error) { 104 log.Log(context.Background(), "starting database", "dbURL", dbURL) 105 sqliteSuffix := dbURL 106 if dbURL != ":memory:" { 107 if !strings.HasPrefix(dbURL, "sqlite://") { 108 dbURL = fmt.Sprintf("sqlite://%s", dbURL) 109 } 110 sqliteSuffix := dbURL[len("sqlite://"):] 111 // if this isn't ":memory:", ensure that directory exists (eg, if db 112 // file is being initialized) 113 if !strings.Contains(sqliteSuffix, ":?") { 114 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 115 return nil, fmt.Errorf("error creating database path: %w", err) 116 } 117 } 118 } 119 dial := sqlite.Open(sqliteSuffix) 120 121 gormLogger := slogGorm.New( 122 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 123 TimeFormat: time.RFC3339, 124 })), 125 // slogGorm.WithTraceAll(), 126 ) 127 128 db, err := gorm.Open(dial, &gorm.Config{ 129 SkipDefaultTransaction: true, 130 TranslateError: true, 131 Logger: gormLogger, 132 }) 133 if err != nil { 134 return nil, fmt.Errorf("error starting database: %w", err) 135 } 136 err = db.Exec("PRAGMA journal_mode=WAL;").Error 137 if err != nil { 138 return nil, fmt.Errorf("error setting journal mode: %w", err) 139 } 140 sqlDB, err := db.DB() 141 if err != nil { 142 return nil, fmt.Errorf("error getting database: %w", err) 143 } 144 sqlDB.SetMaxOpenConns(1) 145 for _, model := range []any{ 146 PlayerEvent{}, 147 Segment{}, 148 Thumbnail{}, 149 Identity{}, 150 Repo{}, 151 SigningKey{}, 152 Follow{}, 153 FeedPost{}, 154 Livestream{}, 155 Block{}, 156 ChatMessage{}, 157 ChatProfile{}, 158 Gate{}, 159 ServerSettings{}, 160 161 Labeler{}, 162 Label{}, 163 } { 164 err = db.AutoMigrate(model) 165 if err != nil { 166 return nil, err 167 } 168 } 169 return &DBModel{DB: db}, nil 170}