Live video on the AT Protocol
at natb/command-errors 220 lines 7.5 kB view raw
1package statedb 2 3import ( 4 "bytes" 5 "fmt" 6 "time" 7 8 lexutil "github.com/bluesky-social/indigo/lex/util" 9 "github.com/bluesky-social/indigo/util" 10 "stream.place/streamplace/pkg/spid" 11 "stream.place/streamplace/pkg/streamplace" 12) 13 14const MAX_MULTISTREAM_TARGETS = 100 15const MAX_ACTIVE_MULTISTREAM_TARGETS = 5 16 17type MultistreamTarget struct { 18 URI string `gorm:"column:uri;primarykey"` 19 CID string `gorm:"column:cid;not null"` 20 Active bool `gorm:"column:active"` 21 RepoDID string `gorm:"column:repo_did;not null;index"` 22 MultistreamTarget []byte `gorm:"column:record"` 23} 24 25func (m *MultistreamTarget) TableName() string { 26 return "multistream_targets" 27} 28 29func (state *StatefulDB) CreateMultistreamTarget(input *streamplace.MultistreamCreateTarget_Input, repoDID string) (*streamplace.MultistreamDefs_TargetView, error) { 30 // Check total targets limit 31 var totalCount int64 32 err := state.DB.Model(&MultistreamTarget{}).Where("repo_did = ?", repoDID).Count(&totalCount).Error 33 if err != nil { 34 return nil, fmt.Errorf("failed to count existing targets: %w", err) 35 } 36 if totalCount >= MAX_MULTISTREAM_TARGETS { 37 return nil, fmt.Errorf("maximum number of multistream targets (%d) reached", MAX_MULTISTREAM_TARGETS) 38 } 39 40 // Check active targets limit if this target is active 41 if input.MultistreamTarget.Active { 42 var activeCount int64 43 err := state.DB.Model(&MultistreamTarget{}).Where("repo_did = ? AND active = ?", repoDID, true).Count(&activeCount).Error 44 if err != nil { 45 return nil, fmt.Errorf("failed to count active targets: %w", err) 46 } 47 if activeCount >= MAX_ACTIVE_MULTISTREAM_TARGETS { 48 return nil, fmt.Errorf("maximum number of active multistream targets (%d) reached", MAX_ACTIVE_MULTISTREAM_TARGETS) 49 } 50 } 51 52 // this URI is, of course, a LIE 53 tid := spid.TIDClock.Next() 54 uri := fmt.Sprintf("at://%s/place.stream.multistream.target/%s", repoDID, tid.String()) 55 56 cid, err := spid.GetCID(input.MultistreamTarget) 57 if err != nil { 58 return nil, fmt.Errorf("failed to get CID: %w", err) 59 } 60 61 buf := bytes.Buffer{} 62 err = input.MultistreamTarget.MarshalCBOR(&buf) 63 if err != nil { 64 return nil, fmt.Errorf("failed to marshal multistream target: %w", err) 65 } 66 67 dbTarget := &MultistreamTarget{ 68 URI: uri, 69 CID: cid.String(), 70 RepoDID: repoDID, 71 MultistreamTarget: buf.Bytes(), 72 Active: input.MultistreamTarget.Active, 73 } 74 err = state.DB.Create(dbTarget).Error 75 if err != nil { 76 return nil, err 77 } 78 return &streamplace.MultistreamDefs_TargetView{ 79 Uri: uri, 80 Cid: cid.String(), 81 Record: &lexutil.LexiconTypeDecoder{Val: input.MultistreamTarget}, 82 }, nil 83} 84 85func (state *StatefulDB) GetMultistreamTarget(uri string) (*streamplace.MultistreamDefs_TargetView, error) { 86 return nil, nil 87} 88 89type TargetWithEvent struct { 90 MultistreamTarget 91 LatestEventID *string `gorm:"column:latest_event_id"` 92 LatestEventStatus *string `gorm:"column:latest_event_status"` 93 LatestEventMessage *string `gorm:"column:latest_event_message"` 94 LatestEventCreatedAt *time.Time `gorm:"column:latest_event_created_at"` 95} 96 97func (state *StatefulDB) ListMultistreamTargets(repoDID string, limit int, offset int, active *bool) ([]*streamplace.MultistreamDefs_TargetView, error) { 98 99 var targets []TargetWithEvent 100 query := state.DB.Table("multistream_targets"). 101 Select("multistream_targets.*, me.id as latest_event_id, me.status as latest_event_status, me.message as latest_event_message, me.created_at as latest_event_created_at"). 102 Joins(`LEFT JOIN multistream_events me ON multistream_targets.uri = me.target_uri 103 AND me.created_at = (SELECT MAX(created_at) FROM multistream_events WHERE target_uri = multistream_targets.uri)`). 104 Where("repo_did = ?", repoDID) 105 106 if active != nil { 107 query = query.Where("active = ?", *active) 108 } 109 110 err := query.Limit(limit). 111 Offset(offset). 112 Order("uri ASC"). 113 Find(&targets).Error 114 if err != nil { 115 return nil, fmt.Errorf("failed to list multistream targets: %w", err) 116 } 117 118 result := make([]*streamplace.MultistreamDefs_TargetView, len(targets)) 119 for i, target := range targets { 120 var multistreamTarget streamplace.MultistreamTarget 121 err = multistreamTarget.UnmarshalCBOR(bytes.NewReader(target.MultistreamTarget.MultistreamTarget)) 122 if err != nil { 123 return nil, fmt.Errorf("failed to unmarshal multistream target: %w", err) 124 } 125 cid, err := spid.GetCID(&multistreamTarget) 126 if err != nil { 127 return nil, fmt.Errorf("failed to get CID: %w", err) 128 } 129 130 targetView := &streamplace.MultistreamDefs_TargetView{ 131 Uri: target.URI, 132 Cid: cid.String(), 133 Record: &lexutil.LexiconTypeDecoder{Val: &multistreamTarget}, 134 } 135 136 // Add the latest event if it exists 137 if target.LatestEventID != nil { 138 event := &streamplace.MultistreamDefs_Event{ 139 Status: *target.LatestEventStatus, 140 Message: *target.LatestEventMessage, 141 CreatedAt: target.LatestEventCreatedAt.Format(util.ISO8601), 142 } 143 targetView.LatestEvent = event 144 } 145 146 result[i] = targetView 147 } 148 149 return result, nil 150} 151 152func (state *StatefulDB) UpdateMultistreamTarget(uri string, input *streamplace.MultistreamPutTarget_Input) (*streamplace.MultistreamDefs_TargetView, error) { 153 if input.MultistreamTarget == nil { 154 return nil, fmt.Errorf("multistream target is required") 155 } 156 157 // Get the current target to check repo ownership and current active status 158 var currentTarget MultistreamTarget 159 err := state.DB.Where("uri = ?", uri).First(&currentTarget).Error 160 if err != nil { 161 return nil, fmt.Errorf("multistream target not found") 162 } 163 164 // If updating to active and wasn't previously active, check active targets limit 165 if input.MultistreamTarget.Active && !currentTarget.Active { 166 var activeCount int64 167 err := state.DB.Model(&MultistreamTarget{}).Where("repo_did = ? AND active = ?", currentTarget.RepoDID, true).Count(&activeCount).Error 168 if err != nil { 169 return nil, fmt.Errorf("failed to count active targets: %w", err) 170 } 171 if activeCount >= MAX_ACTIVE_MULTISTREAM_TARGETS { 172 return nil, fmt.Errorf("maximum number of active multistream targets (%d) reached", MAX_ACTIVE_MULTISTREAM_TARGETS) 173 } 174 } 175 176 // Get CID for the updated target 177 cid, err := spid.GetCID(input.MultistreamTarget) 178 if err != nil { 179 return nil, fmt.Errorf("failed to get CID: %w", err) 180 } 181 182 // Marshal the target data 183 buf := bytes.Buffer{} 184 err = input.MultistreamTarget.MarshalCBOR(&buf) 185 if err != nil { 186 return nil, fmt.Errorf("failed to marshal multistream target: %w", err) 187 } 188 189 // Update the database record 190 updates := map[string]interface{}{ 191 "cid": cid.String(), 192 "record": buf.Bytes(), 193 "active": input.MultistreamTarget.Active, 194 } 195 196 result := state.DB.Model(&MultistreamTarget{}).Where("uri = ?", uri).Updates(updates) 197 if result.Error != nil { 198 return nil, fmt.Errorf("failed to update multistream target: %w", result.Error) 199 } 200 if result.RowsAffected == 0 { 201 return nil, fmt.Errorf("multistream target not found") 202 } 203 204 return &streamplace.MultistreamDefs_TargetView{ 205 Uri: uri, 206 Cid: cid.String(), 207 Record: &lexutil.LexiconTypeDecoder{Val: input.MultistreamTarget}, 208 }, nil 209} 210 211func (state *StatefulDB) DeleteMultistreamTarget(uri string) error { 212 result := state.DB.Where("uri = ?", uri).Delete(&MultistreamTarget{}) 213 if result.Error != nil { 214 return fmt.Errorf("failed to delete multistream target: %w", result.Error) 215 } 216 if result.RowsAffected == 0 { 217 return fmt.Errorf("multistream target not found") 218 } 219 return nil 220}