Live video on the AT Protocol
1package statedb
2
3import (
4 "bytes"
5 "fmt"
6 "time"
7
8 lexutil "github.com/bluesky-social/indigo/lex/util"
9 "github.com/bluesky-social/indigo/util"
10 "stream.place/streamplace/pkg/spid"
11 "stream.place/streamplace/pkg/streamplace"
12)
13
// Limits on how many multistream targets a single repo may hold.
const (
	// MAX_MULTISTREAM_TARGETS is the most targets, active or not, that one
	// repo may have stored at a time.
	MAX_MULTISTREAM_TARGETS = 100

	// MAX_ACTIVE_MULTISTREAM_TARGETS is the most targets one repo may have
	// flagged active at a time.
	MAX_ACTIVE_MULTISTREAM_TARGETS = 5
)
16
// MultistreamTarget is the database row for a single multistream target. Rows
// live in the "multistream_targets" table.
type MultistreamTarget struct {
	// URI is the at:// URI identifying the target; it is the primary key.
	URI string `gorm:"column:uri;primarykey"`

	// CID is the string form of the content identifier computed from the record.
	CID string `gorm:"column:cid;not null"`

	// Active mirrors the record's Active flag so it can be filtered and counted in SQL.
	Active bool `gorm:"column:active"`

	// RepoDID is the DID of the repo the target belongs to (indexed).
	RepoDID string `gorm:"column:repo_did;not null;index"`

	// MultistreamTarget holds the CBOR-encoded record, stored in the "record" column.
	MultistreamTarget []byte `gorm:"column:record"`
}
24
25func (m *MultistreamTarget) TableName() string {
26 return "multistream_targets"
27}
28
29func (state *StatefulDB) CreateMultistreamTarget(input *streamplace.MultistreamCreateTarget_Input, repoDID string) (*streamplace.MultistreamDefs_TargetView, error) {
30 // Check total targets limit
31 var totalCount int64
32 err := state.DB.Model(&MultistreamTarget{}).Where("repo_did = ?", repoDID).Count(&totalCount).Error
33 if err != nil {
34 return nil, fmt.Errorf("failed to count existing targets: %w", err)
35 }
36 if totalCount >= MAX_MULTISTREAM_TARGETS {
37 return nil, fmt.Errorf("maximum number of multistream targets (%d) reached", MAX_MULTISTREAM_TARGETS)
38 }
39
40 // Check active targets limit if this target is active
41 if input.MultistreamTarget.Active {
42 var activeCount int64
43 err := state.DB.Model(&MultistreamTarget{}).Where("repo_did = ? AND active = ?", repoDID, true).Count(&activeCount).Error
44 if err != nil {
45 return nil, fmt.Errorf("failed to count active targets: %w", err)
46 }
47 if activeCount >= MAX_ACTIVE_MULTISTREAM_TARGETS {
48 return nil, fmt.Errorf("maximum number of active multistream targets (%d) reached", MAX_ACTIVE_MULTISTREAM_TARGETS)
49 }
50 }
51
52 // this URI is, of course, a LIE
53 tid := spid.TIDClock.Next()
54 uri := fmt.Sprintf("at://%s/place.stream.multistream.target/%s", repoDID, tid.String())
55
56 cid, err := spid.GetCID(input.MultistreamTarget)
57 if err != nil {
58 return nil, fmt.Errorf("failed to get CID: %w", err)
59 }
60
61 buf := bytes.Buffer{}
62 err = input.MultistreamTarget.MarshalCBOR(&buf)
63 if err != nil {
64 return nil, fmt.Errorf("failed to marshal multistream target: %w", err)
65 }
66
67 dbTarget := &MultistreamTarget{
68 URI: uri,
69 CID: cid.String(),
70 RepoDID: repoDID,
71 MultistreamTarget: buf.Bytes(),
72 Active: input.MultistreamTarget.Active,
73 }
74 err = state.DB.Create(dbTarget).Error
75 if err != nil {
76 return nil, err
77 }
78 return &streamplace.MultistreamDefs_TargetView{
79 Uri: uri,
80 Cid: cid.String(),
81 Record: &lexutil.LexiconTypeDecoder{Val: input.MultistreamTarget},
82 }, nil
83}
84
85func (state *StatefulDB) GetMultistreamTarget(uri string) (*streamplace.MultistreamDefs_TargetView, error) {
86 return nil, nil
87}
88
89type TargetWithEvent struct {
90 MultistreamTarget
91 LatestEventID *string `gorm:"column:latest_event_id"`
92 LatestEventStatus *string `gorm:"column:latest_event_status"`
93 LatestEventMessage *string `gorm:"column:latest_event_message"`
94 LatestEventCreatedAt *time.Time `gorm:"column:latest_event_created_at"`
95}
96
97func (state *StatefulDB) ListMultistreamTargets(repoDID string, limit int, offset int, active *bool) ([]*streamplace.MultistreamDefs_TargetView, error) {
98
99 var targets []TargetWithEvent
100 query := state.DB.Table("multistream_targets").
101 Select("multistream_targets.*, me.id as latest_event_id, me.status as latest_event_status, me.message as latest_event_message, me.created_at as latest_event_created_at").
102 Joins(`LEFT JOIN multistream_events me ON multistream_targets.uri = me.target_uri
103 AND me.created_at = (SELECT MAX(created_at) FROM multistream_events WHERE target_uri = multistream_targets.uri)`).
104 Where("repo_did = ?", repoDID)
105
106 if active != nil {
107 query = query.Where("active = ?", *active)
108 }
109
110 err := query.Limit(limit).
111 Offset(offset).
112 Order("uri ASC").
113 Find(&targets).Error
114 if err != nil {
115 return nil, fmt.Errorf("failed to list multistream targets: %w", err)
116 }
117
118 result := make([]*streamplace.MultistreamDefs_TargetView, len(targets))
119 for i, target := range targets {
120 var multistreamTarget streamplace.MultistreamTarget
121 err = multistreamTarget.UnmarshalCBOR(bytes.NewReader(target.MultistreamTarget.MultistreamTarget))
122 if err != nil {
123 return nil, fmt.Errorf("failed to unmarshal multistream target: %w", err)
124 }
125 cid, err := spid.GetCID(&multistreamTarget)
126 if err != nil {
127 return nil, fmt.Errorf("failed to get CID: %w", err)
128 }
129
130 targetView := &streamplace.MultistreamDefs_TargetView{
131 Uri: target.URI,
132 Cid: cid.String(),
133 Record: &lexutil.LexiconTypeDecoder{Val: &multistreamTarget},
134 }
135
136 // Add the latest event if it exists
137 if target.LatestEventID != nil {
138 event := &streamplace.MultistreamDefs_Event{
139 Status: *target.LatestEventStatus,
140 Message: *target.LatestEventMessage,
141 CreatedAt: target.LatestEventCreatedAt.Format(util.ISO8601),
142 }
143 targetView.LatestEvent = event
144 }
145
146 result[i] = targetView
147 }
148
149 return result, nil
150}
151
152func (state *StatefulDB) UpdateMultistreamTarget(uri string, input *streamplace.MultistreamPutTarget_Input) (*streamplace.MultistreamDefs_TargetView, error) {
153 if input.MultistreamTarget == nil {
154 return nil, fmt.Errorf("multistream target is required")
155 }
156
157 // Get the current target to check repo ownership and current active status
158 var currentTarget MultistreamTarget
159 err := state.DB.Where("uri = ?", uri).First(¤tTarget).Error
160 if err != nil {
161 return nil, fmt.Errorf("multistream target not found")
162 }
163
164 // If updating to active and wasn't previously active, check active targets limit
165 if input.MultistreamTarget.Active && !currentTarget.Active {
166 var activeCount int64
167 err := state.DB.Model(&MultistreamTarget{}).Where("repo_did = ? AND active = ?", currentTarget.RepoDID, true).Count(&activeCount).Error
168 if err != nil {
169 return nil, fmt.Errorf("failed to count active targets: %w", err)
170 }
171 if activeCount >= MAX_ACTIVE_MULTISTREAM_TARGETS {
172 return nil, fmt.Errorf("maximum number of active multistream targets (%d) reached", MAX_ACTIVE_MULTISTREAM_TARGETS)
173 }
174 }
175
176 // Get CID for the updated target
177 cid, err := spid.GetCID(input.MultistreamTarget)
178 if err != nil {
179 return nil, fmt.Errorf("failed to get CID: %w", err)
180 }
181
182 // Marshal the target data
183 buf := bytes.Buffer{}
184 err = input.MultistreamTarget.MarshalCBOR(&buf)
185 if err != nil {
186 return nil, fmt.Errorf("failed to marshal multistream target: %w", err)
187 }
188
189 // Update the database record
190 updates := map[string]interface{}{
191 "cid": cid.String(),
192 "record": buf.Bytes(),
193 "active": input.MultistreamTarget.Active,
194 }
195
196 result := state.DB.Model(&MultistreamTarget{}).Where("uri = ?", uri).Updates(updates)
197 if result.Error != nil {
198 return nil, fmt.Errorf("failed to update multistream target: %w", result.Error)
199 }
200 if result.RowsAffected == 0 {
201 return nil, fmt.Errorf("multistream target not found")
202 }
203
204 return &streamplace.MultistreamDefs_TargetView{
205 Uri: uri,
206 Cid: cid.String(),
207 Record: &lexutil.LexiconTypeDecoder{Val: input.MultistreamTarget},
208 }, nil
209}
210
211func (state *StatefulDB) DeleteMultistreamTarget(uri string) error {
212 result := state.DB.Where("uri = ?", uri).Delete(&MultistreamTarget{})
213 if result.Error != nil {
214 return fmt.Errorf("failed to delete multistream target: %w", result.Error)
215 }
216 if result.RowsAffected == 0 {
217 return fmt.Errorf("multistream target not found")
218 }
219 return nil
220}