Write on the margins of the internet. Powered by the AT Protocol. margin.at
extension web atproto comments
at main 751 lines 19 kB view raw
1package sync 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "strings" 10 "time" 11 12 "margin.at/internal/crypto" 13 "margin.at/internal/db" 14 "margin.at/internal/logger" 15 "margin.at/internal/standardsite" 16 "margin.at/internal/verification" 17 "margin.at/internal/xrpc" 18) 19 20var CIDVerificationEnabled = true 21 22type Service struct { 23 db *db.DB 24} 25 26func NewService(database *db.DB) *Service { 27 return &Service{db: database} 28} 29 30func (s *Service) PerformSync(ctx context.Context, did string, getClient func(context.Context, string) (*xrpc.Client, error)) (map[string]string, error) { 31 collections := []string{ 32 xrpc.CollectionAnnotation, 33 xrpc.CollectionHighlight, 34 xrpc.CollectionBookmark, 35 xrpc.CollectionReply, 36 xrpc.CollectionLike, 37 xrpc.CollectionCollection, 38 xrpc.CollectionCollectionItem, 39 xrpc.CollectionAPIKey, 40 xrpc.CollectionPreferences, 41 xrpc.CollectionSembleCard, 42 xrpc.CollectionSembleCollection, 43 xrpc.CollectionSembleCollectionLink, 44 xrpc.CollectionDocument, 45 } 46 47 results := make(map[string]string) 48 49 client, err := getClient(ctx, did) 50 if err != nil { 51 return nil, err 52 } 53 54 for _, collectionNSID := range collections { 55 count := 0 56 cursor := "" 57 fetchedURIs := make(map[string]bool) 58 59 for { 60 url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100", client.PDS, did, collectionNSID) 61 if cursor != "" { 62 url += "&cursor=" + cursor 63 } 64 65 req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) 66 req.Header.Set("Authorization", "Bearer "+client.AccessToken) 67 68 resp, err := http.DefaultClient.Do(req) 69 if err != nil { 70 return nil, fmt.Errorf("failed to fetch %s: %w", collectionNSID, err) 71 } 72 defer resp.Body.Close() 73 74 if resp.StatusCode != 200 { 75 body, _ := io.ReadAll(resp.Body) 76 results[collectionNSID] = fmt.Sprintf("error: %s", string(body)) 77 break 78 } 79 80 var output struct { 81 Records []struct { 82 URI string `json:"uri"` 83 CID string `json:"cid"` 84 Value json.RawMessage `json:"value"` 85 } `json:"records"` 86 Cursor string `json:"cursor"` 87 } 88 89 if err := json.NewDecoder(resp.Body).Decode(&output); err != nil { 90 return nil, err 91 } 92 93 for _, rec := range output.Records { 94 if CIDVerificationEnabled && rec.CID != "" { 95 if err := crypto.VerifyRecordCID(rec.Value, rec.CID, rec.URI); err != nil { 96 logger.Error("CID verification failed for %s: %v (skipping)", rec.URI, err) 97 continue 98 } 99 } 100 101 err := s.upsertRecord(did, collectionNSID, rec.URI, rec.CID, rec.Value) 102 if err != nil { 103 fmt.Printf("Error upserting %s: %v\n", rec.URI, err) 104 } else { 105 count++ 106 fetchedURIs[rec.URI] = true 107 } 108 } 109 110 if output.Cursor == "" { 111 break 112 } 113 cursor = output.Cursor 114 } 115 116 deletedCount := 0 117 if results[collectionNSID] == "" { 118 var localURIs []string 119 var err error 120 121 switch collectionNSID { 122 case xrpc.CollectionAnnotation: 123 localURIs, err = s.db.GetAnnotationURIs(did) 124 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionAnnotation) 125 case xrpc.CollectionHighlight: 126 localURIs, err = s.db.GetHighlightURIs(did) 127 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionHighlight) 128 case xrpc.CollectionBookmark: 129 localURIs, err = s.db.GetBookmarkURIs(did) 130 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionBookmark) 131 case xrpc.CollectionCollection: 132 cols, e := s.db.GetCollectionsByAuthor(did) 133 if e == nil { 134 for _, c := range cols { 135 localURIs = append(localURIs, c.URI) 136 } 137 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionCollection) 138 } else { 139 err = e 140 } 141 case xrpc.CollectionCollectionItem: 142 items, e := s.db.GetCollectionItemsByAuthor(did) 143 if e == nil { 144 for _, item := range items { 145 localURIs = append(localURIs, item.URI) 146 } 147 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionCollectionItem) 148 } else { 149 err = e 150 } 151 case xrpc.CollectionReply: 152 replies, e := s.db.GetRepliesByAuthor(did) 153 if e == nil { 154 for _, r := range replies { 155 localURIs = append(localURIs, r.URI) 156 } 157 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionReply) 158 } else { 159 err = e 160 } 161 case xrpc.CollectionLike: 162 likes, e := s.db.GetLikesByAuthor(did) 163 if e == nil { 164 for _, l := range likes { 165 localURIs = append(localURIs, l.URI) 166 } 167 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionLike) 168 } else { 169 err = e 170 } 171 case xrpc.CollectionSembleCard: 172 annos, e1 := s.db.GetAnnotationURIs(did) 173 books, e2 := s.db.GetBookmarkURIs(did) 174 if e1 != nil { 175 err = e1 176 break 177 } 178 if e2 != nil { 179 err = e2 180 break 181 } 182 localURIs = append(localURIs, annos...) 183 localURIs = append(localURIs, books...) 184 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCard) 185 case xrpc.CollectionSembleCollection: 186 cols, e := s.db.GetCollectionsByAuthor(did) 187 if e == nil { 188 for _, c := range cols { 189 localURIs = append(localURIs, c.URI) 190 } 191 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCollection) 192 } else { 193 err = e 194 } 195 case xrpc.CollectionAPIKey: 196 localURIs, err = s.db.GetAPIKeyURIs(did) 197 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionAPIKey) 198 case xrpc.CollectionPreferences: 199 localURIs, err = s.db.GetPreferenceURIs(did) 200 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionPreferences) 201 case xrpc.CollectionSembleCollectionLink: 202 items, e := s.db.GetCollectionItemsByAuthor(did) 203 if e == nil { 204 for _, item := range items { 205 localURIs = append(localURIs, item.URI) 206 } 207 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCollectionLink) 208 } else { 209 err = e 210 } 211 } 212 213 if err == nil { 214 for _, uri := range localURIs { 215 if !fetchedURIs[uri] { 216 switch collectionNSID { 217 case xrpc.CollectionAnnotation: 218 _ = s.db.DeleteAnnotation(uri) 219 case xrpc.CollectionHighlight: 220 _ = s.db.DeleteHighlight(uri) 221 case xrpc.CollectionBookmark: 222 _ = s.db.DeleteBookmark(uri) 223 case xrpc.CollectionCollection: 224 _ = s.db.DeleteCollection(uri) 225 case xrpc.CollectionCollectionItem: 226 _ = s.db.RemoveFromCollection(uri) 227 case xrpc.CollectionReply: 228 _ = s.db.DeleteReply(uri) 229 case xrpc.CollectionLike: 230 _ = s.db.DeleteLike(uri) 231 case xrpc.CollectionSembleCard: 232 _ = s.db.DeleteAnnotation(uri) 233 _ = s.db.DeleteBookmark(uri) 234 case xrpc.CollectionSembleCollection: 235 _ = s.db.DeleteCollection(uri) 236 case xrpc.CollectionSembleCollectionLink: 237 _ = s.db.RemoveFromCollection(uri) 238 case xrpc.CollectionAPIKey: 239 _ = s.db.DeleteAPIKeyByURI(uri) 240 case xrpc.CollectionPreferences: 241 _ = s.db.DeletePreferences(uri) 242 } 243 deletedCount++ 244 } 245 } 246 } 247 } 248 249 if results[collectionNSID] == "" { 250 results[collectionNSID] = fmt.Sprintf("synced %d records, deleted %d stale", count, deletedCount) 251 } 252 } 253 return results, nil 254} 255 256func filterURIsByCollection(uris []string, collectionNSID string) []string { 257 if len(uris) == 0 || collectionNSID == "" { 258 return uris 259 } 260 needle := "/" + collectionNSID + "/" 261 out := make([]string, 0, len(uris)) 262 for _, u := range uris { 263 if strings.Contains(u, needle) { 264 out = append(out, u) 265 } 266 } 267 return out 268} 269 270func strPtr(s string) *string { 271 if s == "" { 272 return nil 273 } 274 return &s 275} 276 277func (s *Service) upsertRecord(did, collection, uri, cid string, value json.RawMessage) error { 278 cidPtr := strPtr(cid) 279 switch collection { 280 case xrpc.CollectionAnnotation: 281 var record xrpc.AnnotationRecord 282 if err := json.Unmarshal(value, &record); err != nil { 283 return err 284 } 285 286 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 287 288 targetSource := record.Target.Source 289 if targetSource == "" { 290 291 } 292 293 var targetHash string 294 if targetSource != "" { 295 targetHash = db.HashURL(targetSource) 296 } 297 298 motivation := record.Motivation 299 if motivation == "" { 300 motivation = "commenting" 301 } 302 303 var bodyValuePtr, bodyFormatPtr, bodyURIPtr, targetTitlePtr, selectorJSONPtr, tagsJSONPtr *string 304 if record.Body != nil { 305 if record.Body.Value != "" { 306 val := record.Body.Value 307 bodyValuePtr = &val 308 } 309 if record.Body.Format != "" { 310 fmt := record.Body.Format 311 bodyFormatPtr = &fmt 312 } 313 } 314 if record.Target.Title != "" { 315 t := record.Target.Title 316 targetTitlePtr = &t 317 } 318 if len(record.Target.Selector) > 0 { 319 selectorStr := string(record.Target.Selector) 320 selectorJSONPtr = &selectorStr 321 } 322 if len(record.Tags) > 0 { 323 tagsBytes, _ := json.Marshal(record.Tags) 324 tagsStr := string(tagsBytes) 325 tagsJSONPtr = &tagsStr 326 } 327 328 return s.db.CreateAnnotation(&db.Annotation{ 329 URI: uri, 330 AuthorDID: did, 331 Motivation: motivation, 332 BodyValue: bodyValuePtr, 333 BodyFormat: bodyFormatPtr, 334 BodyURI: bodyURIPtr, 335 TargetSource: targetSource, 336 TargetHash: targetHash, 337 TargetTitle: targetTitlePtr, 338 SelectorJSON: selectorJSONPtr, 339 TagsJSON: tagsJSONPtr, 340 CreatedAt: createdAt, 341 IndexedAt: time.Now(), 342 CID: cidPtr, 343 }) 344 345 case xrpc.CollectionHighlight: 346 var record xrpc.HighlightRecord 347 if err := json.Unmarshal(value, &record); err != nil { 348 return err 349 } 350 351 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 352 if createdAt.IsZero() { 353 createdAt = time.Now() 354 } 355 356 var targetHash string 357 if record.Target.Source != "" { 358 targetHash = db.HashURL(record.Target.Source) 359 } 360 361 var titlePtr, selectorJSONPtr, colorPtr, tagsJSONPtr *string 362 if record.Target.Title != "" { 363 t := record.Target.Title 364 titlePtr = &t 365 } 366 if len(record.Target.Selector) > 0 { 367 selectorStr := string(record.Target.Selector) 368 selectorJSONPtr = &selectorStr 369 } 370 if record.Color != "" { 371 c := record.Color 372 colorPtr = &c 373 } 374 if len(record.Tags) > 0 { 375 tagsBytes, _ := json.Marshal(record.Tags) 376 tagsStr := string(tagsBytes) 377 tagsJSONPtr = &tagsStr 378 } 379 380 return s.db.CreateHighlight(&db.Highlight{ 381 URI: uri, 382 AuthorDID: did, 383 TargetSource: record.Target.Source, 384 TargetHash: targetHash, 385 TargetTitle: titlePtr, 386 SelectorJSON: selectorJSONPtr, 387 Color: colorPtr, 388 TagsJSON: tagsJSONPtr, 389 CreatedAt: createdAt, 390 IndexedAt: time.Now(), 391 CID: cidPtr, 392 }) 393 394 case xrpc.CollectionBookmark: 395 var record xrpc.BookmarkRecord 396 if err := json.Unmarshal(value, &record); err != nil { 397 return err 398 } 399 400 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 401 402 var sourceHash string 403 if record.Source != "" { 404 sourceHash = db.HashURL(record.Source) 405 } 406 407 var titlePtr, descPtr, tagsJSONPtr *string 408 if record.Title != "" { 409 t := record.Title 410 titlePtr = &t 411 } 412 if record.Description != "" { 413 d := record.Description 414 descPtr = &d 415 } 416 if len(record.Tags) > 0 { 417 tagsBytes, _ := json.Marshal(record.Tags) 418 tagsStr := string(tagsBytes) 419 tagsJSONPtr = &tagsStr 420 } 421 422 return s.db.CreateBookmark(&db.Bookmark{ 423 URI: uri, 424 AuthorDID: did, 425 Source: record.Source, 426 SourceHash: sourceHash, 427 Title: titlePtr, 428 Description: descPtr, 429 TagsJSON: tagsJSONPtr, 430 CreatedAt: createdAt, 431 IndexedAt: time.Now(), 432 CID: cidPtr, 433 }) 434 435 case xrpc.CollectionCollection: 436 var record xrpc.CollectionRecord 437 if err := json.Unmarshal(value, &record); err != nil { 438 return err 439 } 440 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 441 442 var descPtr, iconPtr *string 443 if record.Description != "" { 444 d := record.Description 445 descPtr = &d 446 } 447 if record.Icon != "" { 448 i := record.Icon 449 iconPtr = &i 450 } 451 452 return s.db.CreateCollection(&db.Collection{ 453 URI: uri, 454 AuthorDID: did, 455 Name: record.Name, 456 Description: descPtr, 457 Icon: iconPtr, 458 CreatedAt: createdAt, 459 IndexedAt: time.Now(), 460 }) 461 462 case xrpc.CollectionCollectionItem: 463 var record xrpc.CollectionItemRecord 464 if err := json.Unmarshal(value, &record); err != nil { 465 return err 466 } 467 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 468 469 return s.db.AddToCollection(&db.CollectionItem{ 470 URI: uri, 471 AuthorDID: did, 472 CollectionURI: record.Collection, 473 AnnotationURI: record.Annotation, 474 Position: record.Position, 475 CreatedAt: createdAt, 476 IndexedAt: time.Now(), 477 }) 478 479 case xrpc.CollectionReply: 480 var record xrpc.ReplyRecord 481 if err := json.Unmarshal(value, &record); err != nil { 482 return err 483 } 484 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 485 486 var formatPtr *string 487 if record.Format != "" { 488 f := record.Format 489 formatPtr = &f 490 } 491 492 return s.db.CreateReply(&db.Reply{ 493 URI: uri, 494 AuthorDID: did, 495 ParentURI: record.Parent.URI, 496 RootURI: record.Root.URI, 497 Text: record.Text, 498 Format: formatPtr, 499 CreatedAt: createdAt, 500 IndexedAt: time.Now(), 501 CID: cidPtr, 502 }) 503 504 case xrpc.CollectionLike: 505 var record xrpc.LikeRecord 506 if err := json.Unmarshal(value, &record); err != nil { 507 return err 508 } 509 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 510 511 return s.db.CreateLike(&db.Like{ 512 URI: uri, 513 AuthorDID: did, 514 SubjectURI: record.Subject.URI, 515 CreatedAt: createdAt, 516 IndexedAt: time.Now(), 517 }) 518 519 case xrpc.CollectionSembleCard: 520 var card xrpc.SembleCard 521 if err := json.Unmarshal(value, &card); err != nil { 522 return err 523 } 524 525 createdAt := card.GetCreatedAtTime() 526 527 content, err := card.ParseContent() 528 if err != nil { 529 return nil 530 } 531 532 switch card.Type { 533 case "NOTE": 534 note, ok := content.(*xrpc.SembleNoteContent) 535 if !ok { 536 return nil 537 } 538 539 targetSource := card.URL 540 if targetSource == "" { 541 return nil 542 } 543 544 targetHash := db.HashURL(targetSource) 545 motivation := "commenting" 546 bodyValue := note.Text 547 548 return s.db.CreateAnnotation(&db.Annotation{ 549 URI: uri, 550 AuthorDID: did, 551 Motivation: motivation, 552 BodyValue: &bodyValue, 553 TargetSource: targetSource, 554 TargetHash: targetHash, 555 CreatedAt: createdAt, 556 IndexedAt: time.Now(), 557 CID: cidPtr, 558 }) 559 560 case "URL": 561 urlContent, ok := content.(*xrpc.SembleURLContent) 562 if !ok { 563 return nil 564 } 565 566 source := urlContent.URL 567 if source == "" { 568 return nil 569 } 570 sourceHash := db.HashURL(source) 571 572 var titlePtr *string 573 if urlContent.Metadata != nil && urlContent.Metadata.Title != "" { 574 t := urlContent.Metadata.Title 575 titlePtr = &t 576 } 577 578 return s.db.CreateBookmark(&db.Bookmark{ 579 URI: uri, 580 AuthorDID: did, 581 Source: source, 582 SourceHash: sourceHash, 583 Title: titlePtr, 584 CreatedAt: createdAt, 585 IndexedAt: time.Now(), 586 CID: cidPtr, 587 }) 588 } 589 590 case xrpc.CollectionSembleCollection: 591 var record xrpc.SembleCollection 592 if err := json.Unmarshal(value, &record); err != nil { 593 return err 594 } 595 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 596 597 var descPtr, iconPtr *string 598 if record.Description != "" { 599 d := record.Description 600 descPtr = &d 601 } 602 icon := "icon:semble" 603 iconPtr = &icon 604 605 return s.db.CreateCollection(&db.Collection{ 606 URI: uri, 607 AuthorDID: did, 608 Name: record.Name, 609 Description: descPtr, 610 Icon: iconPtr, 611 CreatedAt: createdAt, 612 IndexedAt: time.Now(), 613 }) 614 615 case xrpc.CollectionSembleCollectionLink: 616 var record xrpc.SembleCollectionLink 617 if err := json.Unmarshal(value, &record); err != nil { 618 return err 619 } 620 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 621 622 return s.db.AddToCollection(&db.CollectionItem{ 623 URI: uri, 624 AuthorDID: did, 625 CollectionURI: record.Collection.URI, 626 AnnotationURI: record.Card.URI, 627 Position: 0, 628 CreatedAt: createdAt, 629 IndexedAt: time.Now(), 630 }) 631 632 case xrpc.CollectionAPIKey: 633 var record xrpc.APIKeyRecord 634 if err := json.Unmarshal(value, &record); err != nil { 635 return err 636 } 637 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 638 639 parts := strings.Split(uri, "/") 640 rkey := parts[len(parts)-1] 641 642 return s.db.CreateAPIKey(&db.APIKey{ 643 ID: rkey, 644 OwnerDID: did, 645 Name: record.Name, 646 KeyHash: record.KeyHash, 647 CreatedAt: createdAt, 648 URI: uri, 649 CID: cidPtr, 650 IndexedAt: time.Now(), 651 }) 652 653 case xrpc.CollectionDocument: 654 var record struct { 655 Site string `json:"site"` 656 Path string `json:"path"` 657 Title string `json:"title"` 658 Description string `json:"description"` 659 TextContent string `json:"textContent"` 660 Tags []string `json:"tags"` 661 PublishedAt string `json:"publishedAt"` 662 CanonicalURL string `json:"canonicalUrl"` 663 } 664 if err := json.Unmarshal(value, &record); err != nil { 665 return err 666 } 667 if record.Title == "" || record.Site == "" { 668 return nil 669 } 670 publishedAt, err := time.Parse(time.RFC3339, record.PublishedAt) 671 if err != nil { 672 publishedAt = time.Now() 673 } 674 canonicalURL := standardsite.ResolveCanonicalURL(record.Site, record.Path, record.CanonicalURL) 675 if canonicalURL == "" { 676 return nil 677 } 678 var pathPtr, descPtr, textPtr, tagsJSONPtr *string 679 if record.Path != "" { 680 pathPtr = &record.Path 681 } 682 if record.Description != "" { 683 descPtr = &record.Description 684 } 685 if record.TextContent != "" { 686 textPtr = &record.TextContent 687 } 688 if len(record.Tags) > 0 { 689 tagsBytes, _ := json.Marshal(record.Tags) 690 tagsStr := string(tagsBytes) 691 tagsJSONPtr = &tagsStr 692 } 693 if err := verification.VerifyDocument(canonicalURL, uri); err != nil { 694 return nil 695 } 696 return s.db.UpsertDocument(&db.Document{ 697 URI: uri, 698 AuthorDID: did, 699 Site: record.Site, 700 Path: pathPtr, 701 Title: record.Title, 702 Description: descPtr, 703 TextContent: textPtr, 704 TagsJSON: tagsJSONPtr, 705 CanonicalURL: canonicalURL, 706 PublishedAt: publishedAt, 707 IndexedAt: time.Now(), 708 }) 709 710 case xrpc.CollectionPreferences: 711 var record xrpc.PreferencesRecord 712 if err := json.Unmarshal(value, &record); err != nil { 713 return err 714 } 715 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 716 717 var skippedHostnamesPtr *string 718 if len(record.ExternalLinkSkippedHostnames) > 0 { 719 hostnamesBytes, _ := json.Marshal(record.ExternalLinkSkippedHostnames) 720 hostnamesStr := string(hostnamesBytes) 721 skippedHostnamesPtr = &hostnamesStr 722 } 723 724 var subscribedLabelersPtr *string 725 if len(record.SubscribedLabelers) > 0 { 726 labelersBytes, _ := json.Marshal(record.SubscribedLabelers) 727 s := string(labelersBytes) 728 subscribedLabelersPtr = &s 729 } 730 731 var labelPrefsPtr *string 732 if len(record.LabelPreferences) > 0 { 733 prefsBytes, _ := json.Marshal(record.LabelPreferences) 734 s := string(prefsBytes) 735 labelPrefsPtr = &s 736 } 737 738 return s.db.UpsertPreferences(&db.Preferences{ 739 URI: uri, 740 AuthorDID: did, 741 ExternalLinkSkippedHostnames: skippedHostnamesPtr, 742 SubscribedLabelers: subscribedLabelersPtr, 743 LabelPreferences: labelPrefsPtr, 744 DisableExternalLinkWarning: record.DisableExternalLinkWarning, 745 CreatedAt: createdAt, 746 IndexedAt: time.Now(), 747 CID: cidPtr, 748 }) 749 } 750 return nil 751}