bluesky appview implementation using microcosm and other services server.reddwarf.app
appview bluesky reddwarf microcosm

threadgrapher basic concurrency for backlink fetching

Changed files
+205 -42
cmd
appview
shims
lex
app
bsky
feed
unspecced
getpostthreadv2
+34 -8
cmd/appview/main.go
··· 628 628 // V2V2 still doesnt work. should probably make the handler from scratch to fully use the thread grapher. 629 629 // also the thread grapher is still sequental. pls fix that 630 630 //appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V2(c, sl, cs, BSKYIMAGECDN_URL) 631 - appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V3(c, sl, cs, BSKYIMAGECDN_URL) 631 + 632 + var existingGraph *appbskyunspeccedgetpostthreadv2.ThreadGraph 633 + // var kvkey string 634 + // threadAnchorURIraw := c.Query("anchor") 635 + // if threadAnchorURIraw != "" { 636 + // threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw) 637 + // if err == nil { 638 + // kvkey = "ThreadGraph" + threadAnchorURI.String() 639 + // val, ok := kv.Get(kvkey) 640 + // if ok { 641 + // parsed, err := appbskyunspeccedgetpostthreadv2.ThreadGraphFromBytes(val) 642 + // if err != nil { 643 + // existingGraph = parsed 644 + // } 645 + // } 646 + // } 647 + // } 648 + 649 + returnedGraph := appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V3(c, sl, cs, BSKYIMAGECDN_URL, existingGraph) 650 + _ = returnedGraph 651 + // bytes, err := returnedGraph.ToBytes() 652 + // if err == nil && kvkey != "" { 653 + // kv.Set(kvkey, bytes, 1*time.Minute) 654 + // } 632 655 }) 633 656 634 657 // weird stuff ··· 663 686 } 664 687 }(clientUUID) 665 688 } 666 - c.String(http.StatusOK, ` 667 - ____ __________ ____ _ _____ ____ ______ _____ __________ _ ____________ 668 - / __ \/ ____/ __ \ / __ \ | / / | / __ \/ ____/ / ___// ____/ __ \ | / / ____/ __ \ 669 - / /_/ / __/ / / / / / / / / | /| / / /| | / /_/ / /_ \__ \/ __/ / /_/ / | / / __/ / /_/ / 670 - / _, _/ /___/ /_/ / / /_/ /| |/ |/ / ___ |/ _, _/ __/ ___/ / /___/ _, _/| |/ / /___/ _, _/ 671 - /_/ |_/_____/_____/ /_____/ |__/|__/_/ |_/_/ |_/_/ /____/_____/_/ |_| |___/_____/_/ |_| 672 - 689 + c.String(http.StatusOK, ` ____ __________ ____ _ _____ ____ ______ 690 + / __ \/ ____/ __ \ / __ \ | / / | / __ \/ ____/ 691 + / /_/ / __/ / / / / / / / / | /| / / /| | / /_/ / /_ 692 + / _, _/ /___/ /_/ / / /_/ /| |/ |/ / ___ |/ _, _/ __/ 693 + /_/ |_/_____/_____/ /_____/ |__/|__/_/ |_/_/ |_/_/ 694 + _____ __________ _ ____________ 695 + / ___// ____/ __ \ | / / ____/ __ \ 696 + \__ \/ __/ / /_/ / | / / __/ / /_/ / 697 + ___/ / /___/ _, _/| |/ / /___/ _, _/ 698 + /____/_____/_/ |_| |___/_____/_/ |_| 673 699 674 700 This is an AT Protocol Application View (AppView) for any application that supports app.bsky.* xrpc methods. 675 701
+20
shims/lex/app/bsky/feed/defs/postview.go
··· 76 76 subj, ok := like[".subject.uri"] 77 77 if ok { 78 78 likeCount = int64(subj.Records) 79 + } else { 80 + likeCount = int64(0) 79 81 } 82 + } else { 83 + likeCount = int64(0) 80 84 } 81 85 } 82 86 if links != nil && ··· 86 90 subj, ok := like[".subject.uri"] 87 91 if ok { 88 92 repostCount = int64(subj.Records) 93 + } else { 94 + repostCount = int64(0) 89 95 } 96 + } else { 97 + repostCount = int64(0) 90 98 } 91 99 } 92 100 if links != nil && ··· 96 104 subj, ok := like[".reply.parent.uri"] 97 105 if ok { 98 106 replyCount = int64(subj.Records) 107 + } else { 108 + replyCount = int64(0) 99 109 } 110 + } else { 111 + replyCount = int64(0) 100 112 } 101 113 } 102 114 if links != nil && ··· 106 118 subj, ok := like[".embed.record.uri"] 107 119 if ok { 108 120 quoteCount_noEmbed = int64(subj.Records) 121 + } else { 122 + quoteCount_noEmbed = int64(0) 109 123 } 124 + } else { 125 + quoteCount_noEmbed = int64(0) 110 126 } 111 127 } 112 128 if links != nil && ··· 116 132 subj, ok := like[".embed.record.record.uri"] 117 133 if ok { 118 134 quoteCount_withEmbed = int64(subj.Records) 135 + } else { 136 + quoteCount_withEmbed = int64(0) 119 137 } 138 + } else { 139 + quoteCount_withEmbed = int64(0) 120 140 } 121 141 } 122 142 quoteCount = quoteCount_noEmbed + quoteCount_withEmbed
+21 -9
shims/lex/app/bsky/unspecced/getpostthreadv2/query.go
··· 243 243 depth int 244 244 } 245 245 246 - func HandleGetPostThreadV2V3(c *gin.Context, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string) { 246 + func HandleGetPostThreadV2V3(c *gin.Context, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string, existingGraph *ThreadGraph) *ThreadGraph { 247 247 ctx := c.Request.Context() 248 248 249 249 rawdid := c.GetString("user_did") ··· 258 258 threadAnchorURIraw := c.Query("anchor") 259 259 if threadAnchorURIraw == "" { 260 260 c.JSON(http.StatusBadRequest, gin.H{"error": "Missing feed param"}) 261 - return 261 + return existingGraph 262 262 } 263 263 264 264 // "Whether to include parents above the anchor. ··· 314 314 315 315 threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw) 316 316 if err != nil { 317 - return 317 + return existingGraph 318 318 } 319 319 320 - threadGraph, err := ThreadGrapher(ctx, cs, sl, threadAnchorURI) 321 - if err != nil { 322 - c.JSON(http.StatusBadRequest, gin.H{"error": ("failed to graph the thread: " + err.Error())}) 323 - return 320 + var workingGraph *ThreadGraph 321 + 322 + if existingGraph != nil { 323 + workingGraph = existingGraph 324 + // update the existing graph to fit our needs in our subtree 325 + workingGraph.UpdateGraphTo(threadAnchorURI) 326 + } else { 327 + newGraph, err := ThreadGrapher(ctx, cs, sl, threadAnchorURI) 328 + if err != nil { 329 + c.JSON(http.StatusBadRequest, gin.H{"error": ("failed to graph the thread: " + err.Error())}) 330 + return nil 331 + } 332 + workingGraph = newGraph 324 333 } 334 + 325 335 var skeletonposts []SkeletonPost 326 336 327 337 // Parent Chain ··· 337 347 // root = current 338 348 break 339 349 } 340 - parent, ok := threadGraph.ParentsMap[current] 350 + parent, ok := workingGraph.ParentsMap[current] 341 351 if !ok { 342 352 // root = current 343 353 break ··· 361 371 362 372 // Tree Replies (with OP thread priority) 363 373 // should probably be recursive 364 - recursiveHandleV2V3TreeReplies(threadGraph, &skeletonposts, threadAnchorURI, &below, &branchingFactor, &sort, 0, 0) 374 + recursiveHandleV2V3TreeReplies(workingGraph, &skeletonposts, threadAnchorURI, &below, &branchingFactor, &sort, 0, 0) 365 375 366 376 //maplen := len(parentsMap) 367 377 concurrentResults := MapConcurrent( ··· 448 458 HasOtherReplies: false, 449 459 } 450 460 c.JSON(http.StatusOK, resp) 461 + 462 + return workingGraph 451 463 } 452 464 453 465 func flipArray[T any](s *[]T) {
+130 -25
shims/lex/app/bsky/unspecced/getpostthreadv2/threadgrapher.go
··· 107 107 } 108 108 109 109 processingQueue := append([]syntax.ATURI{rootURI}, allRepliesATURI...) 110 - for _, aturi := range processingQueue { 111 - //tg.Nodes = append(tg.Nodes, aturi) 112 - // graphinger 113 - emptystrarray := &[]string{} 114 - var localRepliesATURI []syntax.ATURI 115 - limit := 100 116 - var cursor *string 117 - shouldContinue := true 118 - for shouldContinue { 119 - results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor) 120 - if err != nil { 121 - log.Println("[ThreadGrapher] [parent graphing] exit by no replies") 122 - return nil, fmt.Errorf("failed to get backlinks: %w", err) 123 - } 124 - if results.Records != nil { 125 - for _, record := range results.Records { 126 - aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey) 127 - if err == nil { 128 - localRepliesATURI = append(localRepliesATURI, aturi) 110 + // for _, aturi := range processingQueue { 111 + // //tg.Nodes = append(tg.Nodes, aturi) 112 + // // graphinger 113 + // emptystrarray := &[]string{} 114 + // var localRepliesATURI []syntax.ATURI 115 + // limit := 100 116 + // var cursor *string 117 + // shouldContinue := true 118 + // for shouldContinue { 119 + // results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor) 120 + // if err != nil { 121 + // log.Println("[ThreadGrapher] [parent graphing] exit by no replies") 122 + // return nil, fmt.Errorf("failed to get backlinks: %w", err) 123 + // } 124 + // if results.Records != nil { 125 + // for _, record := range results.Records { 126 + // aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey) 127 + // if err == nil { 128 + // localRepliesATURI = append(localRepliesATURI, aturi) 129 + // } 130 + // } 131 + // } 132 + // if results.Cursor != nil { 133 + // cursor = results.Cursor 134 + // } else { 135 + // shouldContinue = false 136 + // } 137 + // } 138 + // for _, reply := range localRepliesATURI { 139 + // tg.ParentsMap[reply] = aturi 140 + // tg.ChildrenMap[aturi] = append(tg.ChildrenMap[aturi], reply) 141 + // } 142 + // } 143 + 144 + type concurrentStruct struct { 145 + aturi syntax.ATURI 146 + replies []syntax.ATURI 147 + } 148 + 149 + localRepliesATURIConcurrent := MapConcurrent( 150 + ctx, 151 + processingQueue, 152 + 50, 153 + func(ctx context.Context, aturi syntax.ATURI, idx int) (*concurrentStruct, error) { 154 + //tg.Nodes = append(tg.Nodes, aturi) 155 + // graphinger 156 + emptystrarray := &[]string{} 157 + var localRepliesATURI []syntax.ATURI 158 + limit := 100 159 + var cursor *string 160 + shouldContinue := true 161 + for shouldContinue { 162 + results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor) 163 + if err != nil { 164 + log.Println("[ThreadGrapher] [parent graphing] exit by no replies") 165 + return nil, fmt.Errorf("failed to get backlinks: %w", err) 166 + } 167 + if results.Records != nil { 168 + for _, record := range results.Records { 169 + aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey) 170 + if err == nil { 171 + localRepliesATURI = append(localRepliesATURI, aturi) 172 + } 129 173 } 130 174 } 175 + if results.Cursor != nil { 176 + cursor = results.Cursor 177 + } else { 178 + shouldContinue = false 179 + } 131 180 } 132 - if results.Cursor != nil { 133 - cursor = results.Cursor 134 - } else { 135 - shouldContinue = false 136 - } 181 + return &concurrentStruct{ 182 + aturi: aturi, 183 + replies: localRepliesATURI, 184 + }, nil 185 + }, 186 + ) 187 + 188 + localRepliesATURI := make([]*concurrentStruct, 0, len(localRepliesATURIConcurrent)) 189 + for _, r := range localRepliesATURIConcurrent { 190 + if /*r != nil &&*/ r.Err == nil && r.Value != nil /*&& r.Value.aturi != nil*/ && r.Value.replies != nil { 191 + localRepliesATURI = append(localRepliesATURI, r.Value) 137 192 } 138 - for _, reply := range localRepliesATURI { 193 + } 194 + for _, replyStruct := range localRepliesATURI { 195 + aturi := replyStruct.aturi 196 + for _, reply := range replyStruct.replies { 139 197 tg.ParentsMap[reply] = aturi 140 198 tg.ChildrenMap[aturi] = append(tg.ChildrenMap[aturi], reply) 141 199 } ··· 143 201 144 202 return tg, nil 145 203 } 204 + 205 + // ToBytes serializes the ThreadGraph into a JSON byte slice. 206 + // It acquires a Read Lock to ensure thread safety during serialization. 207 + func (g *ThreadGraph) ToBytes() ([]byte, error) { 208 + g.mu.RLock() 209 + defer g.mu.RUnlock() 210 + 211 + // sync.RWMutex is unexported, so json.Marshal will automatically skip it. 212 + // syntax.ATURI implements TextMarshaler, so it works as a map key automatically. 213 + return json.Marshal(g) 214 + } 215 + 216 + // ThreadGraphFromBytes deserializes a byte slice back into a ThreadGraph. 217 + // It ensures maps are initialized even if the JSON data was empty. 218 + func ThreadGraphFromBytes(data []byte) (*ThreadGraph, error) { 219 + // Initialize with NewThreadGraph to ensure maps are allocated 220 + tg := NewThreadGraph() 221 + 222 + if err := json.Unmarshal(data, tg); err != nil { 223 + return nil, fmt.Errorf("failed to deserialize ThreadGraph: %w", err) 224 + } 225 + 226 + // Safety check: specific to Go's JSON unmarshal behavior. 227 + // If the JSON contained "null" for the maps, they might be nil again. 228 + // We re-initialize them to avoid panics during concurrent writes later. 229 + if tg.ParentsMap == nil { 230 + tg.ParentsMap = make(map[syntax.ATURI]syntax.ATURI) 231 + } 232 + if tg.ChildrenMap == nil { 233 + tg.ChildrenMap = make(map[syntax.ATURI][]syntax.ATURI) 234 + } 235 + 236 + // The Mutex (tg.mu) is zero-valued (unlocked) by default, which is exactly what we want. 237 + return tg, nil 238 + } 239 + 240 + func (g *ThreadGraph) UpdateGraphTo(anchor syntax.ATURI) { 241 + // path from anchor to root never needs to be updated 242 + // all we need is to update all subtrees of the anchor 243 + // so we should first do a 244 + // recursiveHandleUpdateGraphTo(g, anchor) 245 + // i dont think we should do a recursive thing 246 + // it cant be optimized well, constellation queries will be sequential 247 + // how about, grab the entire tree again, prune branches not part of the tree 248 + // you will get a list of posts that are either new or a part of the subtree 249 + // 250 + }