1package server
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/gocql/gocql"
11 vyletdatabase "github.com/vylet-app/go/database/proto"
12 "github.com/vylet-app/go/internal/helpers"
13 "google.golang.org/protobuf/types/known/timestamppb"
14)
15
16func (s *Server) CreateFollow(ctx context.Context, req *vyletdatabase.CreateFollowRequest) (*vyletdatabase.CreateFollowResponse, error) {
17 logger := s.logger.With("name", "CreateFollow", "uri", req.Follow.Uri, "did", req.Follow.AuthorDid, "subjectDid", req.Follow.SubjectDid)
18
19 now := time.Now().UTC()
20
21 batch := s.cqlSession.NewBatch(gocql.LoggedBatch).WithContext(ctx)
22
23 args := []any{
24 req.Follow.Uri,
25 req.Follow.Cid,
26 req.Follow.SubjectDid,
27 req.Follow.AuthorDid,
28 req.Follow.CreatedAt.AsTime(),
29 now,
30 }
31
32 query := `
33 INSERT INTO %s
34 (uri, cid, subject_did, author_did, created_at, indexed_at)
35 VALUES
36 (?, ?, ?, ?, ?, ?)
37 `
38
39 batch.Query(fmt.Sprintf(query, "follows_by_subject_did"), args...)
40 batch.Query(fmt.Sprintf(query, "follows_by_author_did"), args...)
41 batch.Query(fmt.Sprintf(query, "follows_by_uri"), args...)
42 batch.Query(fmt.Sprintf(query, "follows_by_author_did_subject_did"), args...)
43
44 if err := s.cqlSession.ExecuteBatch(batch); err != nil {
45 logger.Error("failed to create follow", "err", err)
46 return &vyletdatabase.CreateFollowResponse{
47 Error: helpers.ToStringPtr(err.Error()),
48 }, nil
49 }
50
51 if err := s.cqlSession.Query(`
52 UPDATE follow_counts
53 SET follows_count = follows_count + 1
54 WHERE did = ?
55 `, req.Follow.AuthorDid).WithContext(ctx).Exec(); err != nil {
56 logger.Error("failed to increment follows count", "err", err)
57 return &vyletdatabase.CreateFollowResponse{
58 Error: helpers.ToStringPtr(err.Error()),
59 }, nil
60 }
61
62 if err := s.cqlSession.Query(`
63 UPDATE follow_counts
64 SET followers_count = followers_count + 1
65 WHERE did = ?
66 `, req.Follow.SubjectDid).WithContext(ctx).Exec(); err != nil {
67 logger.Error("failed to increment followers count", "err", err)
68 return &vyletdatabase.CreateFollowResponse{
69 Error: helpers.ToStringPtr(err.Error()),
70 }, nil
71 }
72
73 return &vyletdatabase.CreateFollowResponse{}, nil
74}
75
76func (s *Server) DeleteFollow(ctx context.Context, req *vyletdatabase.DeleteFollowRequest) (*vyletdatabase.DeleteFollowResponse, error) {
77 logger := s.logger.With("name", "DeleteFollow", "uri", req.Uri)
78
79 var (
80 createdAt time.Time
81 subjectDid string
82 authorDid string
83 )
84
85 query := `
86 SELECT created_at, subject_did, author_did
87 FROM follows_by_uri
88 WHERE uri = ?
89 `
90 if err := s.cqlSession.Query(query, req.Uri).WithContext(ctx).Scan(&createdAt, &subjectDid, &authorDid); err != nil {
91 if err == gocql.ErrNotFound {
92 logger.Warn("follow not found", "uri", req.Uri)
93 return &vyletdatabase.DeleteFollowResponse{
94 Error: helpers.ToStringPtr("follow not found"),
95 }, nil
96 }
97 logger.Error("failed to fetch follow", "uri", req.Uri, "err", err)
98 return &vyletdatabase.DeleteFollowResponse{
99 Error: helpers.ToStringPtr(err.Error()),
100 }, nil
101 }
102
103 logger = logger.With("authorDid", authorDid, "subjectDid", subjectDid)
104
105 batch := s.cqlSession.NewBatch(gocql.LoggedBatch).WithContext(ctx)
106
107 batch.Query(`
108 DELETE FROM follows_by_uri
109 WHERE uri = ?
110 `, req.Uri)
111
112 batch.Query(`
113 DELETE FROM follows_by_subject_did
114 WHERE subject_did = ? AND created_at = ? AND uri = ?
115 `, subjectDid, createdAt, req.Uri)
116
117 batch.Query(`
118 DELETE FROM follows_by_author_did
119 WHERE author_did = ? AND created_at = ? AND uri = ?
120 `, authorDid, createdAt, req.Uri)
121
122 batch.Query(`
123 DELETE FROM follows_by_author_did_subject_did
124 WHERE author_did = ? AND subject_did = ?
125 `, authorDid, subjectDid)
126
127 if err := s.cqlSession.ExecuteBatch(batch); err != nil {
128 logger.Error("failed to delete follow", "uri", req.Uri, "err", err)
129 return &vyletdatabase.DeleteFollowResponse{
130 Error: helpers.ToStringPtr(err.Error()),
131 }, nil
132 }
133
134 if err := s.cqlSession.Query(`
135 UPDATE follow_counts
136 SET follows_count = follows_count - 1
137 WHERE did = ?
138 `, authorDid).WithContext(ctx).Exec(); err != nil {
139 logger.Error("failed to decrement follows count", "err", err)
140 return &vyletdatabase.DeleteFollowResponse{
141 Error: helpers.ToStringPtr(err.Error()),
142 }, nil
143 }
144
145 if err := s.cqlSession.Query(`
146 UPDATE follow_counts
147 SET followers_count = followers_count - 1
148 WHERE did = ?
149 `, subjectDid).WithContext(ctx).Exec(); err != nil {
150 logger.Error("failed to decrement followers count", "err", err)
151 return &vyletdatabase.DeleteFollowResponse{
152 Error: helpers.ToStringPtr(err.Error()),
153 }, nil
154 }
155
156 return &vyletdatabase.DeleteFollowResponse{}, nil
157}
158
159func (s *Server) GetFollowsByActor(ctx context.Context, req *vyletdatabase.GetFollowsByActorRequest) (*vyletdatabase.GetFollowsByActorResponse, error) {
160 logger := s.logger.With("name", "GetFollowsByActor", "did", req.Did)
161
162 if req.Limit <= 0 {
163 return nil, fmt.Errorf("limit must be greater than 0")
164 }
165
166 var (
167 query string
168 args []any
169 )
170
171 if req.Cursor != nil && *req.Cursor != "" {
172 cursorParts := strings.SplitN(*req.Cursor, "|", 2)
173 if len(cursorParts) != 2 {
174 logger.Error("invalid cursor format", "cursor", *req.Cursor)
175 return &vyletdatabase.GetFollowsByActorResponse{
176 Error: helpers.ToStringPtr("invalid cursor format"),
177 }, nil
178 }
179
180 cursorTime, err := time.Parse(time.RFC3339Nano, cursorParts[0])
181 if err != nil {
182 logger.Error("failed to parse cursor timestamp", "cursor", *req.Cursor, "err", err)
183 return &vyletdatabase.GetFollowsByActorResponse{
184 Error: helpers.ToStringPtr("invalid cursor format"),
185 }, nil
186 }
187 cursorUri := cursorParts[1]
188
189 query = `
190 SELECT uri, cid, subject_did, author_did, created_at, indexed_at
191 FROM follows_by_author_did
192 WHERE author_did = ? AND (created_at, uri) < (?, ?)
193 ORDER BY created_at DESC, uri ASC
194 LIMIT ?
195 `
196 args = []any{req.Did, cursorTime, cursorUri, req.Limit + 1}
197 } else {
198 query = `
199 SELECT uri, cid, subject_did, author_did, created_at, indexed_at
200 FROM follows_by_author_did
201 WHERE author_did = ?
202 ORDER BY created_at DESC, uri ASC
203 LIMIT ?
204 `
205 args = []any{req.Did, req.Limit}
206 }
207
208 iter := s.cqlSession.Query(query, args...).WithContext(ctx).Iter()
209 defer iter.Close()
210
211 var follows []*vyletdatabase.Follow
212
213 var (
214 createdAt time.Time
215 indexedAt time.Time
216 )
217 for {
218 follow := &vyletdatabase.Follow{}
219 if !iter.Scan(
220 &follow.Uri,
221 &follow.Cid,
222 &follow.SubjectDid,
223 &follow.AuthorDid,
224 &createdAt,
225 &indexedAt,
226 ) {
227 break
228 }
229 follow.CreatedAt = timestamppb.New(createdAt)
230 follow.IndexedAt = timestamppb.New(indexedAt)
231
232 follows = append(follows, follow)
233 }
234 if err := iter.Close(); err != nil {
235 logger.Error("failed to iterate follows", "err", err)
236 return &vyletdatabase.GetFollowsByActorResponse{
237 Error: helpers.ToStringPtr(err.Error()),
238 }, nil
239 }
240
241 var nextCursor *string
242 if len(follows) > int(req.Limit) {
243 follows = follows[:req.Limit]
244 last := follows[len(follows)-1]
245 cursorStr := fmt.Sprintf("%s|%s",
246 last.CreatedAt.AsTime().Format(time.RFC3339Nano),
247 last.Uri)
248 nextCursor = &cursorStr
249 }
250
251 return &vyletdatabase.GetFollowsByActorResponse{
252 Follows: follows,
253 Cursor: nextCursor,
254 }, nil
255}
256
257func (s *Server) GetFollowersByActor(ctx context.Context, req *vyletdatabase.GetFollowersByActorRequest) (*vyletdatabase.GetFollowersByActorResponse, error) {
258 logger := s.logger.With("name", "GetFollowersByActor", "did", req.Did)
259
260 if req.Limit <= 0 {
261 return nil, fmt.Errorf("limit must be greater than 0")
262 }
263
264 var (
265 query string
266 args []any
267 )
268
269 if req.Cursor != nil && *req.Cursor != "" {
270 cursorParts := strings.SplitN(*req.Cursor, "|", 2)
271 if len(cursorParts) != 2 {
272 logger.Error("invalid cursor format", "cursor", *req.Cursor)
273 return &vyletdatabase.GetFollowersByActorResponse{
274 Error: helpers.ToStringPtr("invalid cursor format"),
275 }, nil
276 }
277
278 cursorTime, err := time.Parse(time.RFC3339Nano, cursorParts[0])
279 if err != nil {
280 logger.Error("failed to parse cursor timestamp", "cursor", *req.Cursor, "err", err)
281 return &vyletdatabase.GetFollowersByActorResponse{
282 Error: helpers.ToStringPtr("invalid cursor format"),
283 }, nil
284 }
285 cursorUri := cursorParts[1]
286
287 query = `
288 SELECT uri, cid, subject_did, author_did, created_at, indexed_at
289 FROM follows_by_subject_did
290 WHERE subject_did = ? AND (created_at, uri) < (?, ?)
291 ORDER BY created_at DESC, uri ASC
292 LIMIT ?
293 `
294 args = []any{req.Did, cursorTime, cursorUri, req.Limit + 1}
295 } else {
296 query = `
297 SELECT uri, cid, subject_did, author_did, created_at, indexed_at
298 FROM follows_by_subject_did
299 WHERE subject_did = ?
300 ORDER BY created_at DESC, uri ASC
301 LIMIT ?
302 `
303 args = []any{req.Did, req.Limit}
304 }
305
306 iter := s.cqlSession.Query(query, args...).WithContext(ctx).Iter()
307 defer iter.Close()
308
309 var follows []*vyletdatabase.Follow
310
311 var (
312 createdAt time.Time
313 indexedAt time.Time
314 )
315 for {
316 follow := &vyletdatabase.Follow{}
317 if !iter.Scan(
318 &follow.Uri,
319 &follow.Cid,
320 &follow.SubjectDid,
321 &follow.AuthorDid,
322 &createdAt,
323 &indexedAt,
324 ) {
325 break
326 }
327 follow.CreatedAt = timestamppb.New(createdAt)
328 follow.IndexedAt = timestamppb.New(indexedAt)
329
330 follows = append(follows, follow)
331 }
332 if err := iter.Close(); err != nil {
333 logger.Error("failed to iterate follows", "err", err)
334 return &vyletdatabase.GetFollowersByActorResponse{
335 Error: helpers.ToStringPtr(err.Error()),
336 }, nil
337 }
338
339 var nextCursor *string
340 if len(follows) > int(req.Limit) {
341 follows = follows[:req.Limit]
342 last := follows[len(follows)-1]
343 cursorStr := fmt.Sprintf("%s|%s",
344 last.CreatedAt.AsTime().Format(time.RFC3339Nano),
345 last.Uri)
346 nextCursor = &cursorStr
347 }
348
349 return &vyletdatabase.GetFollowersByActorResponse{
350 Followers: follows,
351 Cursor: nextCursor,
352 }, nil
353}
354
355func (s *Server) GetFollowForAuthorSubject(ctx context.Context, req *vyletdatabase.GetFollowForAuthorSubjectRequest) (*vyletdatabase.GetFollowForAuthorSubjectResponse, error) {
356 logger := s.logger.With("name", "GetFollowForAuthorSubject", "authorDid", req.AuthorDid, "subjectDid", req.SubjectDid)
357
358 args := []any{req.AuthorDid, req.SubjectDid}
359
360 query := `
361 SELECT uri, cid, subject_did, author_did, created_at, indexed_at
362 FROM follows_by_author_did_subject_did
363 WHERE author_did = ? AND subject_did = ?
364 `
365
366 follow := &vyletdatabase.Follow{}
367 var (
368 createdAt time.Time
369 indexedAt time.Time
370 )
371 if err := s.cqlSession.Query(query, args...).WithContext(ctx).Scan(
372 &follow.Uri,
373 &follow.Cid,
374 &follow.SubjectDid,
375 &follow.AuthorDid,
376 &createdAt,
377 &indexedAt,
378 ); err != nil {
379 if errors.Is(err, gocql.ErrNotFound) {
380 return &vyletdatabase.GetFollowForAuthorSubjectResponse{}, nil
381 }
382
383 logger.Error("error finding follow", "err", err)
384 return &vyletdatabase.GetFollowForAuthorSubjectResponse{
385 Error: helpers.ToStringPtr(err.Error()),
386 }, nil
387 }
388 follow.CreatedAt = timestamppb.New(createdAt)
389 follow.IndexedAt = timestamppb.New(indexedAt)
390
391 return &vyletdatabase.GetFollowForAuthorSubjectResponse{
392 Follow: follow,
393 }, nil
394}