bluesky appview implementation using microcosm and other services server.reddwarf.app
appview bluesky reddwarf microcosm

getPostThreadV2 mostly full impl w/ op prio (kinda slow and no sort)

Changed files
+680 -171
shims
lex
app
bsky
actor
unspecced
getpostthreadv2
+8 -168
main.go
··· 20 20 "tangled.org/whey.party/red-dwarf-server/microcosm/slingshot" 21 21 appbskyactordefs "tangled.org/whey.party/red-dwarf-server/shims/lex/app/bsky/actor/defs" 22 22 appbskyfeeddefs "tangled.org/whey.party/red-dwarf-server/shims/lex/app/bsky/feed/defs" 23 + appbskyunspeccedgetpostthreadv2 "tangled.org/whey.party/red-dwarf-server/shims/lex/app/bsky/unspecced/getpostthreadv2" 23 24 "tangled.org/whey.party/red-dwarf-server/shims/utils" 24 25 "tangled.org/whey.party/red-dwarf-server/sticket" 25 26 "tangled.org/whey.party/red-dwarf-server/store" ··· 79 80 cs := constellation.NewConstellation(CONSTELLATION_URL) 80 81 // spacedust is type definitions only 81 82 // jetstream types is probably available from jetstream/pkg/models 83 + 84 + //threadGraphCache := make(map[syntax.ATURI]appbskyunspeccedgetpostthreadv2.ThreadGraph) 82 85 83 86 router_raw := gin.New() 84 87 router_raw.Use(gin.Logger()) ··· 626 629 } 627 630 router.GET("/xrpc/app.bsky.unspecced.getPostThreadV2", 628 631 func(c *gin.Context) { 629 - ctx := c.Request.Context() 630 - 631 - rawdid := c.GetString("user_did") 632 - var viewer *utils.DID 633 - didval, errdid := utils.NewDID(rawdid) 634 - if errdid != nil { 635 - viewer = nil 636 - } else { 637 - viewer = &didval 638 - } 639 - 640 - threadAnchorURIraw := c.Query("anchor") 641 - if threadAnchorURIraw == "" { 642 - c.JSON(http.StatusBadRequest, gin.H{"error": "Missing feed param"}) 643 - return 644 - } 645 - 646 - threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw) 647 - if err != nil { 648 - return 649 - } 650 - 651 - //var thread []*appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem 652 - 653 - var skeletonposts []string 654 - 655 - emptystrarray := &[]string{} 656 - limit := 100 657 - 658 - // recurse to the top 659 - parentsMap := map[string]*appbsky.FeedDefs_PostView{} 660 - current := threadAnchorURI 661 - iteration := 0 662 - for true { 663 - iteration = iteration + 1 664 - if iteration > 10 { 665 - break 666 - } 667 - recordaturi, err := syntax.ParseATURI("at://" + current.Authority().String() + "/" + string(current.Collection()) + "/" + string(current.RecordKey())) 668 - if err != nil { 669 - break 670 - } 671 - // loop 672 - postView, postRecord, err := appbskyfeeddefs.PostView(ctx, recordaturi.String(), sl, cs, BSKYIMAGECDN_URL, viewer, 3) 673 - if err != nil { 674 - break 675 - } 676 - if postView == nil { 677 - break 678 - } 679 - //if iteration != 1 { 680 - skeletonposts = append([]string{recordaturi.String()}, skeletonposts...) 681 - //} 682 - parentsMap[recordaturi.String()] = postView 683 - if postRecord != nil && postRecord.Reply != nil && postRecord.Reply.Parent != nil && postRecord.Reply.Root != nil { 684 - if postRecord.Reply.Parent.Uri == postRecord.Reply.Root.Uri { 685 - //break 686 - iteration = 9 687 - } 688 - 689 - current, err = syntax.ParseATURI(postRecord.Reply.Parent.Uri) 690 - if err != nil { 691 - break 692 - } 693 - } else { 694 - log.Println("what huh what hwat") 695 - break //whatt 696 - } 697 - 698 - } 699 - 700 - //skeletonposts = append(skeletonposts, threadAnchorURI.String()) 701 - // todo: theres a cursor!!! pagination please! 702 - // todo: also i doubt im gonna do proper threadding right now, so make sure to remind me to do it properly some time later thanks 703 - //rootReplies, _ := constellation.GetBacklinks(ctx, cs, string(threadAnchorURI), "app.bsky.feed.post:reply.root.uri", *emptystrarray, &limit, nil) 704 - parentReplies, _ := constellation.GetBacklinks(ctx, cs, string(threadAnchorURI), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, nil) 705 - 706 - for _, rec := range parentReplies.Records { 707 - recordaturi, err := syntax.ParseATURI("at://" + rec.Did + "/" + rec.Collection + "/" + rec.Rkey) 708 - if err != nil { 709 - continue 710 - } 711 - skeletonposts = append(skeletonposts, recordaturi.String()) 712 - } 713 - maplen := len(parentsMap) 714 - concurrentResults := MapConcurrent( 715 - ctx, 716 - skeletonposts, 717 - 20, 718 - func(ctx context.Context, raw string, idx int) (*appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem, error) { 719 - var postView *appbsky.FeedDefs_PostView 720 - fromParentChain := false 721 - postView, ok := parentsMap[raw] 722 - if !ok { 723 - post, _, err := appbskyfeeddefs.PostView(ctx, raw, sl, cs, BSKYIMAGECDN_URL, viewer, 3) 724 - if err != nil { 725 - return nil, err 726 - } 727 - if post == nil { 728 - return nil, fmt.Errorf("post not found") 729 - } 730 - postView = post 731 - } else { 732 - fromParentChain = true 733 - } 734 - 735 - depth := int64(1) 736 - if raw == threadAnchorURI.String() { 737 - depth = 0 738 - } 739 - if fromParentChain { 740 - depth = int64(0 - maplen + idx + 1) 741 - } 742 - 743 - return &appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem{ 744 - // Depth int64 `json:"depth" cborgen:"depth"` 745 - Depth: depth, // todo: placeholder 746 - // Uri string `json:"uri" cborgen:"uri"` 747 - Uri: raw, 748 - // Value *UnspeccedGetPostThreadOtherV2_ThreadItem_Value `json:"value" cborgen:"value"` 749 - Value: &appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem_Value{ 750 - // UnspeccedDefs_ThreadItemPost *UnspeccedDefs_ThreadItemPost 751 - UnspeccedDefs_ThreadItemPost: &appbsky.UnspeccedDefs_ThreadItemPost{ 752 - // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.unspecced.defs#threadItemPost"` 753 - LexiconTypeID: "app.bsky.unspecced.defs#threadItemPost", 754 - // // hiddenByThreadgate: The threadgate created by the author indicates this post as a reply to be hidden for everyone consuming the thread. 755 - // HiddenByThreadgate bool `json:"hiddenByThreadgate" cborgen:"hiddenByThreadgate"` 756 - HiddenByThreadgate: false, // todo: placeholder 757 - // // moreParents: This post has more parents that were not present in the response. This is just a boolean, without the number of parents. 758 - // MoreParents bool `json:"moreParents" cborgen:"moreParents"` 759 - MoreParents: false, // todo: placeholder 760 - // // moreReplies: This post has more replies that were not present in the response. This is a numeric value, which is best-effort and might not be accurate. 761 - // MoreReplies int64 `json:"moreReplies" cborgen:"moreReplies"` 762 - MoreReplies: 0, // todo: placeholder 763 - // // mutedByViewer: This is by an account muted by the viewer requesting it. 764 - // MutedByViewer bool `json:"mutedByViewer" cborgen:"mutedByViewer"` 765 - MutedByViewer: false, // todo: placeholder 766 - // // opThread: This post is part of a contiguous thread by the OP from the thread root. Many different OP threads can happen in the same thread. 767 - // OpThread bool `json:"opThread" cborgen:"opThread"` 768 - OpThread: false, // todo: placeholder 769 - // Post *FeedDefs_PostView `json:"post" cborgen:"post"` 770 - Post: postView, 771 - }, 772 - }, 773 - }, nil 774 - }, 775 - ) 776 - 777 - // build final slice 778 - out := make([]*appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem, 0, len(concurrentResults)) 779 - for _, r := range concurrentResults { 780 - if r.Err == nil && r.Value != nil && r.Value.Value != nil && r.Value.Value.UnspeccedDefs_ThreadItemPost != nil && r.Value.Value.UnspeccedDefs_ThreadItemPost.Post != nil { 781 - out = append(out, r.Value) 782 - } 783 - } 784 - 785 - // c.JSON(http.StatusOK, &appbsky.UnspeccedGetPostThreadOtherV2_Output{ 786 - // // Thread []*UnspeccedGetPostThreadOtherV2_ThreadItem `json:"thread" cborgen:"thread"` 787 - // Thread: out, 788 - // HasOtherReplies: false, 789 - // }) 790 - resp := &GetPostThreadOtherV2_Output_WithOtherReplies{ 791 - UnspeccedGetPostThreadOtherV2_Output: appbsky.UnspeccedGetPostThreadOtherV2_Output{ 792 - Thread: out, 793 - }, 794 - HasOtherReplies: false, 795 - } 796 - c.JSON(http.StatusOK, resp) 632 + //appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2(c, sl, cs, BSKYIMAGECDN_URL) 633 + // V2V2 still doesnt work. should probably make the handler from scratch to fully use the thread grapher. 634 + // also the thread grapher is still sequental. pls fix that 635 + //appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V2(c, sl, cs, BSKYIMAGECDN_URL) 636 + appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V3(c, sl, cs, BSKYIMAGECDN_URL) 797 637 }) 798 638 799 639 // weird stuff
+1 -1
readme.md
··· 15 15 - `app.bsky.feed.getPosts` (post rendering is incomplete) 16 16 - `app.bsky.feed.getFeed` (post rendering is incomplete) 17 17 - `app.bsky.unspecced.getConfig` (placeholder) 18 - - `app.bsky.unspecced.getPostThreadV2` (thread rendering is incomplete) 18 + - `app.bsky.unspecced.getPostThreadV2` (mostly working! doesnt use prefered sort, not performant)
+1 -2
shims/lex/app/bsky/actor/defs/profileview.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 - "log" 7 6 8 7 "github.com/bluesky-social/indigo/api/agnostic" 9 8 appbsky "github.com/bluesky-social/indigo/api/bsky" ··· 91 90 //viewerProfileURI, err_viewerProfileURI := syntax.ParseATURI("at://" + viewerProfileDID + "/app.bsky.actor.profile/self") 92 91 //targetProfileURI, err_targetProfileURI := syntax.ParseATURI("at://" + targetProfileDID + "/app.bsky.actor.profile/self") 93 92 94 - log.Println("viewerProfileDID: " + viewerProfileDID + " and targetProfileDID: " + targetProfileDID) 93 + //log.Println("viewerProfileDID: " + viewerProfileDID + " and targetProfileDID: " + targetProfileDID) 95 94 96 95 if viewerProfileDID != "" && targetProfileDID != "" { 97 96 blockingBacklink, err_blockingBacklink := constellation.GetBacklinks(ctx, cs, targetProfileDID, "app.bsky.graph.block:subject", []string{viewerProfileDID}, nil, nil)
+525
shims/lex/app/bsky/unspecced/getpostthreadv2/query.go
··· 1 + package appbskyunspeccedgetpostthreadv2 2 + 3 + import ( 4 + "net/http" 5 + "strconv" 6 + "sync" 7 + 8 + "context" 9 + "fmt" 10 + "log" 11 + 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + "github.com/gin-gonic/gin" 14 + "tangled.org/whey.party/red-dwarf-server/microcosm" 15 + "tangled.org/whey.party/red-dwarf-server/shims/utils" 16 + 17 + "tangled.org/whey.party/red-dwarf-server/microcosm/constellation" 18 + appbskyfeeddefs "tangled.org/whey.party/red-dwarf-server/shims/lex/app/bsky/feed/defs" 19 + 20 + appbsky "github.com/bluesky-social/indigo/api/bsky" 21 + ) 22 + 23 + func HandleGetPostThreadV2(c *gin.Context, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string) { 24 + ctx := c.Request.Context() 25 + 26 + rawdid := c.GetString("user_did") 27 + var viewer *utils.DID 28 + didval, errdid := utils.NewDID(rawdid) 29 + if errdid != nil { 30 + viewer = nil 31 + } else { 32 + viewer = &didval 33 + } 34 + 35 + threadAnchorURIraw := c.Query("anchor") 36 + if threadAnchorURIraw == "" { 37 + c.JSON(http.StatusBadRequest, gin.H{"error": "Missing feed param"}) 38 + return 39 + } 40 + 41 + threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw) 42 + if err != nil { 43 + return 44 + } 45 + 46 + //var thread []*appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem 47 + 48 + var skeletonposts []string 49 + 50 + emptystrarray := &[]string{} 51 + limit := 100 52 + 53 + // recurse to the top 54 + parentsMap := map[string]*appbsky.FeedDefs_PostView{} 55 + current := threadAnchorURI 56 + iteration := 0 57 + for true { 58 + iteration = iteration + 1 59 + if iteration > 10 { 60 + break 61 + } 62 + recordaturi, err := syntax.ParseATURI("at://" + current.Authority().String() + "/" + string(current.Collection()) + "/" + string(current.RecordKey())) 63 + if err != nil { 64 + break 65 + } 66 + // loop 67 + postView, postRecord, err := appbskyfeeddefs.PostView(ctx, recordaturi.String(), sl, cs, imgcdn, viewer, 3) 68 + if err != nil { 69 + break 70 + } 71 + if postView == nil { 72 + break 73 + } 74 + //if iteration != 1 { 75 + skeletonposts = append([]string{recordaturi.String()}, skeletonposts...) 76 + //} 77 + parentsMap[recordaturi.String()] = postView 78 + if postRecord != nil && postRecord.Reply != nil && postRecord.Reply.Parent != nil && postRecord.Reply.Root != nil { 79 + if postRecord.Reply.Parent.Uri == postRecord.Reply.Root.Uri { 80 + //break 81 + iteration = 9 82 + } 83 + 84 + current, err = syntax.ParseATURI(postRecord.Reply.Parent.Uri) 85 + if err != nil { 86 + break 87 + } 88 + } else { 89 + log.Println("what huh what hwat") 90 + break //whatt 91 + } 92 + 93 + } 94 + 95 + //skeletonposts = append(skeletonposts, threadAnchorURI.String()) 96 + // todo: theres a cursor!!! pagination please! 97 + // todo: also i doubt im gonna do proper threadding right now, so make sure to remind me to do it properly some time later thanks 98 + //rootReplies, _ := constellation.GetBacklinks(ctx, cs, string(threadAnchorURI), "app.bsky.feed.post:reply.root.uri", *emptystrarray, &limit, nil) 99 + parentReplies, _ := constellation.GetBacklinks(ctx, cs, string(threadAnchorURI), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, nil) 100 + 101 + for _, rec := range parentReplies.Records { 102 + recordaturi, err := syntax.ParseATURI("at://" + rec.Did + "/" + rec.Collection + "/" + rec.Rkey) 103 + if err != nil { 104 + continue 105 + } 106 + skeletonposts = append(skeletonposts, recordaturi.String()) 107 + } 108 + maplen := len(parentsMap) 109 + concurrentResults := MapConcurrent( 110 + ctx, 111 + skeletonposts, 112 + 20, 113 + func(ctx context.Context, raw string, idx int) (*appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem, error) { 114 + var postView *appbsky.FeedDefs_PostView 115 + fromParentChain := false 116 + postView, ok := parentsMap[raw] 117 + if !ok { 118 + post, _, err := appbskyfeeddefs.PostView(ctx, raw, sl, cs, imgcdn, viewer, 3) 119 + if err != nil { 120 + return nil, err 121 + } 122 + if post == nil { 123 + return nil, fmt.Errorf("post not found") 124 + } 125 + postView = post 126 + } else { 127 + fromParentChain = true 128 + } 129 + 130 + depth := int64(1) 131 + if raw == threadAnchorURI.String() { 132 + depth = 0 133 + } 134 + if fromParentChain { 135 + depth = int64(0 - maplen + idx + 1) 136 + } 137 + 138 + return &appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem{ 139 + // Depth int64 `json:"depth" cborgen:"depth"` 140 + Depth: depth, // todo: placeholder 141 + // Uri string `json:"uri" cborgen:"uri"` 142 + Uri: raw, 143 + // Value *UnspeccedGetPostThreadOtherV2_ThreadItem_Value `json:"value" cborgen:"value"` 144 + Value: &appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem_Value{ 145 + // UnspeccedDefs_ThreadItemPost *UnspeccedDefs_ThreadItemPost 146 + UnspeccedDefs_ThreadItemPost: &appbsky.UnspeccedDefs_ThreadItemPost{ 147 + // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.unspecced.defs#threadItemPost"` 148 + LexiconTypeID: "app.bsky.unspecced.defs#threadItemPost", 149 + // // hiddenByThreadgate: The threadgate created by the author indicates this post as a reply to be hidden for everyone consuming the thread. 150 + // HiddenByThreadgate bool `json:"hiddenByThreadgate" cborgen:"hiddenByThreadgate"` 151 + HiddenByThreadgate: false, // todo: placeholder 152 + // // moreParents: This post has more parents that were not present in the response. This is just a boolean, without the number of parents. 153 + // MoreParents bool `json:"moreParents" cborgen:"moreParents"` 154 + MoreParents: false, // todo: placeholder 155 + // // moreReplies: This post has more replies that were not present in the response. This is a numeric value, which is best-effort and might not be accurate. 156 + // MoreReplies int64 `json:"moreReplies" cborgen:"moreReplies"` 157 + MoreReplies: 0, // todo: placeholder 158 + // // mutedByViewer: This is by an account muted by the viewer requesting it. 159 + // MutedByViewer bool `json:"mutedByViewer" cborgen:"mutedByViewer"` 160 + MutedByViewer: false, // todo: placeholder 161 + // // opThread: This post is part of a contiguous thread by the OP from the thread root. Many different OP threads can happen in the same thread. 162 + // OpThread bool `json:"opThread" cborgen:"opThread"` 163 + OpThread: false, // todo: placeholder 164 + // Post *FeedDefs_PostView `json:"post" cborgen:"post"` 165 + Post: postView, 166 + }, 167 + }, 168 + }, nil 169 + }, 170 + ) 171 + 172 + // build final slice 173 + out := make([]*appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem, 0, len(concurrentResults)) 174 + for _, r := range concurrentResults { 175 + if r.Err == nil && r.Value != nil && r.Value.Value != nil && r.Value.Value.UnspeccedDefs_ThreadItemPost != nil && r.Value.Value.UnspeccedDefs_ThreadItemPost.Post != nil { 176 + out = append(out, r.Value) 177 + } 178 + } 179 + 180 + // c.JSON(http.StatusOK, &appbsky.UnspeccedGetPostThreadOtherV2_Output{ 181 + // // Thread []*UnspeccedGetPostThreadOtherV2_ThreadItem `json:"thread" cborgen:"thread"` 182 + // Thread: out, 183 + // HasOtherReplies: false, 184 + // }) 185 + resp := &GetPostThreadOtherV2_Output_WithOtherReplies{ 186 + UnspeccedGetPostThreadOtherV2_Output: appbsky.UnspeccedGetPostThreadOtherV2_Output{ 187 + Thread: out, 188 + }, 189 + HasOtherReplies: false, 190 + } 191 + c.JSON(http.StatusOK, resp) 192 + } 193 + 194 + type GetPostThreadOtherV2_Output_WithOtherReplies struct { 195 + appbsky.UnspeccedGetPostThreadOtherV2_Output 196 + HasOtherReplies bool `json:"hasOtherReplies"` 197 + } 198 + 199 + type AsyncResult[T any] struct { 200 + Value T 201 + Err error 202 + } 203 + 204 + func MapConcurrent[T any, R any]( 205 + ctx context.Context, 206 + items []T, 207 + concurrencyLimit int, 208 + mapper func(context.Context, T, int) (R, error), 209 + ) []AsyncResult[R] { 210 + if len(items) == 0 { 211 + return nil 212 + } 213 + 214 + results := make([]AsyncResult[R], len(items)) 215 + var wg sync.WaitGroup 216 + 217 + sem := make(chan struct{}, concurrencyLimit) 218 + 219 + for i, item := range items { 220 + wg.Add(1) 221 + go func(idx int, input T) { 222 + defer wg.Done() 223 + 224 + sem <- struct{}{} 225 + defer func() { <-sem }() 226 + 227 + if ctx.Err() != nil { 228 + results[idx] = AsyncResult[R]{Err: ctx.Err()} 229 + return 230 + } 231 + 232 + val, err := mapper(ctx, input, idx) 233 + results[idx] = AsyncResult[R]{Value: val, Err: err} 234 + }(i, item) 235 + } 236 + 237 + wg.Wait() 238 + return results 239 + } 240 + 241 + type SkeletonPost struct { 242 + post syntax.ATURI 243 + depth int 244 + } 245 + 246 + func HandleGetPostThreadV2V3(c *gin.Context, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string) { 247 + ctx := c.Request.Context() 248 + 249 + rawdid := c.GetString("user_did") 250 + var viewer *utils.DID 251 + didval, errdid := utils.NewDID(rawdid) 252 + if errdid != nil { 253 + viewer = nil 254 + } else { 255 + viewer = &didval 256 + } 257 + 258 + threadAnchorURIraw := c.Query("anchor") 259 + if threadAnchorURIraw == "" { 260 + c.JSON(http.StatusBadRequest, gin.H{"error": "Missing feed param"}) 261 + return 262 + } 263 + 264 + // "Whether to include parents above the anchor. 265 + // bool as string 266 + // true as default 267 + aboveParam := c.Query("above") // why would you need above = false ? 268 + above := true 269 + if aboveParam == "false" { 270 + above = false 271 + } 272 + 273 + // "How many levels of replies to include below the anchor." 274 + // integer as string 275 + // default: 6, min: 0, max: 20 276 + belowParam := c.Query("below") // bskydefault: 10 277 + below, err_below := strconv.ParseInt(belowParam, 10, 64) 278 + if err_below != nil { 279 + below = 6 280 + } else { 281 + if below > 20 { 282 + below = 20 283 + } 284 + if below < 0 { 285 + below = 0 286 + } 287 + } 288 + 289 + // "Maximum of replies to include at each level of the thread, except for the direct replies to the anchor, 290 + // which are (NOTE: currently, during unspecced phase) all returned (NOTE: later they might be paginated)." 291 + // integer as string 292 + // default: 10, min: 0, max: 100 293 + branchingFactorParam := c.Query("branchingFactor") // bskydefault: 1 294 + branchingFactor, err_branchingFactor := strconv.ParseInt(branchingFactorParam, 10, 64) 295 + if err_branchingFactor != nil { 296 + branchingFactor = 10 297 + } else { 298 + if branchingFactor > 100 { 299 + branchingFactor = 100 300 + } 301 + if branchingFactor < 0 { 302 + branchingFactor = 0 303 + } 304 + } 305 + 306 + // "Sorting for the thread replies" 307 + // string (enum) ["newest", "oldest", "top"] 308 + // default: "oldest" 309 + sortParam := c.Query("sort") // bskydefault: top 310 + sort := sortParam 311 + if sort != "newest" && sort != "oldest" && sort != "top" { 312 + sort = "top" 313 + } 314 + 315 + threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw) 316 + if err != nil { 317 + return 318 + } 319 + 320 + threadGraph, err := ThreadGrapher(ctx, cs, sl, threadAnchorURI) 321 + if err != nil { 322 + c.JSON(http.StatusBadRequest, gin.H{"error": ("failed to graph the thread: " + err.Error())}) 323 + return 324 + } 325 + var skeletonposts []SkeletonPost 326 + 327 + // Parent Chain 328 + parentChainHeight := 0 329 + 330 + // root := threadGraph.RootURI 331 + if above { 332 + current := threadAnchorURI 333 + 334 + for true { 335 + log.Println("[parent threader] current: " + current.Authority().String() + string(current.Collection()) + string(current.RecordKey())) 336 + if parentChainHeight > 20 { 337 + // root = current 338 + break 339 + } 340 + parent, ok := threadGraph.ParentsMap[current] 341 + if !ok { 342 + // root = current 343 + break 344 + } 345 + parentChainHeight = parentChainHeight + 1 346 + skeletonposts = append(skeletonposts, SkeletonPost{ 347 + post: parent, 348 + depth: -parentChainHeight, 349 + }) 350 + current = parent 351 + } 352 + flipArray(&skeletonposts) 353 + } 354 + 355 + // handled by the recurser 356 + // // Anchor Post 357 + // skeletonposts = append(skeletonposts, SkeletonPost{ 358 + // post: threadAnchorURI, 359 + // depth: 0, 360 + // }) 361 + 362 + // Tree Replies (with OP thread priority) 363 + // should probably be recursive 364 + recursiveHandleV2V3TreeReplies(threadGraph, &skeletonposts, threadAnchorURI, &below, &branchingFactor, &sort, 0, 0) 365 + 366 + //maplen := len(parentsMap) 367 + concurrentResults := MapConcurrent( 368 + ctx, 369 + skeletonposts, 370 + 20, 371 + func(ctx context.Context, raw SkeletonPost, idx int) (*appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem, error) { 372 + var postView *appbsky.FeedDefs_PostView 373 + //fromParentChain := false 374 + //postView, ok := parentsMap[raw] 375 + //if !ok { 376 + post, _, err := appbskyfeeddefs.PostView(ctx, raw.post.String(), sl, cs, imgcdn, viewer, 3) 377 + if err != nil { 378 + return nil, err 379 + } 380 + if post == nil { 381 + return nil, fmt.Errorf("post not found") 382 + } 383 + postView = post 384 + //} else { 385 + // fromParentChain = true 386 + //} 387 + 388 + depth := int64(1) 389 + // if raw == threadAnchorURI.String() { 390 + // depth = 0 391 + // } 392 + // if fromParentChain { 393 + // depth = int64(0 - parentChainHeight + idx + 1) 394 + // } 395 + depth = int64(raw.depth) 396 + 397 + return &appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem{ 398 + // Depth int64 `json:"depth" cborgen:"depth"` 399 + Depth: depth, // todo: placeholder 400 + // Uri string `json:"uri" cborgen:"uri"` 401 + Uri: raw.post.String(), 402 + // Value *UnspeccedGetPostThreadOtherV2_ThreadItem_Value `json:"value" cborgen:"value"` 403 + Value: &appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem_Value{ 404 + // UnspeccedDefs_ThreadItemPost *UnspeccedDefs_ThreadItemPost 405 + UnspeccedDefs_ThreadItemPost: &appbsky.UnspeccedDefs_ThreadItemPost{ 406 + // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.unspecced.defs#threadItemPost"` 407 + LexiconTypeID: "app.bsky.unspecced.defs#threadItemPost", 408 + // // hiddenByThreadgate: The threadgate created by the author indicates this post as a reply to be hidden for everyone consuming the thread. 409 + // HiddenByThreadgate bool `json:"hiddenByThreadgate" cborgen:"hiddenByThreadgate"` 410 + HiddenByThreadgate: false, // todo: placeholder 411 + // // moreParents: This post has more parents that were not present in the response. This is just a boolean, without the number of parents. 412 + // MoreParents bool `json:"moreParents" cborgen:"moreParents"` 413 + MoreParents: false, // todo: placeholder 414 + // // moreReplies: This post has more replies that were not present in the response. This is a numeric value, which is best-effort and might not be accurate. 415 + // MoreReplies int64 `json:"moreReplies" cborgen:"moreReplies"` 416 + MoreReplies: 0, // todo: placeholder 417 + // // mutedByViewer: This is by an account muted by the viewer requesting it. 418 + // MutedByViewer bool `json:"mutedByViewer" cborgen:"mutedByViewer"` 419 + MutedByViewer: false, // todo: placeholder 420 + // // opThread: This post is part of a contiguous thread by the OP from the thread root. Many different OP threads can happen in the same thread. 421 + // OpThread bool `json:"opThread" cborgen:"opThread"` 422 + OpThread: false, // todo: placeholder 423 + // Post *FeedDefs_PostView `json:"post" cborgen:"post"` 424 + Post: postView, 425 + }, 426 + }, 427 + }, nil 428 + }, 429 + ) 430 + 431 + // build final slice 432 + out := make([]*appbsky.UnspeccedGetPostThreadOtherV2_ThreadItem, 0, len(concurrentResults)) 433 + for _, r := range concurrentResults { 434 + if r.Err == nil && r.Value != nil && r.Value.Value != nil && r.Value.Value.UnspeccedDefs_ThreadItemPost != nil && r.Value.Value.UnspeccedDefs_ThreadItemPost.Post != nil { 435 + out = append(out, r.Value) 436 + } 437 + } 438 + 439 + // c.JSON(http.StatusOK, &appbsky.UnspeccedGetPostThreadOtherV2_Output{ 440 + // // Thread []*UnspeccedGetPostThreadOtherV2_ThreadItem `json:"thread" cborgen:"thread"` 441 + // Thread: out, 442 + // HasOtherReplies: false, 443 + // }) 444 + resp := &GetPostThreadOtherV2_Output_WithOtherReplies{ 445 + UnspeccedGetPostThreadOtherV2_Output: appbsky.UnspeccedGetPostThreadOtherV2_Output{ 446 + Thread: out, 447 + }, 448 + HasOtherReplies: false, 449 + } 450 + c.JSON(http.StatusOK, resp) 451 + } 452 + 453 + func flipArray[T any](s *[]T) { 454 + for i, j := 0, len(*s)-1; i < j; i, j = i+1, j-1 { 455 + (*s)[i], (*s)[j] = (*s)[j], (*s)[i] 456 + } 457 + } 458 + 459 + func recursiveHandleV2V3TreeReplies(threadGraph *ThreadGraph, skeletonposts *[]SkeletonPost, current syntax.ATURI, below *int64, branchingFactor *int64, sort *string, verticalPos int64, horizontalPos int64) { 460 + log.Println("[V3 Recurse] at y: " + fmt.Sprint(verticalPos) + ", x: " + fmt.Sprint(horizontalPos)) 461 + // breakings 462 + if below != nil && verticalPos > *below { 463 + log.Println("[V3 Recurse] exit by too low") 464 + return 465 + } 466 + if branchingFactor != nil && horizontalPos > *branchingFactor && verticalPos > 1 { 467 + log.Println("[V3 Recurse] exit by too wide; branchingFactor: " + fmt.Sprint(*branchingFactor) + "; horizontalPos: " + fmt.Sprint(horizontalPos) + "; verticalPos: " + fmt.Sprint(verticalPos)) 468 + return 469 + } 470 + 471 + // the things to do if not recurse 472 + if skeletonposts != nil { 473 + *skeletonposts = append(*skeletonposts, SkeletonPost{ 474 + post: current, 475 + depth: int(verticalPos), 476 + }) 477 + } else { 478 + log.Println("[V3 Recurse] exit by no skeleton posts") 479 + return 480 + } 481 + 482 + repliesAtThisPosition, ok := threadGraph.ChildrenMap[current] 483 + if !ok { 484 + log.Println("[V3 Recurse] exit by no replies") 485 + return 486 + } 487 + 488 + // recurse 489 + op := threadGraph.RootURI.Authority() 490 + 491 + // We need a modifyable copy of the slice because we are about to reorder it 492 + // and we don't want to mess up the original map in case it's used elsewhere. 493 + sortedReplies := make([]syntax.ATURI, len(repliesAtThisPosition)) 494 + copy(sortedReplies, repliesAtThisPosition) 495 + 496 + // 1. Find the "best" OP reply (lexicographically smallest rkey usually means oldest) 497 + bestOpIndex := -1 498 + var bestOpURI syntax.ATURI 499 + 500 + for i, replyURI := range sortedReplies { 501 + if replyURI.Authority() == op { 502 + // If this is the first OP reply we found, or if it's lexicographically smaller (older) than the previous best 503 + if bestOpIndex == -1 || replyURI.String() < bestOpURI.String() { 504 + bestOpIndex = i 505 + bestOpURI = replyURI 506 + } 507 + } 508 + } 509 + 510 + // 2. If we found an OP reply, move it to index 0 511 + if bestOpIndex > 0 { // If it's already 0, no need to move 512 + // Remove the best OP reply from its current spot 513 + // (Go slice trick: append everything before it + everything after it) 514 + withoutBest := append(sortedReplies[:bestOpIndex], sortedReplies[bestOpIndex+1:]...) 515 + 516 + // Prepend it to the front 517 + sortedReplies = append([]syntax.ATURI{bestOpURI}, withoutBest...) 518 + } 519 + 520 + for idx, reply := range sortedReplies { 521 + recursiveHandleV2V3TreeReplies(threadGraph, skeletonposts, reply, below, branchingFactor, sort, verticalPos+1, int64(idx+1)) 522 + } 523 + 524 + // going up 525 + }
+145
shims/lex/app/bsky/unspecced/getpostthreadv2/threadgrapher.go
··· 1 + package appbskyunspeccedgetpostthreadv2 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log" 8 + "sync" 9 + 10 + "github.com/bluesky-social/indigo/api/agnostic" 11 + //comatproto "github.com/bluesky-social/indigo/api/atproto" 12 + appbsky "github.com/bluesky-social/indigo/api/bsky" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + 15 + "tangled.org/whey.party/red-dwarf-server/microcosm" 16 + "tangled.org/whey.party/red-dwarf-server/microcosm/constellation" 17 + ) 18 + 19 + type ThreadGraph struct { 20 + mu sync.RWMutex // Mutex required for thread-safe updates (Firehose) 21 + 22 + RootURI syntax.ATURI 23 + AnchorURI syntax.ATURI 24 + 25 + ParentsMap map[syntax.ATURI]syntax.ATURI 26 + 27 + ChildrenMap map[syntax.ATURI][]syntax.ATURI 28 + } 29 + 30 + func NewThreadGraph() *ThreadGraph { 31 + return &ThreadGraph{ 32 + ParentsMap: make(map[syntax.ATURI]syntax.ATURI), 33 + ChildrenMap: make(map[syntax.ATURI][]syntax.ATURI), 34 + } 35 + } 36 + 37 + func ThreadGrapher(ctx context.Context, cs *microcosm.MicrocosmClient, sl *microcosm.MicrocosmClient, threadAnchorURI syntax.ATURI) (*ThreadGraph, error) { 38 + tg := NewThreadGraph() 39 + tg.AnchorURI = threadAnchorURI 40 + 41 + anchorPostRecordResponse, err := agnostic.RepoGetRecord(ctx, sl, "", "app.bsky.feed.post", threadAnchorURI.Authority().String(), string(threadAnchorURI.RecordKey())) 42 + if err != nil { 43 + log.Println("[ThreadGrapher] exit by anchor post resolve failure") 44 + return nil, fmt.Errorf("failed to fetch anchor: %w", err) 45 + } 46 + 47 + var anchorPost appbsky.FeedPost 48 + if err := json.Unmarshal(*anchorPostRecordResponse.Value, &anchorPost); err != nil { 49 + log.Println("[ThreadGrapher] exit by no json") 50 + return nil, fmt.Errorf("error: Failed to parse post record JSON") 51 + } 52 + 53 + var rootURI syntax.ATURI 54 + 55 + if anchorPost.Reply != nil && anchorPost.Reply.Root != nil { 56 + rURI, err := syntax.ParseATURI(anchorPost.Reply.Root.Uri) 57 + if err != nil { 58 + log.Println("[ThreadGrapher] exit by invalid root uri") 59 + return nil, fmt.Errorf("invalid root uri in record: %w", err) 60 + } 61 + rootURI = rURI 62 + 63 + // todo: fiine we wont fetch the root post, but we still need to mark it as deleted somehow 64 + // rootPostRecordResponse, err := agnostic.RepoGetRecord(ctx, sl, "", "app.bsky.feed.post", rootURI.Authority().String(), string(rootURI.RecordKey())) 65 + // if err != nil { 66 + // log.Println("[ThreadGrapher] exit by cant fetch root") 67 + // return nil, fmt.Errorf("failed to fetch root: %w", err) 68 + // } 69 + 70 + // var rootPost appbsky.FeedPost 71 + // if err := json.Unmarshal(*rootPostRecordResponse.Value, &rootPost); err != nil { 72 + // log.Println("[ThreadGrapher] exit by cant parse root json") 73 + // return nil, fmt.Errorf("error: Failed to parse post record JSON") 74 + // } 75 + // if err == nil { 76 + // tg.AddNode(rootURI, &rootPost) 77 + // } 78 + } else { 79 + rootURI = threadAnchorURI 80 + } 81 + tg.RootURI = rootURI 82 + 83 + emptystrarray := &[]string{} 84 + var allRepliesATURI []syntax.ATURI 85 + limit := 100 86 + var cursor *string 87 + shouldContinue := true 88 + for shouldContinue { 89 + results, err := constellation.GetBacklinks(ctx, cs, rootURI.String(), "app.bsky.feed.post:reply.root.uri", *emptystrarray, &limit, cursor) 90 + if err != nil { 91 + log.Println("[ThreadGrapher] [root graphing] exit by backlink failure") 92 + return nil, fmt.Errorf("failed to get backlinks: %w", err) 93 + } 94 + if results.Records != nil { 95 + for _, record := range results.Records { 96 + aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey) 97 + if err == nil { 98 + allRepliesATURI = append(allRepliesATURI, aturi) 99 + } 100 + } 101 + } 102 + if results.Cursor != nil { 103 + cursor = results.Cursor 104 + } else { 105 + shouldContinue = false 106 + } 107 + } 108 + 109 + processingQueue := append([]syntax.ATURI{rootURI}, allRepliesATURI...) 110 + for _, aturi := range processingQueue { 111 + //tg.Nodes = append(tg.Nodes, aturi) 112 + // graphinger 113 + emptystrarray := &[]string{} 114 + var localRepliesATURI []syntax.ATURI 115 + limit := 100 116 + var cursor *string 117 + shouldContinue := true 118 + for shouldContinue { 119 + results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor) 120 + if err != nil { 121 + log.Println("[ThreadGrapher] [parent graphing] exit by no replies") 122 + return nil, fmt.Errorf("failed to get backlinks: %w", err) 123 + } 124 + if results.Records != nil { 125 + for _, record := range results.Records { 126 + aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey) 127 + if err == nil { 128 + localRepliesATURI = append(localRepliesATURI, aturi) 129 + } 130 + } 131 + } 132 + if results.Cursor != nil { 133 + cursor = results.Cursor 134 + } else { 135 + shouldContinue = false 136 + } 137 + } 138 + for _, reply := range localRepliesATURI { 139 + tg.ParentsMap[reply] = aturi 140 + tg.ChildrenMap[aturi] = append(tg.ChildrenMap[aturi], reply) 141 + } 142 + } 143 + 144 + return tg, nil 145 + }