+20
-30
main.go
+20
-30
main.go
···
396
396
go func(i int, raw string) {
397
397
defer wg.Done()
398
398
399
-
post, _, _ := appbskyfeeddefs.PostView(ctx, raw, sl, BSKYIMAGECDN_URL)
399
+
post, _, _ := appbskyfeeddefs.PostView(ctx, raw, sl, cs, BSKYIMAGECDN_URL)
400
400
401
401
results[i].view = post
402
402
}(i, raw)
···
510
510
511
511
skeletonposts := feekskeleton.Feed
512
512
513
-
type result struct {
514
-
view *appbsky.FeedDefs_FeedViewPost
515
-
}
516
-
517
-
results := make([]result, len(skeletonposts))
518
-
519
-
var wg sync.WaitGroup
520
-
wg.Add(len(skeletonposts))
521
-
522
-
for i, raw := range skeletonposts {
523
-
go func(i int, raw appbsky.FeedDefs_SkeletonFeedPost) {
524
-
defer wg.Done()
525
-
526
-
post, _, err := appbskyfeeddefs.PostView(ctx, raw.Post, sl, BSKYIMAGECDN_URL)
527
-
if err != nil || post == nil {
528
-
return
513
+
concurrentResults := MapConcurrent(
514
+
ctx,
515
+
skeletonposts,
516
+
20,
517
+
func(ctx context.Context, raw *appbsky.FeedDefs_SkeletonFeedPost) (*appbsky.FeedDefs_FeedViewPost, error) {
518
+
post, _, err := appbskyfeeddefs.PostView(ctx, raw.Post, sl, cs, BSKYIMAGECDN_URL)
519
+
if err != nil {
520
+
return nil, err
521
+
}
522
+
if post == nil {
523
+
return nil, fmt.Errorf("post not found")
529
524
}
530
525
531
-
feedviewpost := &appbsky.FeedDefs_FeedViewPost{
526
+
return &appbsky.FeedDefs_FeedViewPost{
532
527
// FeedContext *string `json:"feedContext,omitempty" cborgen:"feedContext,omitempty"`
533
528
// Post *FeedDefs_PostView `json:"post" cborgen:"post"`
534
529
Post: post,
···
553
548
// Reply *FeedDefs_ReplyRef `json:"reply,omitempty" cborgen:"reply,omitempty"`
554
549
// // reqId: Unique identifier per request that may be passed back alongside interactions.
555
550
// ReqId *string `json:"reqId,omitempty" cborgen:"reqId,omitempty"`
556
-
557
-
}
558
-
559
-
results[i].view = feedviewpost
560
-
}(i, *raw)
561
-
}
562
-
563
-
wg.Wait()
551
+
}, nil
552
+
},
553
+
)
564
554
565
555
// build final slice
566
-
out := make([]*appbsky.FeedDefs_FeedViewPost, 0, len(results))
567
-
for _, r := range results {
568
-
if r.view != nil && r.view.Post != nil {
569
-
out = append(out, r.view)
556
+
out := make([]*appbsky.FeedDefs_FeedViewPost, 0, len(concurrentResults))
557
+
for _, r := range concurrentResults {
558
+
if r.Err == nil && r.Value != nil && r.Value.Post != nil {
559
+
out = append(out, r.Value)
570
560
}
571
561
}
572
562
+65
-6
shims/lex/app/bsky/feed/defs/postview.go
+65
-6
shims/lex/app/bsky/feed/defs/postview.go
···
8
8
appbsky "github.com/bluesky-social/indigo/api/bsky"
9
9
"github.com/bluesky-social/indigo/atproto/syntax"
10
10
"tangled.org/whey.party/red-dwarf-server/microcosm"
11
+
"tangled.org/whey.party/red-dwarf-server/microcosm/constellation"
11
12
12
13
//"tangled.org/whey.party/red-dwarf-server/microcosm/constellation"
13
14
//"tangled.org/whey.party/red-dwarf-server/microcosm/slingshot"
···
16
17
"tangled.org/whey.party/red-dwarf-server/shims/utils"
17
18
)
18
19
19
-
func PostView(ctx context.Context, postaturi string, sl *microcosm.MicrocosmClient, imgcdn string) (*appbsky.FeedDefs_PostView, *appbsky.FeedPost, error) {
20
+
func PostView(ctx context.Context, postaturi string, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string) (*appbsky.FeedDefs_PostView, *appbsky.FeedPost, error) {
20
21
aturi, err := syntax.ParseATURI(postaturi)
21
22
if err != nil {
22
23
return nil, nil, err
···
43
44
44
45
lexicontypedecoder := &util.LexiconTypeDecoder{Val: &postRecord}
45
46
46
-
fakecount := int64(-1)
47
+
links, err := constellation.LegacyLinksAll(ctx, cs, postRecordResponse.Uri)
48
+
var likeCount int64
49
+
if links != nil &&
50
+
links.Links != nil {
51
+
like, ok := links.Links["app.bsky.feed.like"]
52
+
if ok && like != nil {
53
+
subj, ok := like[".subject.uri"]
54
+
if ok {
55
+
likeCount = int64(subj.Records)
56
+
}
57
+
}
58
+
}
59
+
var repostCount int64
60
+
if links != nil &&
61
+
links.Links != nil {
62
+
like, ok := links.Links["app.bsky.repost.like"]
63
+
if ok && like != nil {
64
+
subj, ok := like[".subject.uri"]
65
+
if ok {
66
+
repostCount = int64(subj.Records)
67
+
}
68
+
}
69
+
}
70
+
var replyCount int64
71
+
if links != nil &&
72
+
links.Links != nil {
73
+
like, ok := links.Links["app.bsky.feed.post"]
74
+
if ok && like != nil {
75
+
subj, ok := like[".reply.parent.uri"]
76
+
if ok {
77
+
replyCount = int64(subj.Records)
78
+
}
79
+
}
80
+
}
81
+
var quoteCount_noEmbed int64
82
+
if links != nil &&
83
+
links.Links != nil {
84
+
like, ok := links.Links["app.bsky.feed.like"]
85
+
if ok && like != nil {
86
+
subj, ok := like[".embed.record.uri"]
87
+
if ok {
88
+
likeCount = int64(subj.Records)
89
+
}
90
+
}
91
+
}
92
+
var quoteCount_withEmbed int64
93
+
if links != nil &&
94
+
links.Links != nil {
95
+
like, ok := links.Links["app.bsky.feed.like"]
96
+
if ok && like != nil {
97
+
subj, ok := like[".embed.record.record.uri"]
98
+
if ok {
99
+
likeCount = int64(subj.Records)
100
+
}
101
+
}
102
+
}
103
+
quoteCount := quoteCount_noEmbed + quoteCount_withEmbed
104
+
105
+
//fakecount := int64(-1)
47
106
48
107
return &appbsky.FeedDefs_PostView{
49
108
// LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.feed.defs#postView"`
···
60
119
IndexedAt: postRecord.CreatedAt,
61
120
// Labels []*comatproto.LabelDefs_Label `json:"labels,omitempty" cborgen:"labels,omitempty"`
62
121
// LikeCount *int64 `json:"likeCount,omitempty" cborgen:"likeCount,omitempty"`
63
-
LikeCount: &fakecount,
122
+
LikeCount: &likeCount,
64
123
// QuoteCount *int64 `json:"quoteCount,omitempty" cborgen:"quoteCount,omitempty"`
65
-
QuoteCount: &fakecount,
124
+
QuoteCount: "eCount,
66
125
// Record *lexutil.LexiconTypeDecoder `json:"record" cborgen:"record"`
67
126
Record: lexicontypedecoder,
68
127
// ReplyCount *int64 `json:"replyCount,omitempty" cborgen:"replyCount,omitempty"`
69
-
ReplyCount: &fakecount,
128
+
ReplyCount: &replyCount,
70
129
// RepostCount *int64 `json:"repostCount,omitempty" cborgen:"repostCount,omitempty"`
71
-
RepostCount: &fakecount,
130
+
RepostCount: &repostCount,
72
131
// Threadgate *FeedDefs_ThreadgateView `json:"threadgate,omitempty" cborgen:"threadgate,omitempty"`
73
132
// Uri string `json:"uri" cborgen:"uri"`
74
133
Uri: postRecordResponse.Uri,
+63
utils.go
+63
utils.go
···
1
1
package main
2
2
3
3
import (
4
+
"context"
4
5
"encoding/json"
5
6
"fmt"
6
7
"net/http"
7
8
"strings"
9
+
"sync"
8
10
)
9
11
10
12
type DIDDocument struct {
···
47
49
48
50
return &doc, nil
49
51
}
52
+
53
+
type AsyncResult[T any] struct {
54
+
Value T
55
+
Err error
56
+
}
57
+
58
+
func MapConcurrent[T any, R any](
59
+
ctx context.Context,
60
+
items []T,
61
+
concurrencyLimit int,
62
+
mapper func(context.Context, T) (R, error),
63
+
) []AsyncResult[R] {
64
+
if len(items) == 0 {
65
+
return nil
66
+
}
67
+
68
+
results := make([]AsyncResult[R], len(items))
69
+
var wg sync.WaitGroup
70
+
71
+
sem := make(chan struct{}, concurrencyLimit)
72
+
73
+
for i, item := range items {
74
+
wg.Add(1)
75
+
go func(idx int, input T) {
76
+
defer wg.Done()
77
+
78
+
sem <- struct{}{}
79
+
defer func() { <-sem }()
80
+
81
+
if ctx.Err() != nil {
82
+
results[idx] = AsyncResult[R]{Err: ctx.Err()}
83
+
return
84
+
}
85
+
86
+
val, err := mapper(ctx, input)
87
+
results[idx] = AsyncResult[R]{Value: val, Err: err}
88
+
}(i, item)
89
+
}
90
+
91
+
wg.Wait()
92
+
return results
93
+
}
94
+
95
+
func RunConcurrent(tasks ...func() error) []error {
96
+
var wg sync.WaitGroup
97
+
errs := make([]error, len(tasks))
98
+
99
+
wg.Add(len(tasks))
100
+
101
+
for i, task := range tasks {
102
+
go func(i int, t func() error) {
103
+
defer wg.Done()
104
+
if err := t(); err != nil {
105
+
errs[i] = err
106
+
}
107
+
}(i, task)
108
+
}
109
+
110
+
wg.Wait()
111
+
return errs
112
+
}