this repo has no description
at main 11 kB view raw
1package server 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "strings" 8 "time" 9 10 "github.com/gocql/gocql" 11 vyletdatabase "github.com/vylet-app/go/database/proto" 12 "github.com/vylet-app/go/internal/helpers" 13 "google.golang.org/protobuf/types/known/timestamppb" 14) 15 16func (s *Server) CreateFollow(ctx context.Context, req *vyletdatabase.CreateFollowRequest) (*vyletdatabase.CreateFollowResponse, error) { 17 logger := s.logger.With("name", "CreateFollow", "uri", req.Follow.Uri, "did", req.Follow.AuthorDid, "subjectDid", req.Follow.SubjectDid) 18 19 now := time.Now().UTC() 20 21 batch := s.cqlSession.NewBatch(gocql.LoggedBatch).WithContext(ctx) 22 23 args := []any{ 24 req.Follow.Uri, 25 req.Follow.Cid, 26 req.Follow.SubjectDid, 27 req.Follow.AuthorDid, 28 req.Follow.CreatedAt.AsTime(), 29 now, 30 } 31 32 query := ` 33 INSERT INTO %s 34 (uri, cid, subject_did, author_did, created_at, indexed_at) 35 VALUES 36 (?, ?, ?, ?, ?, ?) 37 ` 38 39 batch.Query(fmt.Sprintf(query, "follows_by_subject_did"), args...) 40 batch.Query(fmt.Sprintf(query, "follows_by_author_did"), args...) 41 batch.Query(fmt.Sprintf(query, "follows_by_uri"), args...) 42 batch.Query(fmt.Sprintf(query, "follows_by_author_did_subject_did"), args...) 43 44 if err := s.cqlSession.ExecuteBatch(batch); err != nil { 45 logger.Error("failed to create follow", "err", err) 46 return &vyletdatabase.CreateFollowResponse{ 47 Error: helpers.ToStringPtr(err.Error()), 48 }, nil 49 } 50 51 if err := s.cqlSession.Query(` 52 UPDATE follow_counts 53 SET follows_count = follows_count + 1 54 WHERE did = ? 55 `, req.Follow.AuthorDid).WithContext(ctx).Exec(); err != nil { 56 logger.Error("failed to increment follows count", "err", err) 57 return &vyletdatabase.CreateFollowResponse{ 58 Error: helpers.ToStringPtr(err.Error()), 59 }, nil 60 } 61 62 if err := s.cqlSession.Query(` 63 UPDATE follow_counts 64 SET followers_count = followers_count + 1 65 WHERE did = ? 66 `, req.Follow.SubjectDid).WithContext(ctx).Exec(); err != nil { 67 logger.Error("failed to increment followers count", "err", err) 68 return &vyletdatabase.CreateFollowResponse{ 69 Error: helpers.ToStringPtr(err.Error()), 70 }, nil 71 } 72 73 return &vyletdatabase.CreateFollowResponse{}, nil 74} 75 76func (s *Server) DeleteFollow(ctx context.Context, req *vyletdatabase.DeleteFollowRequest) (*vyletdatabase.DeleteFollowResponse, error) { 77 logger := s.logger.With("name", "DeleteFollow", "uri", req.Uri) 78 79 var ( 80 createdAt time.Time 81 subjectDid string 82 authorDid string 83 ) 84 85 query := ` 86 SELECT created_at, subject_did, author_did 87 FROM follows_by_uri 88 WHERE uri = ? 89 ` 90 if err := s.cqlSession.Query(query, req.Uri).WithContext(ctx).Scan(&createdAt, &subjectDid, &authorDid); err != nil { 91 if err == gocql.ErrNotFound { 92 logger.Warn("follow not found", "uri", req.Uri) 93 return &vyletdatabase.DeleteFollowResponse{ 94 Error: helpers.ToStringPtr("follow not found"), 95 }, nil 96 } 97 logger.Error("failed to fetch follow", "uri", req.Uri, "err", err) 98 return &vyletdatabase.DeleteFollowResponse{ 99 Error: helpers.ToStringPtr(err.Error()), 100 }, nil 101 } 102 103 logger = logger.With("authorDid", authorDid, "subjectDid", subjectDid) 104 105 batch := s.cqlSession.NewBatch(gocql.LoggedBatch).WithContext(ctx) 106 107 batch.Query(` 108 DELETE FROM follows_by_uri 109 WHERE uri = ? 110 `, req.Uri) 111 112 batch.Query(` 113 DELETE FROM follows_by_subject_did 114 WHERE subject_did = ? AND created_at = ? AND uri = ? 115 `, subjectDid, createdAt, req.Uri) 116 117 batch.Query(` 118 DELETE FROM follows_by_author_did 119 WHERE author_did = ? AND created_at = ? AND uri = ? 120 `, authorDid, createdAt, req.Uri) 121 122 batch.Query(` 123 DELETE FROM follows_by_author_did_subject_did 124 WHERE author_did = ? AND subject_did = ? 125 `, authorDid, subjectDid) 126 127 if err := s.cqlSession.ExecuteBatch(batch); err != nil { 128 logger.Error("failed to delete follow", "uri", req.Uri, "err", err) 129 return &vyletdatabase.DeleteFollowResponse{ 130 Error: helpers.ToStringPtr(err.Error()), 131 }, nil 132 } 133 134 if err := s.cqlSession.Query(` 135 UPDATE follow_counts 136 SET follows_count = follows_count - 1 137 WHERE did = ? 138 `, authorDid).WithContext(ctx).Exec(); err != nil { 139 logger.Error("failed to decrement follows count", "err", err) 140 return &vyletdatabase.DeleteFollowResponse{ 141 Error: helpers.ToStringPtr(err.Error()), 142 }, nil 143 } 144 145 if err := s.cqlSession.Query(` 146 UPDATE follow_counts 147 SET followers_count = followers_count - 1 148 WHERE did = ? 149 `, subjectDid).WithContext(ctx).Exec(); err != nil { 150 logger.Error("failed to decrement followers count", "err", err) 151 return &vyletdatabase.DeleteFollowResponse{ 152 Error: helpers.ToStringPtr(err.Error()), 153 }, nil 154 } 155 156 return &vyletdatabase.DeleteFollowResponse{}, nil 157} 158 159func (s *Server) GetFollowsByActor(ctx context.Context, req *vyletdatabase.GetFollowsByActorRequest) (*vyletdatabase.GetFollowsByActorResponse, error) { 160 logger := s.logger.With("name", "GetFollowsByActor", "did", req.Did) 161 162 if req.Limit <= 0 { 163 return nil, fmt.Errorf("limit must be greater than 0") 164 } 165 166 var ( 167 query string 168 args []any 169 ) 170 171 if req.Cursor != nil && *req.Cursor != "" { 172 cursorParts := strings.SplitN(*req.Cursor, "|", 2) 173 if len(cursorParts) != 2 { 174 logger.Error("invalid cursor format", "cursor", *req.Cursor) 175 return &vyletdatabase.GetFollowsByActorResponse{ 176 Error: helpers.ToStringPtr("invalid cursor format"), 177 }, nil 178 } 179 180 cursorTime, err := time.Parse(time.RFC3339Nano, cursorParts[0]) 181 if err != nil { 182 logger.Error("failed to parse cursor timestamp", "cursor", *req.Cursor, "err", err) 183 return &vyletdatabase.GetFollowsByActorResponse{ 184 Error: helpers.ToStringPtr("invalid cursor format"), 185 }, nil 186 } 187 cursorUri := cursorParts[1] 188 189 query = ` 190 SELECT uri, cid, subject_did, author_did, created_at, indexed_at 191 FROM follows_by_author_did 192 WHERE author_did = ? AND (created_at, uri) < (?, ?) 193 ORDER BY created_at DESC, uri ASC 194 LIMIT ? 195 ` 196 args = []any{req.Did, cursorTime, cursorUri, req.Limit + 1} 197 } else { 198 query = ` 199 SELECT uri, cid, subject_did, author_did, created_at, indexed_at 200 FROM follows_by_author_did 201 WHERE author_did = ? 202 ORDER BY created_at DESC, uri ASC 203 LIMIT ? 204 ` 205 args = []any{req.Did, req.Limit} 206 } 207 208 iter := s.cqlSession.Query(query, args...).WithContext(ctx).Iter() 209 defer iter.Close() 210 211 var follows []*vyletdatabase.Follow 212 213 var ( 214 createdAt time.Time 215 indexedAt time.Time 216 ) 217 for { 218 follow := &vyletdatabase.Follow{} 219 if !iter.Scan( 220 &follow.Uri, 221 &follow.Cid, 222 &follow.SubjectDid, 223 &follow.AuthorDid, 224 &createdAt, 225 &indexedAt, 226 ) { 227 break 228 } 229 follow.CreatedAt = timestamppb.New(createdAt) 230 follow.IndexedAt = timestamppb.New(indexedAt) 231 232 follows = append(follows, follow) 233 } 234 if err := iter.Close(); err != nil { 235 logger.Error("failed to iterate follows", "err", err) 236 return &vyletdatabase.GetFollowsByActorResponse{ 237 Error: helpers.ToStringPtr(err.Error()), 238 }, nil 239 } 240 241 var nextCursor *string 242 if len(follows) > int(req.Limit) { 243 follows = follows[:req.Limit] 244 last := follows[len(follows)-1] 245 cursorStr := fmt.Sprintf("%s|%s", 246 last.CreatedAt.AsTime().Format(time.RFC3339Nano), 247 last.Uri) 248 nextCursor = &cursorStr 249 } 250 251 return &vyletdatabase.GetFollowsByActorResponse{ 252 Follows: follows, 253 Cursor: nextCursor, 254 }, nil 255} 256 257func (s *Server) GetFollowersByActor(ctx context.Context, req *vyletdatabase.GetFollowersByActorRequest) (*vyletdatabase.GetFollowersByActorResponse, error) { 258 logger := s.logger.With("name", "GetFollowersByActor", "did", req.Did) 259 260 if req.Limit <= 0 { 261 return nil, fmt.Errorf("limit must be greater than 0") 262 } 263 264 var ( 265 query string 266 args []any 267 ) 268 269 if req.Cursor != nil && *req.Cursor != "" { 270 cursorParts := strings.SplitN(*req.Cursor, "|", 2) 271 if len(cursorParts) != 2 { 272 logger.Error("invalid cursor format", "cursor", *req.Cursor) 273 return &vyletdatabase.GetFollowersByActorResponse{ 274 Error: helpers.ToStringPtr("invalid cursor format"), 275 }, nil 276 } 277 278 cursorTime, err := time.Parse(time.RFC3339Nano, cursorParts[0]) 279 if err != nil { 280 logger.Error("failed to parse cursor timestamp", "cursor", *req.Cursor, "err", err) 281 return &vyletdatabase.GetFollowersByActorResponse{ 282 Error: helpers.ToStringPtr("invalid cursor format"), 283 }, nil 284 } 285 cursorUri := cursorParts[1] 286 287 query = ` 288 SELECT uri, cid, subject_did, author_did, created_at, indexed_at 289 FROM follows_by_subject_did 290 WHERE subject_did = ? AND (created_at, uri) < (?, ?) 291 ORDER BY created_at DESC, uri ASC 292 LIMIT ? 293 ` 294 args = []any{req.Did, cursorTime, cursorUri, req.Limit + 1} 295 } else { 296 query = ` 297 SELECT uri, cid, subject_did, author_did, created_at, indexed_at 298 FROM follows_by_subject_did 299 WHERE subject_did = ? 300 ORDER BY created_at DESC, uri ASC 301 LIMIT ? 302 ` 303 args = []any{req.Did, req.Limit} 304 } 305 306 iter := s.cqlSession.Query(query, args...).WithContext(ctx).Iter() 307 defer iter.Close() 308 309 var follows []*vyletdatabase.Follow 310 311 var ( 312 createdAt time.Time 313 indexedAt time.Time 314 ) 315 for { 316 follow := &vyletdatabase.Follow{} 317 if !iter.Scan( 318 &follow.Uri, 319 &follow.Cid, 320 &follow.SubjectDid, 321 &follow.AuthorDid, 322 &createdAt, 323 &indexedAt, 324 ) { 325 break 326 } 327 follow.CreatedAt = timestamppb.New(createdAt) 328 follow.IndexedAt = timestamppb.New(indexedAt) 329 330 follows = append(follows, follow) 331 } 332 if err := iter.Close(); err != nil { 333 logger.Error("failed to iterate follows", "err", err) 334 return &vyletdatabase.GetFollowersByActorResponse{ 335 Error: helpers.ToStringPtr(err.Error()), 336 }, nil 337 } 338 339 var nextCursor *string 340 if len(follows) > int(req.Limit) { 341 follows = follows[:req.Limit] 342 last := follows[len(follows)-1] 343 cursorStr := fmt.Sprintf("%s|%s", 344 last.CreatedAt.AsTime().Format(time.RFC3339Nano), 345 last.Uri) 346 nextCursor = &cursorStr 347 } 348 349 return &vyletdatabase.GetFollowersByActorResponse{ 350 Followers: follows, 351 Cursor: nextCursor, 352 }, nil 353} 354 355func (s *Server) GetFollowForAuthorSubject(ctx context.Context, req *vyletdatabase.GetFollowForAuthorSubjectRequest) (*vyletdatabase.GetFollowForAuthorSubjectResponse, error) { 356 logger := s.logger.With("name", "GetFollowForAuthorSubject", "authorDid", req.AuthorDid, "subjectDid", req.SubjectDid) 357 358 args := []any{req.AuthorDid, req.SubjectDid} 359 360 query := ` 361 SELECT uri, cid, subject_did, author_did, created_at, indexed_at 362 FROM follows_by_author_did_subject_did 363 WHERE author_did = ? AND subject_did = ? 364 ` 365 366 follow := &vyletdatabase.Follow{} 367 var ( 368 createdAt time.Time 369 indexedAt time.Time 370 ) 371 if err := s.cqlSession.Query(query, args...).WithContext(ctx).Scan( 372 &follow.Uri, 373 &follow.Cid, 374 &follow.SubjectDid, 375 &follow.AuthorDid, 376 &createdAt, 377 &indexedAt, 378 ); err != nil { 379 if errors.Is(err, gocql.ErrNotFound) { 380 return &vyletdatabase.GetFollowForAuthorSubjectResponse{}, nil 381 } 382 383 logger.Error("error finding follow", "err", err) 384 return &vyletdatabase.GetFollowForAuthorSubjectResponse{ 385 Error: helpers.ToStringPtr(err.Error()), 386 }, nil 387 } 388 follow.CreatedAt = timestamppb.New(createdAt) 389 follow.IndexedAt = timestamppb.New(indexedAt) 390 391 return &vyletdatabase.GetFollowForAuthorSubjectResponse{ 392 Follow: follow, 393 }, nil 394}