Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.12 185 lines 6.5 kB view raw
1package model 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "path/filepath" 8 "time" 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "gorm.io/driver/sqlite" 14 "gorm.io/gorm" 15 "gorm.io/plugin/prometheus" 16 "stream.place/streamplace/pkg/config" 17 "stream.place/streamplace/pkg/log" 18 "stream.place/streamplace/pkg/streamplace" 19) 20 21type DBModel struct { 22 DB *gorm.DB 23} 24 25type Model interface { 26 CreatePlayerEvent(event PlayerEventAPI) error 27 ListPlayerEvents(playerID string) ([]PlayerEvent, error) 28 PlayerReport(playerID string) (map[string]any, error) 29 ClearPlayerEvents() error 30 31 CreateSegment(segment *Segment) error 32 MostRecentSegments() ([]Segment, error) 33 LatestSegmentForUser(user string) (*Segment, error) 34 LatestSegmentsForUser(user string, limit int, before *time.Time, after *time.Time) ([]Segment, error) 35 CreateThumbnail(thumb *Thumbnail) error 36 LatestThumbnailForUser(user string) (*Thumbnail, error) 37 GetSegment(id string) (*Segment, error) 38 GetExpiredSegments(ctx context.Context) ([]Segment, error) 39 DeleteSegment(ctx context.Context, id string) error 40 StartSegmentCleaner(ctx context.Context) error 41 SegmentCleaner(ctx context.Context) error 42 43 GetIdentity(id string) (*Identity, error) 44 UpdateIdentity(ident *Identity) error 45 46 GetRepo(did string) (*Repo, error) 47 GetRepoByHandle(handle string) (*Repo, error) 48 GetRepoByHandleOrDID(arg string) (*Repo, error) 49 GetRepoBySigningKey(signingKey string) (*Repo, error) 50 GetAllRepos() ([]Repo, error) 51 UpdateRepo(repo *Repo) error 52 53 UpdateSigningKey(key *SigningKey) error 54 GetSigningKey(ctx context.Context, did, repoDID string) (*SigningKey, error) 55 GetSigningKeyByRKey(ctx context.Context, rkey string) (*SigningKey, error) 56 GetSigningKeysForRepo(repoDID string) ([]SigningKey, error) 57 58 CreateFollow(ctx context.Context, userDID, rev string, follow *bsky.GraphFollow) error 59 GetUserFollowing(ctx context.Context, userDID string) ([]Follow, error) 60 GetUserFollowers(ctx context.Context, userDID string) ([]Follow, error) 61 GetUserFollowingUser(ctx context.Context, userDID, subjectDID string) (*Follow, error) 62 DeleteFollow(ctx context.Context, userDID, rev string) error 63 64 CreateFeedPost(ctx context.Context, post *FeedPost) error 65 ListFeedPosts() ([]FeedPost, error) 66 ListFeedPostsByType(feedType string, limit int, after int64) ([]FeedPost, error) 67 GetFeedPost(uri string) (*FeedPost, error) 68 GetReplies(repoDID string) ([]*bsky.FeedDefs_PostView, error) 69 70 CreateLivestream(ctx context.Context, ls *Livestream) error 71 GetLatestLivestreamForRepo(repoDID string) (*Livestream, error) 72 GetLivestreamByPostURI(postURI string) (*Livestream, error) 73 GetLatestLivestreams(limit int, before *time.Time) ([]Livestream, error) 74 75 CreateBlock(ctx context.Context, block *Block) error 76 GetBlock(ctx context.Context, rkey string) (*Block, error) 77 GetUserBlock(ctx context.Context, userDID, subjectDID string) (*Block, error) 78 DeleteBlock(ctx context.Context, rkey string) error 79 80 CreateChatMessage(ctx context.Context, message *ChatMessage) error 81 MostRecentChatMessages(repoDID string) ([]*streamplace.ChatDefs_MessageView, error) 82 GetChatMessage(uri string) (*ChatMessage, error) 83 DeleteChatMessage(ctx context.Context, uri string, deletedAt *time.Time) error 84 85 CreateGate(ctx context.Context, gate *Gate) error 86 DeleteGate(ctx context.Context, rkey string) error 87 GetGate(ctx context.Context, rkey string) (*Gate, error) 88 GetUserGates(ctx context.Context, userDID string) ([]*Gate, error) 89 90 CreateChatProfile(ctx context.Context, profile *ChatProfile) error 91 GetChatProfile(ctx context.Context, repoDID string) (*ChatProfile, error) 92 93 UpdateServerSettings(ctx context.Context, settings *ServerSettings) error 94 GetServerSettings(ctx context.Context, server string, repoDID string) (*ServerSettings, error) 95 DeleteServerSettings(ctx context.Context, server string, repoDID string) error 96 97 CreateLabeler(did string) (*Labeler, error) 98 GetLabeler(did string) (*Labeler, error) 99 UpdateLabelerCursor(did string, cursor int64) error 100 101 CreateLabel(label *Label) error 102 GetActiveLabels(uri string) ([]*comatproto.LabelDefs_Label, error) 103 104 UpdateBroadcastOrigin(ctx context.Context, origin *streamplace.BroadcastOrigin, aturi syntax.ATURI) error 105 GetRecentBroadcastOrigins(ctx context.Context) ([]*streamplace.BroadcastDefs_BroadcastOriginView, error) 106 107 CreateMetadataConfiguration(ctx context.Context, metadata *MetadataConfiguration) error 108 GetMetadataConfiguration(ctx context.Context, repoDID string) (*MetadataConfiguration, error) 109 DeleteMetadataConfiguration(ctx context.Context, repoDID string) error 110} 111 112var DBRevision = 2 113 114func MakeDB(dbURL string) (Model, error) { 115 sqliteSuffix := dbURL 116 if dbURL != ":memory:" { 117 // Ensure dbURL exists as a directory on the filesystem 118 if err := os.MkdirAll(dbURL, os.ModePerm); err != nil { 119 return nil, fmt.Errorf("error creating database directory: %w", err) 120 } 121 dbPath := filepath.Join(dbURL, fmt.Sprintf("index_%d.sqlite", DBRevision)) 122 sqliteSuffix = dbPath 123 // if this isn't ":memory:", ensure that directory exists (eg, if db 124 // file is being initialized) 125 if err := os.MkdirAll(filepath.Dir(sqliteSuffix), os.ModePerm); err != nil { 126 return nil, fmt.Errorf("error creating database path: %w", err) 127 } 128 } 129 log.Log(context.Background(), "starting database", "dbURL", sqliteSuffix) 130 dial := sqlite.Open(sqliteSuffix) 131 132 db, err := gorm.Open(dial, &gorm.Config{ 133 SkipDefaultTransaction: true, 134 TranslateError: true, 135 Logger: config.GormLogger, 136 }) 137 if err != nil { 138 return nil, fmt.Errorf("error starting database: %w", err) 139 } 140 err = db.Exec("PRAGMA journal_mode=WAL;").Error 141 if err != nil { 142 return nil, fmt.Errorf("error setting journal mode: %w", err) 143 } 144 145 err = db.Use(prometheus.New(prometheus.Config{ 146 DBName: "index", 147 RefreshInterval: 10, 148 StartServer: false, 149 })) 150 if err != nil { 151 return nil, fmt.Errorf("error using prometheus plugin: %w", err) 152 } 153 154 sqlDB, err := db.DB() 155 if err != nil { 156 return nil, fmt.Errorf("error getting database: %w", err) 157 } 158 sqlDB.SetMaxOpenConns(1) 159 for _, model := range []any{ 160 PlayerEvent{}, 161 Segment{}, 162 Thumbnail{}, 163 Identity{}, 164 Repo{}, 165 SigningKey{}, 166 Follow{}, 167 FeedPost{}, 168 Livestream{}, 169 Block{}, 170 ChatMessage{}, 171 ChatProfile{}, 172 Gate{}, 173 ServerSettings{}, 174 Labeler{}, 175 Label{}, 176 BroadcastOrigin{}, 177 MetadataConfiguration{}, 178 } { 179 err = db.AutoMigrate(model) 180 if err != nil { 181 return nil, err 182 } 183 } 184 return &DBModel{DB: db}, nil 185}