Write on the margins of the internet. Powered by the AT Protocol.
margin.at
extension
web
atproto
comments
1package sync
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "strings"
10 "time"
11
12 "margin.at/internal/crypto"
13 "margin.at/internal/db"
14 "margin.at/internal/logger"
15 "margin.at/internal/standardsite"
16 "margin.at/internal/verification"
17 "margin.at/internal/xrpc"
18)
19
// CIDVerificationEnabled controls whether PerformSync checks each fetched
// record against its advertised CID (via crypto.VerifyRecordCID) before
// storing it. Records that arrive without a CID are never checked.
var CIDVerificationEnabled = true
21
22type Service struct {
23 db *db.DB
24}
25
26func NewService(database *db.DB) *Service {
27 return &Service{db: database}
28}
29
30func (s *Service) PerformSync(ctx context.Context, did string, getClient func(context.Context, string) (*xrpc.Client, error)) (map[string]string, error) {
31 collections := []string{
32 xrpc.CollectionAnnotation,
33 xrpc.CollectionHighlight,
34 xrpc.CollectionBookmark,
35 xrpc.CollectionReply,
36 xrpc.CollectionLike,
37 xrpc.CollectionCollection,
38 xrpc.CollectionCollectionItem,
39 xrpc.CollectionAPIKey,
40 xrpc.CollectionPreferences,
41 xrpc.CollectionSembleCard,
42 xrpc.CollectionSembleCollection,
43 xrpc.CollectionSembleCollectionLink,
44 xrpc.CollectionDocument,
45 }
46
47 results := make(map[string]string)
48
49 client, err := getClient(ctx, did)
50 if err != nil {
51 return nil, err
52 }
53
54 for _, collectionNSID := range collections {
55 count := 0
56 cursor := ""
57 fetchedURIs := make(map[string]bool)
58
59 for {
60 url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100", client.PDS, did, collectionNSID)
61 if cursor != "" {
62 url += "&cursor=" + cursor
63 }
64
65 req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
66 req.Header.Set("Authorization", "Bearer "+client.AccessToken)
67
68 resp, err := http.DefaultClient.Do(req)
69 if err != nil {
70 return nil, fmt.Errorf("failed to fetch %s: %w", collectionNSID, err)
71 }
72 defer resp.Body.Close()
73
74 if resp.StatusCode != 200 {
75 body, _ := io.ReadAll(resp.Body)
76 results[collectionNSID] = fmt.Sprintf("error: %s", string(body))
77 break
78 }
79
80 var output struct {
81 Records []struct {
82 URI string `json:"uri"`
83 CID string `json:"cid"`
84 Value json.RawMessage `json:"value"`
85 } `json:"records"`
86 Cursor string `json:"cursor"`
87 }
88
89 if err := json.NewDecoder(resp.Body).Decode(&output); err != nil {
90 return nil, err
91 }
92
93 for _, rec := range output.Records {
94 if CIDVerificationEnabled && rec.CID != "" {
95 if err := crypto.VerifyRecordCID(rec.Value, rec.CID, rec.URI); err != nil {
96 logger.Error("CID verification failed for %s: %v (skipping)", rec.URI, err)
97 continue
98 }
99 }
100
101 err := s.upsertRecord(did, collectionNSID, rec.URI, rec.CID, rec.Value)
102 if err != nil {
103 fmt.Printf("Error upserting %s: %v\n", rec.URI, err)
104 } else {
105 count++
106 fetchedURIs[rec.URI] = true
107 }
108 }
109
110 if output.Cursor == "" {
111 break
112 }
113 cursor = output.Cursor
114 }
115
116 deletedCount := 0
117 if results[collectionNSID] == "" {
118 var localURIs []string
119 var err error
120
121 switch collectionNSID {
122 case xrpc.CollectionAnnotation:
123 localURIs, err = s.db.GetAnnotationURIs(did)
124 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionAnnotation)
125 case xrpc.CollectionHighlight:
126 localURIs, err = s.db.GetHighlightURIs(did)
127 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionHighlight)
128 case xrpc.CollectionBookmark:
129 localURIs, err = s.db.GetBookmarkURIs(did)
130 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionBookmark)
131 case xrpc.CollectionCollection:
132 cols, e := s.db.GetCollectionsByAuthor(did)
133 if e == nil {
134 for _, c := range cols {
135 localURIs = append(localURIs, c.URI)
136 }
137 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionCollection)
138 } else {
139 err = e
140 }
141 case xrpc.CollectionCollectionItem:
142 items, e := s.db.GetCollectionItemsByAuthor(did)
143 if e == nil {
144 for _, item := range items {
145 localURIs = append(localURIs, item.URI)
146 }
147 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionCollectionItem)
148 } else {
149 err = e
150 }
151 case xrpc.CollectionReply:
152 replies, e := s.db.GetRepliesByAuthor(did)
153 if e == nil {
154 for _, r := range replies {
155 localURIs = append(localURIs, r.URI)
156 }
157 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionReply)
158 } else {
159 err = e
160 }
161 case xrpc.CollectionLike:
162 likes, e := s.db.GetLikesByAuthor(did)
163 if e == nil {
164 for _, l := range likes {
165 localURIs = append(localURIs, l.URI)
166 }
167 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionLike)
168 } else {
169 err = e
170 }
171 case xrpc.CollectionSembleCard:
172 annos, e1 := s.db.GetAnnotationURIs(did)
173 books, e2 := s.db.GetBookmarkURIs(did)
174 if e1 != nil {
175 err = e1
176 break
177 }
178 if e2 != nil {
179 err = e2
180 break
181 }
182 localURIs = append(localURIs, annos...)
183 localURIs = append(localURIs, books...)
184 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCard)
185 case xrpc.CollectionSembleCollection:
186 cols, e := s.db.GetCollectionsByAuthor(did)
187 if e == nil {
188 for _, c := range cols {
189 localURIs = append(localURIs, c.URI)
190 }
191 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCollection)
192 } else {
193 err = e
194 }
195 case xrpc.CollectionAPIKey:
196 localURIs, err = s.db.GetAPIKeyURIs(did)
197 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionAPIKey)
198 case xrpc.CollectionPreferences:
199 localURIs, err = s.db.GetPreferenceURIs(did)
200 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionPreferences)
201 case xrpc.CollectionSembleCollectionLink:
202 items, e := s.db.GetCollectionItemsByAuthor(did)
203 if e == nil {
204 for _, item := range items {
205 localURIs = append(localURIs, item.URI)
206 }
207 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCollectionLink)
208 } else {
209 err = e
210 }
211 }
212
213 if err == nil {
214 for _, uri := range localURIs {
215 if !fetchedURIs[uri] {
216 switch collectionNSID {
217 case xrpc.CollectionAnnotation:
218 _ = s.db.DeleteAnnotation(uri)
219 case xrpc.CollectionHighlight:
220 _ = s.db.DeleteHighlight(uri)
221 case xrpc.CollectionBookmark:
222 _ = s.db.DeleteBookmark(uri)
223 case xrpc.CollectionCollection:
224 _ = s.db.DeleteCollection(uri)
225 case xrpc.CollectionCollectionItem:
226 _ = s.db.RemoveFromCollection(uri)
227 case xrpc.CollectionReply:
228 _ = s.db.DeleteReply(uri)
229 case xrpc.CollectionLike:
230 _ = s.db.DeleteLike(uri)
231 case xrpc.CollectionSembleCard:
232 _ = s.db.DeleteAnnotation(uri)
233 _ = s.db.DeleteBookmark(uri)
234 case xrpc.CollectionSembleCollection:
235 _ = s.db.DeleteCollection(uri)
236 case xrpc.CollectionSembleCollectionLink:
237 _ = s.db.RemoveFromCollection(uri)
238 case xrpc.CollectionAPIKey:
239 _ = s.db.DeleteAPIKeyByURI(uri)
240 case xrpc.CollectionPreferences:
241 _ = s.db.DeletePreferences(uri)
242 }
243 deletedCount++
244 }
245 }
246 }
247 }
248
249 if results[collectionNSID] == "" {
250 results[collectionNSID] = fmt.Sprintf("synced %d records, deleted %d stale", count, deletedCount)
251 }
252 }
253 return results, nil
254}
255
// filterURIsByCollection returns the entries of uris that contain the path
// segment "/<collectionNSID>/". When uris is empty or collectionNSID is the
// empty string, uris is returned unchanged; otherwise the result is a newly
// allocated slice (empty, not nil, when nothing matches).
func filterURIsByCollection(uris []string, collectionNSID string) []string {
	if len(uris) == 0 || collectionNSID == "" {
		return uris
	}

	needle := "/" + collectionNSID + "/"
	matched := make([]string, 0, len(uris))
	for _, uri := range uris {
		if strings.Contains(uri, needle) {
			matched = append(matched, uri)
		}
	}
	return matched
}
269
// strPtr returns a pointer to a copy of s, or nil when s is empty. It is used
// to map optional string fields onto nullable columns.
func strPtr(s string) *string {
	if s == "" {
		return nil
	}
	return &s
}
276
277func (s *Service) upsertRecord(did, collection, uri, cid string, value json.RawMessage) error {
278 cidPtr := strPtr(cid)
279 switch collection {
280 case xrpc.CollectionAnnotation:
281 var record xrpc.AnnotationRecord
282 if err := json.Unmarshal(value, &record); err != nil {
283 return err
284 }
285
286 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
287
288 targetSource := record.Target.Source
289 if targetSource == "" {
290
291 }
292
293 var targetHash string
294 if targetSource != "" {
295 targetHash = db.HashURL(targetSource)
296 }
297
298 motivation := record.Motivation
299 if motivation == "" {
300 motivation = "commenting"
301 }
302
303 var bodyValuePtr, bodyFormatPtr, bodyURIPtr, targetTitlePtr, selectorJSONPtr, tagsJSONPtr *string
304 if record.Body != nil {
305 if record.Body.Value != "" {
306 val := record.Body.Value
307 bodyValuePtr = &val
308 }
309 if record.Body.Format != "" {
310 fmt := record.Body.Format
311 bodyFormatPtr = &fmt
312 }
313 }
314 if record.Target.Title != "" {
315 t := record.Target.Title
316 targetTitlePtr = &t
317 }
318 if len(record.Target.Selector) > 0 {
319 selectorStr := string(record.Target.Selector)
320 selectorJSONPtr = &selectorStr
321 }
322 if len(record.Tags) > 0 {
323 tagsBytes, _ := json.Marshal(record.Tags)
324 tagsStr := string(tagsBytes)
325 tagsJSONPtr = &tagsStr
326 }
327
328 return s.db.CreateAnnotation(&db.Annotation{
329 URI: uri,
330 AuthorDID: did,
331 Motivation: motivation,
332 BodyValue: bodyValuePtr,
333 BodyFormat: bodyFormatPtr,
334 BodyURI: bodyURIPtr,
335 TargetSource: targetSource,
336 TargetHash: targetHash,
337 TargetTitle: targetTitlePtr,
338 SelectorJSON: selectorJSONPtr,
339 TagsJSON: tagsJSONPtr,
340 CreatedAt: createdAt,
341 IndexedAt: time.Now(),
342 CID: cidPtr,
343 })
344
345 case xrpc.CollectionHighlight:
346 var record xrpc.HighlightRecord
347 if err := json.Unmarshal(value, &record); err != nil {
348 return err
349 }
350
351 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
352 if createdAt.IsZero() {
353 createdAt = time.Now()
354 }
355
356 var targetHash string
357 if record.Target.Source != "" {
358 targetHash = db.HashURL(record.Target.Source)
359 }
360
361 var titlePtr, selectorJSONPtr, colorPtr, tagsJSONPtr *string
362 if record.Target.Title != "" {
363 t := record.Target.Title
364 titlePtr = &t
365 }
366 if len(record.Target.Selector) > 0 {
367 selectorStr := string(record.Target.Selector)
368 selectorJSONPtr = &selectorStr
369 }
370 if record.Color != "" {
371 c := record.Color
372 colorPtr = &c
373 }
374 if len(record.Tags) > 0 {
375 tagsBytes, _ := json.Marshal(record.Tags)
376 tagsStr := string(tagsBytes)
377 tagsJSONPtr = &tagsStr
378 }
379
380 return s.db.CreateHighlight(&db.Highlight{
381 URI: uri,
382 AuthorDID: did,
383 TargetSource: record.Target.Source,
384 TargetHash: targetHash,
385 TargetTitle: titlePtr,
386 SelectorJSON: selectorJSONPtr,
387 Color: colorPtr,
388 TagsJSON: tagsJSONPtr,
389 CreatedAt: createdAt,
390 IndexedAt: time.Now(),
391 CID: cidPtr,
392 })
393
394 case xrpc.CollectionBookmark:
395 var record xrpc.BookmarkRecord
396 if err := json.Unmarshal(value, &record); err != nil {
397 return err
398 }
399
400 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
401
402 var sourceHash string
403 if record.Source != "" {
404 sourceHash = db.HashURL(record.Source)
405 }
406
407 var titlePtr, descPtr, tagsJSONPtr *string
408 if record.Title != "" {
409 t := record.Title
410 titlePtr = &t
411 }
412 if record.Description != "" {
413 d := record.Description
414 descPtr = &d
415 }
416 if len(record.Tags) > 0 {
417 tagsBytes, _ := json.Marshal(record.Tags)
418 tagsStr := string(tagsBytes)
419 tagsJSONPtr = &tagsStr
420 }
421
422 return s.db.CreateBookmark(&db.Bookmark{
423 URI: uri,
424 AuthorDID: did,
425 Source: record.Source,
426 SourceHash: sourceHash,
427 Title: titlePtr,
428 Description: descPtr,
429 TagsJSON: tagsJSONPtr,
430 CreatedAt: createdAt,
431 IndexedAt: time.Now(),
432 CID: cidPtr,
433 })
434
435 case xrpc.CollectionCollection:
436 var record xrpc.CollectionRecord
437 if err := json.Unmarshal(value, &record); err != nil {
438 return err
439 }
440 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
441
442 var descPtr, iconPtr *string
443 if record.Description != "" {
444 d := record.Description
445 descPtr = &d
446 }
447 if record.Icon != "" {
448 i := record.Icon
449 iconPtr = &i
450 }
451
452 return s.db.CreateCollection(&db.Collection{
453 URI: uri,
454 AuthorDID: did,
455 Name: record.Name,
456 Description: descPtr,
457 Icon: iconPtr,
458 CreatedAt: createdAt,
459 IndexedAt: time.Now(),
460 })
461
462 case xrpc.CollectionCollectionItem:
463 var record xrpc.CollectionItemRecord
464 if err := json.Unmarshal(value, &record); err != nil {
465 return err
466 }
467 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
468
469 return s.db.AddToCollection(&db.CollectionItem{
470 URI: uri,
471 AuthorDID: did,
472 CollectionURI: record.Collection,
473 AnnotationURI: record.Annotation,
474 Position: record.Position,
475 CreatedAt: createdAt,
476 IndexedAt: time.Now(),
477 })
478
479 case xrpc.CollectionReply:
480 var record xrpc.ReplyRecord
481 if err := json.Unmarshal(value, &record); err != nil {
482 return err
483 }
484 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
485
486 var formatPtr *string
487 if record.Format != "" {
488 f := record.Format
489 formatPtr = &f
490 }
491
492 return s.db.CreateReply(&db.Reply{
493 URI: uri,
494 AuthorDID: did,
495 ParentURI: record.Parent.URI,
496 RootURI: record.Root.URI,
497 Text: record.Text,
498 Format: formatPtr,
499 CreatedAt: createdAt,
500 IndexedAt: time.Now(),
501 CID: cidPtr,
502 })
503
504 case xrpc.CollectionLike:
505 var record xrpc.LikeRecord
506 if err := json.Unmarshal(value, &record); err != nil {
507 return err
508 }
509 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
510
511 return s.db.CreateLike(&db.Like{
512 URI: uri,
513 AuthorDID: did,
514 SubjectURI: record.Subject.URI,
515 CreatedAt: createdAt,
516 IndexedAt: time.Now(),
517 })
518
519 case xrpc.CollectionSembleCard:
520 var card xrpc.SembleCard
521 if err := json.Unmarshal(value, &card); err != nil {
522 return err
523 }
524
525 createdAt := card.GetCreatedAtTime()
526
527 content, err := card.ParseContent()
528 if err != nil {
529 return nil
530 }
531
532 switch card.Type {
533 case "NOTE":
534 note, ok := content.(*xrpc.SembleNoteContent)
535 if !ok {
536 return nil
537 }
538
539 targetSource := card.URL
540 if targetSource == "" {
541 return nil
542 }
543
544 targetHash := db.HashURL(targetSource)
545 motivation := "commenting"
546 bodyValue := note.Text
547
548 return s.db.CreateAnnotation(&db.Annotation{
549 URI: uri,
550 AuthorDID: did,
551 Motivation: motivation,
552 BodyValue: &bodyValue,
553 TargetSource: targetSource,
554 TargetHash: targetHash,
555 CreatedAt: createdAt,
556 IndexedAt: time.Now(),
557 CID: cidPtr,
558 })
559
560 case "URL":
561 urlContent, ok := content.(*xrpc.SembleURLContent)
562 if !ok {
563 return nil
564 }
565
566 source := urlContent.URL
567 if source == "" {
568 return nil
569 }
570 sourceHash := db.HashURL(source)
571
572 var titlePtr *string
573 if urlContent.Metadata != nil && urlContent.Metadata.Title != "" {
574 t := urlContent.Metadata.Title
575 titlePtr = &t
576 }
577
578 return s.db.CreateBookmark(&db.Bookmark{
579 URI: uri,
580 AuthorDID: did,
581 Source: source,
582 SourceHash: sourceHash,
583 Title: titlePtr,
584 CreatedAt: createdAt,
585 IndexedAt: time.Now(),
586 CID: cidPtr,
587 })
588 }
589
590 case xrpc.CollectionSembleCollection:
591 var record xrpc.SembleCollection
592 if err := json.Unmarshal(value, &record); err != nil {
593 return err
594 }
595 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
596
597 var descPtr, iconPtr *string
598 if record.Description != "" {
599 d := record.Description
600 descPtr = &d
601 }
602 icon := "icon:semble"
603 iconPtr = &icon
604
605 return s.db.CreateCollection(&db.Collection{
606 URI: uri,
607 AuthorDID: did,
608 Name: record.Name,
609 Description: descPtr,
610 Icon: iconPtr,
611 CreatedAt: createdAt,
612 IndexedAt: time.Now(),
613 })
614
615 case xrpc.CollectionSembleCollectionLink:
616 var record xrpc.SembleCollectionLink
617 if err := json.Unmarshal(value, &record); err != nil {
618 return err
619 }
620 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
621
622 return s.db.AddToCollection(&db.CollectionItem{
623 URI: uri,
624 AuthorDID: did,
625 CollectionURI: record.Collection.URI,
626 AnnotationURI: record.Card.URI,
627 Position: 0,
628 CreatedAt: createdAt,
629 IndexedAt: time.Now(),
630 })
631
632 case xrpc.CollectionAPIKey:
633 var record xrpc.APIKeyRecord
634 if err := json.Unmarshal(value, &record); err != nil {
635 return err
636 }
637 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
638
639 parts := strings.Split(uri, "/")
640 rkey := parts[len(parts)-1]
641
642 return s.db.CreateAPIKey(&db.APIKey{
643 ID: rkey,
644 OwnerDID: did,
645 Name: record.Name,
646 KeyHash: record.KeyHash,
647 CreatedAt: createdAt,
648 URI: uri,
649 CID: cidPtr,
650 IndexedAt: time.Now(),
651 })
652
653 case xrpc.CollectionDocument:
654 var record struct {
655 Site string `json:"site"`
656 Path string `json:"path"`
657 Title string `json:"title"`
658 Description string `json:"description"`
659 TextContent string `json:"textContent"`
660 Tags []string `json:"tags"`
661 PublishedAt string `json:"publishedAt"`
662 CanonicalURL string `json:"canonicalUrl"`
663 }
664 if err := json.Unmarshal(value, &record); err != nil {
665 return err
666 }
667 if record.Title == "" || record.Site == "" {
668 return nil
669 }
670 publishedAt, err := time.Parse(time.RFC3339, record.PublishedAt)
671 if err != nil {
672 publishedAt = time.Now()
673 }
674 canonicalURL := standardsite.ResolveCanonicalURL(record.Site, record.Path, record.CanonicalURL)
675 if canonicalURL == "" {
676 return nil
677 }
678 var pathPtr, descPtr, textPtr, tagsJSONPtr *string
679 if record.Path != "" {
680 pathPtr = &record.Path
681 }
682 if record.Description != "" {
683 descPtr = &record.Description
684 }
685 if record.TextContent != "" {
686 textPtr = &record.TextContent
687 }
688 if len(record.Tags) > 0 {
689 tagsBytes, _ := json.Marshal(record.Tags)
690 tagsStr := string(tagsBytes)
691 tagsJSONPtr = &tagsStr
692 }
693 if err := verification.VerifyDocument(canonicalURL, uri); err != nil {
694 return nil
695 }
696 return s.db.UpsertDocument(&db.Document{
697 URI: uri,
698 AuthorDID: did,
699 Site: record.Site,
700 Path: pathPtr,
701 Title: record.Title,
702 Description: descPtr,
703 TextContent: textPtr,
704 TagsJSON: tagsJSONPtr,
705 CanonicalURL: canonicalURL,
706 PublishedAt: publishedAt,
707 IndexedAt: time.Now(),
708 })
709
710 case xrpc.CollectionPreferences:
711 var record xrpc.PreferencesRecord
712 if err := json.Unmarshal(value, &record); err != nil {
713 return err
714 }
715 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
716
717 var skippedHostnamesPtr *string
718 if len(record.ExternalLinkSkippedHostnames) > 0 {
719 hostnamesBytes, _ := json.Marshal(record.ExternalLinkSkippedHostnames)
720 hostnamesStr := string(hostnamesBytes)
721 skippedHostnamesPtr = &hostnamesStr
722 }
723
724 var subscribedLabelersPtr *string
725 if len(record.SubscribedLabelers) > 0 {
726 labelersBytes, _ := json.Marshal(record.SubscribedLabelers)
727 s := string(labelersBytes)
728 subscribedLabelersPtr = &s
729 }
730
731 var labelPrefsPtr *string
732 if len(record.LabelPreferences) > 0 {
733 prefsBytes, _ := json.Marshal(record.LabelPreferences)
734 s := string(prefsBytes)
735 labelPrefsPtr = &s
736 }
737
738 return s.db.UpsertPreferences(&db.Preferences{
739 URI: uri,
740 AuthorDID: did,
741 ExternalLinkSkippedHostnames: skippedHostnamesPtr,
742 SubscribedLabelers: subscribedLabelersPtr,
743 LabelPreferences: labelPrefsPtr,
744 DisableExternalLinkWarning: record.DisableExternalLinkWarning,
745 CreatedAt: createdAt,
746 IndexedAt: time.Now(),
747 CID: cidPtr,
748 })
749 }
750 return nil
751}