Live video on the AT Protocol
at eli/sync-tangled 145 lines 4.5 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/lmittmann/tint" 13 slogGorm "github.com/orandin/slog-gorm" 14 "gorm.io/driver/sqlite" 15 "gorm.io/gorm" 16 "stream.place/streamplace/pkg/log" 17 "stream.place/streamplace/pkg/streamplace" 18) 19 20type DBModel struct { 21 DB *gorm.DB 22} 23 24type Model interface { 25 CreateNotification(token, repoDID string) error 26 ListNotifications() ([]Notification, error) 27 28 CreatePlayerEvent(event PlayerEventAPI) error 29 ListPlayerEvents(playerId string) ([]PlayerEvent, error) 30 PlayerReport(playerId string) (map[string]any, error) 31 ClearPlayerEvents() error 32 33 CreateSegment(segment *Segment) error 34 MostRecentSegments() ([]Segment, error) 35 LatestSegmentForUser(user string) (*Segment, error) 36 CreateThumbnail(thumb *Thumbnail) error 37 LatestThumbnailForUser(user string) (*Thumbnail, error) 38 GetSegment(id string) (*Segment, error) 39 StartSegmentCleaner(ctx context.Context) error 40 41 GetIdentity(id string) (*Identity, error) 42 UpdateIdentity(ident *Identity) error 43 44 GetRepo(did string) (*Repo, error) 45 GetRepoByHandle(handle string) (*Repo, error) 46 GetRepoByHandleOrDID(arg string) (*Repo, error) 47 GetRepoBySigningKey(signingKey string) (*Repo, error) 48 UpdateRepo(repo *Repo) error 49 50 UpdateSigningKey(key *SigningKey) error 51 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 52 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 53 54 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 55 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 56 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 57 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 58 DeleteFollow(ctx context.Context, userDID, rev string) error 59 GetFollowersNotificationTokens(userDID string) ([]string, error) 60 61 CreateFeedPost(ctx context.Context, post *FeedPost) error 62 ListFeedPosts() ([]FeedPost, error) 63 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 64 GetFeedPost(cid string) (*FeedPost, error) 65 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 66 67 CreateLivestream(ctx context.Context, ls *Livestream) error 68 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 69 GetLivestreamByPostCID(postCID string) (*Livestream, error) 70 71 CreateBlock(ctx context.Context, block *Block) error 72 GetBlock(ctx context.Context, rkey string) (*Block, error) 73 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 74 DeleteBlock(ctx context.Context, rkey string) error 75 76 CreateChatMessage(ctx context.Context, message *ChatMessage) error 77 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 78 GetChatMessage(cid string) (*ChatMessage, error) 79 80 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 81 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 82} 83 84func MakeDB(dbURL string) (Model, error) { 85 log.Log(context.Background(), "starting database", "dbURL", dbURL) 86 sqliteSuffix := dbURL 87 if dbURL != ":memory:" { 88 if !strings.HasPrefix(dbURL, "sqlite://") { 89 dbURL = fmt.Sprintf("sqlite://%s", dbURL) 90 } 91 sqliteSuffix := dbURL[len("sqlite://"):] 92 // if this isn't ":memory:", ensure that directory exists (eg, if db 93 // file is being initialized) 94 if !strings.Contains(sqliteSuffix, ":?") { 95 os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm) 96 } 97 } 98 dial := sqlite.Open(sqliteSuffix) 99 100 gormLogger := slogGorm.New( 101 slogGorm.WithHandler(tint.NewHandler(os.Stderr, &tint.Options{ 102 TimeFormat: time.RFC3339, 103 })), 104 // slogGorm.WithTraceAll(), 105 ) 106 107 db, err := gorm.Open(dial, &gorm.Config{ 108 SkipDefaultTransaction: true, 109 TranslateError: true, 110 Logger: gormLogger, 111 }) 112 if err != nil { 113 return nil, fmt.Errorf("error starting database: %w", err) 114 } 115 err = db.Exec("PRAGMA journal_mode=WAL;").Error 116 if err != nil { 117 return nil, fmt.Errorf("error setting journal mode: %w", err) 118 } 119 sqlDB, err := db.DB() 120 if err != nil { 121 return nil, fmt.Errorf("error getting database: %w", err) 122 } 123 sqlDB.SetMaxOpenConns(1) 124 for _, model := range []any{ 125 Notification{}, 126 PlayerEvent{}, 127 Segment{}, 128 Thumbnail{}, 129 Identity{}, 130 Repo{}, 131 SigningKey{}, 132 Follow{}, 133 FeedPost{}, 134 Livestream{}, 135 Block{}, 136 ChatMessage{}, 137 ChatProfile{}, 138 } { 139 err = db.AutoMigrate(model) 140 if err != nil { 141 return nil, err 142 } 143 } 144 return &DBModel{DB: db}, nil 145}