+149
-3
cmd/appview/main.go
+149
-3
cmd/appview/main.go
···
16
16
17
17
did "github.com/whyrusleeping/go-did"
18
18
"tangled.org/whey.party/red-dwarf-server/auth"
19
+
aturilist "tangled.org/whey.party/red-dwarf-server/cmd/aturilist/client"
19
20
"tangled.org/whey.party/red-dwarf-server/microcosm/constellation"
20
21
"tangled.org/whey.party/red-dwarf-server/microcosm/slingshot"
21
22
appbskyactordefs "tangled.org/whey.party/red-dwarf-server/shims/lex/app/bsky/actor/defs"
···
44
45
SPACEDUST_URL string
45
46
SLINGSHOT_URL string
46
47
CONSTELLATION_URL string
48
+
ATURILIST_URL string
47
49
)
48
50
49
51
func initURLs(prod bool) {
···
52
54
SPACEDUST_URL = "wss://spacedust.whey.party/subscribe"
53
55
SLINGSHOT_URL = "https://slingshot.whey.party"
54
56
CONSTELLATION_URL = "https://constellation.whey.party"
57
+
ATURILIST_URL = "http://localhost:7155"
55
58
} else {
56
59
JETSTREAM_URL = "ws://localhost:6008/subscribe"
57
60
SPACEDUST_URL = "ws://localhost:9998/subscribe"
58
61
SLINGSHOT_URL = "http://localhost:7729"
59
62
CONSTELLATION_URL = "http://localhost:7728"
63
+
ATURILIST_URL = "http://localhost:7155"
60
64
}
61
65
}
62
66
···
68
72
)
69
73
70
74
func main() {
71
-
log.Println("red-dwarf-server started")
75
+
log.Println("red-dwarf-server AppView Service started")
72
76
prod := flag.Bool("prod", false, "use production URLs instead of localhost")
73
77
flag.Parse()
74
78
···
78
82
mailbox := sticket.New()
79
83
sl := slingshot.NewSlingshot(SLINGSHOT_URL)
80
84
cs := constellation.NewConstellation(CONSTELLATION_URL)
85
+
al := aturilist.NewClient(ATURILIST_URL)
81
86
// spacedust is type definitions only
82
87
// jetstream types is probably available from jetstream/pkg/models
83
88
···
86
91
router_raw := gin.New()
87
92
router_raw.Use(gin.Logger())
88
93
router_raw.Use(gin.Recovery())
89
-
router_raw.Use(cors.Default())
94
+
//router_raw.Use(cors.Default())
95
+
router_raw.Use(cors.New(cors.Config{
96
+
AllowAllOrigins: true,
97
+
AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"},
98
+
// You must explicitly allow the custom ATProto headers here
99
+
AllowHeaders: []string{
100
+
"Origin",
101
+
"Content-Length",
102
+
"Content-Type",
103
+
"Authorization",
104
+
"Accept",
105
+
"Accept-Language",
106
+
"atproto-accept-labelers", // <--- The specific fix for your error
107
+
"atproto-proxy", // Good to have for future compatibility
108
+
},
109
+
ExposeHeaders: []string{"Content-Length", "Link"},
110
+
AllowCredentials: true,
111
+
MaxAge: 12 * time.Hour,
112
+
}))
90
113
91
114
router_raw.GET("/.well-known/did.json", GetWellKnownDID)
92
115
···
628
651
// V2V2 still doesnt work. should probably make the handler from scratch to fully use the thread grapher.
629
652
// also the thread grapher is still sequental. pls fix that
630
653
//appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V2(c, sl, cs, BSKYIMAGECDN_URL)
631
-
appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V3(c, sl, cs, BSKYIMAGECDN_URL)
654
+
655
+
var existingGraph *appbskyunspeccedgetpostthreadv2.ThreadGraph
656
+
// var kvkey string
657
+
// threadAnchorURIraw := c.Query("anchor")
658
+
// if threadAnchorURIraw != "" {
659
+
// threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw)
660
+
// if err == nil {
661
+
// kvkey = "ThreadGraph" + threadAnchorURI.String()
662
+
// val, ok := kv.Get(kvkey)
663
+
// if ok {
664
+
// parsed, err := appbskyunspeccedgetpostthreadv2.ThreadGraphFromBytes(val)
665
+
// if err != nil {
666
+
// existingGraph = parsed
667
+
// }
668
+
// }
669
+
// }
670
+
// }
671
+
672
+
returnedGraph := appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V3(c, sl, cs, BSKYIMAGECDN_URL, existingGraph)
673
+
_ = returnedGraph
674
+
// bytes, err := returnedGraph.ToBytes()
675
+
// if err == nil && kvkey != "" {
676
+
// kv.Set(kvkey, bytes, 1*time.Minute)
677
+
// }
678
+
})
679
+
680
+
router.GET("/xrpc/app.bsky.feed.getAuthorFeed",
681
+
func(c *gin.Context) {
682
+
683
+
rawdid := c.GetString("user_did")
684
+
log.Println("getFeed router_unsafe user_did: " + rawdid)
685
+
var viewer *utils.DID
686
+
didval, errdid := utils.NewDID(rawdid)
687
+
if errdid != nil {
688
+
viewer = nil
689
+
} else {
690
+
viewer = &didval
691
+
}
692
+
693
+
actorDidParam := c.Query("actor")
694
+
if actorDidParam == "" {
695
+
c.JSON(http.StatusBadRequest, gin.H{"error": "Missing actor param"})
696
+
return
697
+
}
698
+
cursorRawParam := c.Query("cursor")
699
+
700
+
listResp, err := al.ListRecords(ctx, actorDidParam, "app.bsky.feed.post", cursorRawParam, true)
701
+
if err != nil {
702
+
log.Fatalf("Failed to list: %v", err)
703
+
}
704
+
705
+
concurrentResults := MapConcurrent(
706
+
ctx,
707
+
listResp.Aturis,
708
+
20,
709
+
func(ctx context.Context, raw string, idx int) (*appbsky.FeedDefs_FeedViewPost, error) {
710
+
post, _, err := appbskyfeeddefs.PostView(ctx, raw, sl, cs, BSKYIMAGECDN_URL, viewer, 2)
711
+
if err != nil {
712
+
return nil, err
713
+
}
714
+
if post == nil {
715
+
return nil, fmt.Errorf("post not found")
716
+
}
717
+
718
+
return &appbsky.FeedDefs_FeedViewPost{
719
+
// FeedContext *string `json:"feedContext,omitempty" cborgen:"feedContext,omitempty"`
720
+
// Post *FeedDefs_PostView `json:"post" cborgen:"post"`
721
+
Post: post,
722
+
// Reason *FeedDefs_FeedViewPost_Reason `json:"reason,omitempty" cborgen:"reason,omitempty"`
723
+
// Reason: &appbsky.FeedDefs_FeedViewPost_Reason{
724
+
// // FeedDefs_ReasonRepost *FeedDefs_ReasonRepost
725
+
// FeedDefs_ReasonRepost: &appbsky.FeedDefs_ReasonRepost{
726
+
// // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.feed.defs#reasonRepost"`
727
+
// LexiconTypeID: "app.bsky.feed.defs#reasonRepost",
728
+
// // By *ActorDefs_ProfileViewBasic `json:"by" cborgen:"by"`
729
+
// // Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty"`
730
+
// // IndexedAt string `json:"indexedAt" cborgen:"indexedAt"`
731
+
// // Uri *string `json:"uri,omitempty" cborgen:"uri,omitempty"`
732
+
// Uri: &raw.Reason.FeedDefs_SkeletonReasonRepost.Repost,
733
+
// },
734
+
// // FeedDefs_ReasonPin *FeedDefs_ReasonPin
735
+
// FeedDefs_ReasonPin: &appbsky.FeedDefs_ReasonPin{
736
+
// // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.feed.defs#reasonPin"`
737
+
// LexiconTypeID: "app.bsky.feed.defs#reasonPin",
738
+
// },
739
+
// },
740
+
// Reply *FeedDefs_ReplyRef `json:"reply,omitempty" cborgen:"reply,omitempty"`
741
+
// // reqId: Unique identifier per request that may be passed back alongside interactions.
742
+
// ReqId *string `json:"reqId,omitempty" cborgen:"reqId,omitempty"`
743
+
}, nil
744
+
},
745
+
)
746
+
747
+
// build final slice
748
+
out := make([]*appbsky.FeedDefs_FeedViewPost, 0, len(concurrentResults))
749
+
for _, r := range concurrentResults {
750
+
if r.Err == nil && r.Value != nil && r.Value.Post != nil {
751
+
out = append(out, r.Value)
752
+
}
753
+
}
754
+
755
+
c.JSON(http.StatusOK, &appbsky.FeedGetAuthorFeed_Output{
756
+
Cursor: &listResp.Cursor,
757
+
Feed: out,
758
+
})
632
759
})
633
760
634
761
// weird stuff
···
663
790
}
664
791
}(clientUUID)
665
792
}
793
+
c.String(http.StatusOK, ` ____ __________ ____ _ _____ ____ ______
794
+
/ __ \/ ____/ __ \ / __ \ | / / | / __ \/ ____/
795
+
/ /_/ / __/ / / / / / / / / | /| / / /| | / /_/ / /_
796
+
/ _, _/ /___/ /_/ / / /_/ /| |/ |/ / ___ |/ _, _/ __/
797
+
/_/ |_/_____/_____/ /_____/ |__/|__/_/ |_/_/ |_/_/
798
+
_____ __________ _ ____________
799
+
/ ___// ____/ __ \ | / / ____/ __ \
800
+
\__ \/ __/ / /_/ / | / / __/ / /_/ /
801
+
___/ / /___/ _, _/| |/ / /___/ _, _/
802
+
/____/_____/_/ |_| |___/_____/_/ |_|
803
+
804
+
This is an AT Protocol Application View (AppView) for any application that supports app.bsky.* xrpc methods.
805
+
806
+
Most API routes are under /xrpc/
807
+
808
+
Code: https://tangled.org/whey.party/red-dwarf-server
809
+
Protocol: https://atproto.com
810
+
Try it on: https://new.reddwarf.app
811
+
`)
666
812
})
667
813
router_raw.Run(":7152")
668
814
}
+230
cmd/aturilist/client/client.go
+230
cmd/aturilist/client/client.go
···
1
+
package aturilist
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"encoding/json"
7
+
"fmt"
8
+
"io"
9
+
"net/http"
10
+
"net/url"
11
+
"time"
12
+
)
13
+
14
+
// Constants for the XRPC methods
15
+
const (
16
+
MethodListRecords = "app.reddwarf.aturilist.listRecords"
17
+
MethodCountRecords = "app.reddwarf.aturilist.countRecords"
18
+
MethodIndexRecord = "app.reddwarf.aturilist.indexRecord"
19
+
MethodValidateRecord = "app.reddwarf.aturilist.validateRecord"
20
+
MethodQueryCollectionRkey = "app.reddwarf.aturilist.queryCollectionRkey"
21
+
DefaultProductionHost = "https://aturilist.reddwarf.app"
22
+
)
23
+
24
+
// Client is the API client for the Red Dwarf AtURI List Service.
25
+
type Client struct {
26
+
Host string
27
+
HTTPClient *http.Client
28
+
// AuthToken is the JWT used for the Authorization header
29
+
AuthToken string
30
+
}
31
+
32
+
// NewClient creates a new client. If host is empty, it defaults to production.
33
+
func NewClient(host string) *Client {
34
+
if host == "" {
35
+
host = DefaultProductionHost
36
+
}
37
+
return &Client{
38
+
Host: host,
39
+
HTTPClient: &http.Client{
40
+
Timeout: 10 * time.Second,
41
+
},
42
+
}
43
+
}
44
+
45
+
// --- Response Models ---
46
+
47
+
type ListRecordsResponse struct {
48
+
Aturis []string `json:"aturis"`
49
+
Count int `json:"count"`
50
+
Cursor string `json:"cursor,omitempty"`
51
+
}
52
+
53
+
type CountRecordsResponse struct {
54
+
Repo string `json:"repo"`
55
+
Collection string `json:"collection"`
56
+
Count int `json:"count"`
57
+
}
58
+
59
+
type QueryCollectionRkeyResponse struct {
60
+
Collection string `json:"collection"`
61
+
RKey string `json:"rkey"`
62
+
DIDs []string `json:"dids"`
63
+
Count int `json:"count"`
64
+
}
65
+
66
+
type ErrorResponse struct {
67
+
Error string `json:"error"`
68
+
}
69
+
70
+
// --- Request Models ---
71
+
72
+
type RecordRequest struct {
73
+
Repo string `json:"repo"`
74
+
Collection string `json:"collection"`
75
+
RKey string `json:"rkey"`
76
+
}
77
+
78
+
// --- Methods ---
79
+
80
+
// ListRecords retrieves a list of AT URIs.
81
+
// Set reverse=true to get newest records first.
82
+
func (c *Client) ListRecords(ctx context.Context, repo, collection, cursor string, reverse bool) (*ListRecordsResponse, error) {
83
+
params := url.Values{}
84
+
params.Set("repo", repo)
85
+
params.Set("collection", collection)
86
+
87
+
if cursor != "" {
88
+
params.Set("cursor", cursor)
89
+
}
90
+
91
+
if reverse {
92
+
params.Set("reverse", "true")
93
+
}
94
+
95
+
var resp ListRecordsResponse
96
+
if err := c.doRequest(ctx, http.MethodGet, MethodListRecords, params, nil, &resp); err != nil {
97
+
return nil, err
98
+
}
99
+
100
+
return &resp, nil
101
+
}
102
+
103
+
// CountRecords returns the total number of records indexed for a collection.
104
+
func (c *Client) CountRecords(ctx context.Context, repo, collection string) (*CountRecordsResponse, error) {
105
+
params := url.Values{}
106
+
params.Set("repo", repo)
107
+
params.Set("collection", collection)
108
+
109
+
var resp CountRecordsResponse
110
+
if err := c.doRequest(ctx, http.MethodGet, MethodCountRecords, params, nil, &resp); err != nil {
111
+
return nil, err
112
+
}
113
+
114
+
return &resp, nil
115
+
}
116
+
117
+
// IndexRecord triggers a manual index of a specific record.
118
+
// This endpoint is rate-limited on the server.
119
+
func (c *Client) IndexRecord(ctx context.Context, repo, collection, rkey string) error {
120
+
reqBody := RecordRequest{
121
+
Repo: repo,
122
+
Collection: collection,
123
+
RKey: rkey,
124
+
}
125
+
126
+
// Server returns 200 OK on success, body is empty or status only.
127
+
return c.doRequest(ctx, http.MethodPost, MethodIndexRecord, nil, reqBody, nil)
128
+
}
129
+
130
+
// ValidateRecord checks if a specific record exists in the local DB.
131
+
// Returns true if exists, false if 404, error otherwise.
132
+
func (c *Client) ValidateRecord(ctx context.Context, repo, collection, rkey string) (bool, error) {
133
+
reqBody := RecordRequest{
134
+
Repo: repo,
135
+
Collection: collection,
136
+
RKey: rkey,
137
+
}
138
+
139
+
err := c.doRequest(ctx, http.MethodPost, MethodValidateRecord, nil, reqBody, nil)
140
+
if err != nil {
141
+
// Parse standard error to see if it was a 404
142
+
if clientErr, ok := err.(*ClientError); ok && clientErr.StatusCode == 404 {
143
+
return false, nil
144
+
}
145
+
return false, err
146
+
}
147
+
148
+
return true, nil
149
+
}
150
+
151
+
// QueryCollectionRkey returns a list of DIDs that have a specific collection and rkey pair.
152
+
func (c *Client) QueryCollectionRkey(ctx context.Context, collection, rkey string) (*QueryCollectionRkeyResponse, error) {
153
+
params := url.Values{}
154
+
params.Set("collection", collection)
155
+
params.Set("rkey", rkey)
156
+
157
+
var resp QueryCollectionRkeyResponse
158
+
if err := c.doRequest(ctx, http.MethodGet, MethodQueryCollectionRkey, params, nil, &resp); err != nil {
159
+
return nil, err
160
+
}
161
+
162
+
return &resp, nil
163
+
}
164
+
165
+
// --- Internal Helpers ---
166
+
167
+
type ClientError struct {
168
+
StatusCode int
169
+
Message string
170
+
}
171
+
172
+
func (e *ClientError) Error() string {
173
+
return fmt.Sprintf("api error (status %d): %s", e.StatusCode, e.Message)
174
+
}
175
+
176
+
func (c *Client) doRequest(ctx context.Context, method, xrpcMethod string, params url.Values, body interface{}, dest interface{}) error {
177
+
u, err := url.Parse(fmt.Sprintf("%s/xrpc/%s", c.Host, xrpcMethod))
178
+
if err != nil {
179
+
return fmt.Errorf("invalid url: %w", err)
180
+
}
181
+
182
+
if len(params) > 0 {
183
+
u.RawQuery = params.Encode()
184
+
}
185
+
186
+
var bodyReader io.Reader
187
+
if body != nil {
188
+
jsonBytes, err := json.Marshal(body)
189
+
if err != nil {
190
+
return fmt.Errorf("failed to marshal body: %w", err)
191
+
}
192
+
bodyReader = bytes.NewBuffer(jsonBytes)
193
+
}
194
+
195
+
req, err := http.NewRequestWithContext(ctx, method, u.String(), bodyReader)
196
+
if err != nil {
197
+
return fmt.Errorf("failed to create request: %w", err)
198
+
}
199
+
200
+
req.Header.Set("Content-Type", "application/json")
201
+
if c.AuthToken != "" {
202
+
req.Header.Set("Authorization", "Bearer "+c.AuthToken)
203
+
}
204
+
205
+
resp, err := c.HTTPClient.Do(req)
206
+
if err != nil {
207
+
return fmt.Errorf("request failed: %w", err)
208
+
}
209
+
defer resp.Body.Close()
210
+
211
+
// Handle non-200 responses
212
+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
213
+
var errResp ErrorResponse
214
+
// Try to decode server error message
215
+
if decodeErr := json.NewDecoder(resp.Body).Decode(&errResp); decodeErr == nil && errResp.Error != "" {
216
+
return &ClientError{StatusCode: resp.StatusCode, Message: errResp.Error}
217
+
}
218
+
// Fallback if JSON decode fails or empty
219
+
return &ClientError{StatusCode: resp.StatusCode, Message: resp.Status}
220
+
}
221
+
222
+
// Decode response if destination provided
223
+
if dest != nil {
224
+
if err := json.NewDecoder(resp.Body).Decode(dest); err != nil {
225
+
return fmt.Errorf("failed to decode response: %w", err)
226
+
}
227
+
}
228
+
229
+
return nil
230
+
}
+498
cmd/aturilist/main.go
+498
cmd/aturilist/main.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"encoding/json"
6
+
"errors"
7
+
"flag"
8
+
"fmt"
9
+
"log"
10
+
"log/slog"
11
+
"os"
12
+
"strings"
13
+
"sync"
14
+
"time"
15
+
16
+
"github.com/bluesky-social/indigo/api/agnostic"
17
+
"github.com/bluesky-social/indigo/atproto/syntax"
18
+
"github.com/bluesky-social/jetstream/pkg/client"
19
+
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
20
+
"github.com/bluesky-social/jetstream/pkg/models"
21
+
"github.com/dgraph-io/badger/v4"
22
+
"github.com/gin-gonic/gin"
23
+
24
+
"tangled.org/whey.party/red-dwarf-server/auth"
25
+
"tangled.org/whey.party/red-dwarf-server/microcosm"
26
+
"tangled.org/whey.party/red-dwarf-server/microcosm/slingshot"
27
+
)
28
+
29
+
type Server struct {
30
+
db *badger.DB
31
+
logger *slog.Logger
32
+
33
+
backfillTracker map[string]*sync.WaitGroup
34
+
backfillMutex sync.Mutex
35
+
}
36
+
37
+
var (
38
+
JETSTREAM_URL string
39
+
SPACEDUST_URL string
40
+
SLINGSHOT_URL string
41
+
CONSTELLATION_URL string
42
+
)
43
+
44
+
func initURLs(prod bool) {
45
+
if !prod {
46
+
JETSTREAM_URL = "wss://jetstream.whey.party/subscribe"
47
+
SPACEDUST_URL = "wss://spacedust.whey.party/subscribe"
48
+
SLINGSHOT_URL = "https://slingshot.whey.party"
49
+
CONSTELLATION_URL = "https://constellation.whey.party"
50
+
} else {
51
+
JETSTREAM_URL = "ws://localhost:6008/subscribe"
52
+
SPACEDUST_URL = "ws://localhost:9998/subscribe"
53
+
SLINGSHOT_URL = "http://localhost:7729"
54
+
CONSTELLATION_URL = "http://localhost:7728"
55
+
}
56
+
}
57
+
58
+
const (
59
+
BSKYIMAGECDN_URL = "https://cdn.bsky.app"
60
+
BSKYVIDEOCDN_URL = "https://video.bsky.app"
61
+
serviceWebDID = "did:web:aturilist.reddwarf.app"
62
+
serviceWebHost = "https://aturilist.reddwarf.app"
63
+
)
64
+
65
+
func main() {
66
+
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
67
+
log.Println("red-dwarf-server AtURI List Service started")
68
+
69
+
prod := flag.Bool("prod", false, "use production URLs instead of localhost")
70
+
dbPath := flag.String("db", "./badger_data", "path to badger db")
71
+
flag.Parse()
72
+
73
+
initURLs(*prod)
74
+
75
+
db, err := badger.Open(badger.DefaultOptions(*dbPath))
76
+
if err != nil {
77
+
logger.Error("Failed to open BadgerDB", "error", err)
78
+
os.Exit(1)
79
+
}
80
+
defer db.Close()
81
+
82
+
srv := &Server{
83
+
db: db,
84
+
logger: logger,
85
+
}
86
+
87
+
auther, err := auth.NewAuth(
88
+
100_000,
89
+
time.Hour*12,
90
+
5,
91
+
serviceWebDID,
92
+
)
93
+
if err != nil {
94
+
log.Fatalf("Failed to create Auth: %v", err)
95
+
}
96
+
97
+
ctx := context.Background()
98
+
sl := slingshot.NewSlingshot(SLINGSHOT_URL)
99
+
100
+
config := client.DefaultClientConfig()
101
+
config.WebsocketURL = JETSTREAM_URL
102
+
config.Compress = true
103
+
104
+
handler := &JetstreamHandler{srv: srv}
105
+
scheduler := sequential.NewScheduler("my_app", logger, handler.HandleEvent)
106
+
107
+
c, err := client.NewClient(config, logger, scheduler)
108
+
if err != nil {
109
+
logger.Error("failed to create client", "error", err)
110
+
return
111
+
}
112
+
113
+
cursor := time.Now().Add(-5 * time.Minute).UnixMicro()
114
+
115
+
go func() {
116
+
logger.Info("Connecting to Jetstream...")
117
+
for {
118
+
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
119
+
logger.Error("jetstream connection disconnected", "error", err)
120
+
}
121
+
122
+
select {
123
+
case <-ctx.Done():
124
+
return
125
+
default:
126
+
logger.Info("Reconnecting to Jetstream in 5 seconds...", "cursor", cursor)
127
+
time.Sleep(5 * time.Second)
128
+
}
129
+
}
130
+
}()
131
+
132
+
router := gin.New()
133
+
router.Use(auther.AuthenticateGinRequestViaJWT)
134
+
135
+
router.GET("/xrpc/app.reddwarf.aturilist.listRecords", srv.handleListRecords)
136
+
137
+
router.GET("/xrpc/app.reddwarf.aturilist.countRecords", srv.handleCountRecords)
138
+
139
+
router.POST("/xrpc/app.reddwarf.aturilist.indexRecord", func(c *gin.Context) {
140
+
srv.handleIndexRecord(c, sl)
141
+
})
142
+
143
+
router.POST("/xrpc/app.reddwarf.aturilist.validateRecord", srv.handleValidateRecord)
144
+
145
+
router.GET("/xrpc/app.reddwarf.aturilist.queryCollectionRkey", srv.handleQueryCollectionRkey)
146
+
147
+
// router.GET("/xrpc/app.reddwarf.aturilist.requestBackfill", )
148
+
149
+
router.Run(":7155")
150
+
}
151
+
152
+
type JetstreamHandler struct {
153
+
srv *Server
154
+
}
155
+
156
+
func (h *JetstreamHandler) HandleEvent(ctx context.Context, event *models.Event) error {
157
+
if event != nil {
158
+
if event.Commit != nil {
159
+
isDelete := event.Commit.Operation == models.CommitOperationDelete
160
+
161
+
h.srv.processRecord(event.Did, event.Commit.Collection, event.Commit.RKey, isDelete)
162
+
163
+
}
164
+
}
165
+
return nil
166
+
}
167
+
168
+
func makeKey(repo, collection, rkey string) []byte {
169
+
return []byte(fmt.Sprintf("%s|%s|%s", repo, collection, rkey))
170
+
}
171
+
172
+
func parseKey(key []byte) (repo, collection, rkey string, err error) {
173
+
parts := strings.Split(string(key), "|")
174
+
if len(parts) != 3 {
175
+
return "", "", "", errors.New("invalid key format")
176
+
}
177
+
return parts[0], parts[1], parts[2], nil
178
+
}
179
+
180
+
func makeCollectionRkeyKey(collection, rkey string) []byte {
181
+
return []byte(fmt.Sprintf("cr|%s|%s|", collection, rkey))
182
+
}
183
+
184
+
func parseCollectionRkeyKey(key []byte) (collection, rkey string, err error) {
185
+
parts := strings.Split(string(key), "|")
186
+
if len(parts) < 3 || parts[0] != "cr" {
187
+
return "", "", errors.New("invalid collection+rkey key format")
188
+
}
189
+
return parts[1], parts[2], nil
190
+
}
191
+
192
+
func (s *Server) processRecord(repo, collection, rkey string, isDelete bool) {
193
+
key := makeKey(repo, collection, rkey)
194
+
crKey := makeCollectionRkeyKey(collection, rkey)
195
+
196
+
err := s.db.Update(func(txn *badger.Txn) error {
197
+
if isDelete {
198
+
if err := txn.Delete(key); err != nil {
199
+
return err
200
+
}
201
+
return s.removeDidFromCollectionRkeyIndex(txn, crKey, repo)
202
+
}
203
+
if err := txn.Set(key, []byte(time.Now().Format(time.RFC3339))); err != nil {
204
+
return err
205
+
}
206
+
return s.addDidToCollectionRkeyIndex(txn, crKey, repo)
207
+
})
208
+
209
+
if err != nil {
210
+
s.logger.Error("Failed to update DB", "repo", repo, "rkey", rkey, "err", err)
211
+
}
212
+
}
213
+
214
+
func (s *Server) addDidToCollectionRkeyIndex(txn *badger.Txn, crKey []byte, did string) error {
215
+
item, err := txn.Get(crKey)
216
+
if err == badger.ErrKeyNotFound {
217
+
var dids []string
218
+
dids = append(dids, did)
219
+
didsJSON, _ := json.Marshal(dids)
220
+
return txn.Set(crKey, didsJSON)
221
+
} else if err != nil {
222
+
return err
223
+
}
224
+
225
+
var dids []string
226
+
err = item.Value(func(val []byte) error {
227
+
return json.Unmarshal(val, &dids)
228
+
})
229
+
if err != nil {
230
+
return err
231
+
}
232
+
233
+
for _, existingDid := range dids {
234
+
if existingDid == did {
235
+
return nil
236
+
}
237
+
}
238
+
239
+
dids = append(dids, did)
240
+
didsJSON, _ := json.Marshal(dids)
241
+
return txn.Set(crKey, didsJSON)
242
+
}
243
+
244
+
func (s *Server) removeDidFromCollectionRkeyIndex(txn *badger.Txn, crKey []byte, did string) error {
245
+
item, err := txn.Get(crKey)
246
+
if err == badger.ErrKeyNotFound {
247
+
return nil
248
+
} else if err != nil {
249
+
return err
250
+
}
251
+
252
+
var dids []string
253
+
err = item.Value(func(val []byte) error {
254
+
return json.Unmarshal(val, &dids)
255
+
})
256
+
if err != nil {
257
+
return err
258
+
}
259
+
260
+
var newDids []string
261
+
for _, existingDid := range dids {
262
+
if existingDid != did {
263
+
newDids = append(newDids, existingDid)
264
+
}
265
+
}
266
+
267
+
if len(newDids) == 0 {
268
+
return txn.Delete(crKey)
269
+
}
270
+
271
+
didsJSON, _ := json.Marshal(newDids)
272
+
return txn.Set(crKey, didsJSON)
273
+
}
274
+
275
+
func (s *Server) handleListRecords(c *gin.Context) {
276
+
repo := c.Query("repo")
277
+
collection := c.Query("collection")
278
+
cursor := c.Query("cursor")
279
+
reverse := c.Query("reverse") == "true"
280
+
limit := 50
281
+
282
+
if repo == "" || collection == "" {
283
+
c.JSON(400, gin.H{"error": "repo and collection required"})
284
+
return
285
+
}
286
+
287
+
prefixStr := fmt.Sprintf("%s|%s|", repo, collection)
288
+
prefix := []byte(prefixStr)
289
+
290
+
var aturis []string
291
+
var lastRkey string
292
+
293
+
err := s.db.View(func(txn *badger.Txn) error {
294
+
opts := badger.DefaultIteratorOptions
295
+
opts.PrefetchValues = false
296
+
opts.Reverse = reverse
297
+
298
+
it := txn.NewIterator(opts)
299
+
defer it.Close()
300
+
301
+
var startKey []byte
302
+
if cursor != "" {
303
+
startKey = makeKey(repo, collection, cursor)
304
+
} else {
305
+
if reverse {
306
+
startKey = append([]byte(prefixStr), 0xFF)
307
+
} else {
308
+
startKey = prefix
309
+
}
310
+
}
311
+
312
+
it.Seek(startKey)
313
+
314
+
if cursor != "" && it.Valid() {
315
+
if string(it.Item().Key()) == string(startKey) {
316
+
it.Next()
317
+
}
318
+
}
319
+
320
+
for ; it.ValidForPrefix(prefix); it.Next() {
321
+
if len(aturis) >= limit {
322
+
break
323
+
}
324
+
item := it.Item()
325
+
k := item.Key()
326
+
_, _, rkey, err := parseKey(k)
327
+
if err == nil {
328
+
aturis = append(aturis, fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey))
329
+
lastRkey = rkey
330
+
}
331
+
}
332
+
return nil
333
+
})
334
+
335
+
if err != nil {
336
+
c.JSON(500, gin.H{"error": err.Error()})
337
+
return
338
+
}
339
+
340
+
resp := gin.H{
341
+
"aturis": aturis,
342
+
"count": len(aturis),
343
+
}
344
+
345
+
if lastRkey != "" && len(aturis) == limit {
346
+
resp["cursor"] = lastRkey
347
+
}
348
+
349
+
c.JSON(200, resp)
350
+
}
351
+
352
+
func (s *Server) handleCountRecords(c *gin.Context) {
353
+
repo := c.Query("repo")
354
+
collection := c.Query("collection")
355
+
356
+
if repo == "" || collection == "" {
357
+
c.JSON(400, gin.H{"error": "repo and collection required"})
358
+
return
359
+
}
360
+
361
+
prefix := []byte(fmt.Sprintf("%s|%s|", repo, collection))
362
+
count := 0
363
+
364
+
err := s.db.View(func(txn *badger.Txn) error {
365
+
opts := badger.DefaultIteratorOptions
366
+
opts.PrefetchValues = false
367
+
it := txn.NewIterator(opts)
368
+
defer it.Close()
369
+
370
+
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
371
+
count++
372
+
}
373
+
return nil
374
+
})
375
+
376
+
if err != nil {
377
+
c.JSON(500, gin.H{"error": err.Error()})
378
+
return
379
+
}
380
+
381
+
c.JSON(200, gin.H{
382
+
"repo": repo,
383
+
"collection": collection,
384
+
"count": count,
385
+
})
386
+
}
387
+
388
+
func (s *Server) handleIndexRecord(c *gin.Context, sl *microcosm.MicrocosmClient) {
389
+
var req struct {
390
+
Collection string `json:"collection"`
391
+
Repo string `json:"repo"`
392
+
RKey string `json:"rkey"`
393
+
}
394
+
395
+
if err := c.BindJSON(&req); err != nil {
396
+
req.Collection = c.PostForm("collection")
397
+
req.Repo = c.PostForm("repo")
398
+
req.RKey = c.PostForm("rkey")
399
+
}
400
+
401
+
if req.Collection == "" || req.Repo == "" || req.RKey == "" {
402
+
c.JSON(400, gin.H{"error": "invalid parameters"})
403
+
return
404
+
}
405
+
406
+
recordResponse, err := agnostic.RepoGetRecord(c.Request.Context(), sl, "", req.Collection, req.Repo, req.RKey)
407
+
if err != nil {
408
+
s.processRecord(req.Repo, req.Collection, req.RKey, true)
409
+
410
+
c.Status(200)
411
+
return
412
+
}
413
+
414
+
uri := recordResponse.Uri
415
+
aturi, err := syntax.ParseATURI(uri)
416
+
if err != nil {
417
+
c.JSON(400, gin.H{"error": "failed to parse aturi from remote"})
418
+
return
419
+
}
420
+
421
+
s.processRecord(aturi.Authority().String(), string(aturi.Collection()), string(aturi.RecordKey()), false)
422
+
c.Status(200)
423
+
}
424
+
425
+
func (s *Server) handleValidateRecord(c *gin.Context) {
426
+
var req struct {
427
+
Collection string `json:"collection"`
428
+
Repo string `json:"repo"`
429
+
RKey string `json:"rkey"`
430
+
}
431
+
if err := c.BindJSON(&req); err != nil {
432
+
c.JSON(400, gin.H{"error": "invalid json"})
433
+
return
434
+
}
435
+
436
+
key := makeKey(req.Repo, req.Collection, req.RKey)
437
+
exists := false
438
+
439
+
err := s.db.View(func(txn *badger.Txn) error {
440
+
_, err := txn.Get(key)
441
+
if err == nil {
442
+
exists = true
443
+
} else if err == badger.ErrKeyNotFound {
444
+
exists = false
445
+
return nil
446
+
}
447
+
return err
448
+
})
449
+
450
+
if err != nil {
451
+
c.JSON(500, gin.H{"error": err.Error()})
452
+
return
453
+
}
454
+
455
+
if exists {
456
+
c.Status(200)
457
+
} else {
458
+
c.Status(404)
459
+
}
460
+
}
461
+
462
+
func (s *Server) handleQueryCollectionRkey(c *gin.Context) {
463
+
collection := c.Query("collection")
464
+
rkey := c.Query("rkey")
465
+
466
+
if collection == "" || rkey == "" {
467
+
c.JSON(400, gin.H{"error": "collection and rkey required"})
468
+
return
469
+
}
470
+
471
+
crKey := makeCollectionRkeyKey(collection, rkey)
472
+
var dids []string
473
+
474
+
err := s.db.View(func(txn *badger.Txn) error {
475
+
item, err := txn.Get(crKey)
476
+
if err == badger.ErrKeyNotFound {
477
+
return nil
478
+
} else if err != nil {
479
+
return err
480
+
}
481
+
482
+
return item.Value(func(val []byte) error {
483
+
return json.Unmarshal(val, &dids)
484
+
})
485
+
})
486
+
487
+
if err != nil {
488
+
c.JSON(500, gin.H{"error": err.Error()})
489
+
return
490
+
}
491
+
492
+
c.JSON(200, gin.H{
493
+
"collection": collection,
494
+
"rkey": rkey,
495
+
"dids": dids,
496
+
"count": len(dids),
497
+
})
498
+
}
+293
-10
cmd/jetrelay/main.go
+293
-10
cmd/jetrelay/main.go
···
1
1
package main
2
2
3
3
import (
4
-
"flag"
4
+
"context"
5
+
"encoding/json"
5
6
"fmt"
7
+
"io"
8
+
"log"
9
+
"net/http"
10
+
"sort"
11
+
"sync"
12
+
"time"
13
+
14
+
"github.com/gorilla/websocket"
15
+
"github.com/klauspost/compress/zstd"
6
16
)
7
17
8
-
type multiFlag []string
18
+
const (
19
+
ServerPort = ":3878"
20
+
DictionaryURL = "https://raw.githubusercontent.com/bluesky-social/jetstream/main/pkg/models/zstd_dictionary"
21
+
BufferSize = 100000
22
+
ReconnectDelay = 5 * time.Second
23
+
)
9
24
10
-
func (m *multiFlag) String() string {
11
-
return fmt.Sprint(*m)
25
+
var SourceJetstreams = []string{
26
+
"ws://localhost:6008/subscribe", // local jetstream
27
+
"ws://localhost:3877/subscribe", // local backstream
28
+
}
29
+
30
+
type Event struct {
31
+
Kind string `json:"kind"`
32
+
TimeUS int64 `json:"time_us"`
33
+
Commit json.RawMessage `json:"commit,omitempty"`
34
+
}
35
+
36
+
type BufferedEvent struct {
37
+
RelayTimeUS int64
38
+
RawJSON []byte
12
39
}
13
40
14
-
func (m *multiFlag) Set(value string) error {
15
-
*m = append(*m, value)
41
+
type History struct {
42
+
events []BufferedEvent
43
+
mu sync.RWMutex
44
+
}
45
+
46
+
func (h *History) Add(jsonBytes []byte, relayTime int64) {
47
+
h.mu.Lock()
48
+
defer h.mu.Unlock()
49
+
50
+
h.events = append(h.events, BufferedEvent{
51
+
RelayTimeUS: relayTime,
52
+
RawJSON: jsonBytes,
53
+
})
54
+
55
+
if len(h.events) > BufferSize {
56
+
h.events = h.events[len(h.events)-BufferSize:]
57
+
}
58
+
}
59
+
60
+
func (h *History) GetSince(cursor int64) []BufferedEvent {
61
+
h.mu.RLock()
62
+
defer h.mu.RUnlock()
63
+
64
+
idx := sort.Search(len(h.events), func(i int) bool {
65
+
return h.events[i].RelayTimeUS > cursor
66
+
})
67
+
68
+
if idx < len(h.events) {
69
+
result := make([]BufferedEvent, len(h.events)-idx)
70
+
copy(result, h.events[idx:])
71
+
return result
72
+
}
16
73
return nil
17
74
}
18
75
76
+
var (
77
+
history = &History{events: make([]BufferedEvent, 0, BufferSize)}
78
+
zstdDict []byte
79
+
hub *Hub
80
+
upgrader = websocket.Upgrader{
81
+
CheckOrigin: func(r *http.Request) bool { return true },
82
+
}
83
+
)
84
+
19
85
func main() {
20
-
var js multiFlag
21
-
flag.Var(&js, "j", "jetstream instances 'write multiple to input more than one'")
86
+
log.Println("Initializing Relay...")
87
+
88
+
var err error
89
+
zstdDict, err = downloadDictionary()
90
+
if err != nil {
91
+
log.Fatalf("Failed to load dictionary: %v", err)
92
+
}
93
+
94
+
hub = newHub()
95
+
go hub.run()
96
+
97
+
ctx := context.Background()
98
+
for i, url := range SourceJetstreams {
99
+
go runUpstreamConsumer(ctx, i, url)
100
+
}
101
+
102
+
http.HandleFunc("/subscribe", serveWs)
103
+
log.Printf("๐ฅ Relay Active on %s", ServerPort)
104
+
if err := http.ListenAndServe(ServerPort, nil); err != nil {
105
+
log.Fatal(err)
106
+
}
107
+
}
108
+
109
+
func runUpstreamConsumer(ctx context.Context, id int, baseURL string) {
110
+
var lastSeenCursor int64 = 0
111
+
112
+
for {
113
+
connectURL := baseURL
114
+
if lastSeenCursor > 0 {
115
+
connectURL = fmt.Sprintf("%s?cursor=%d", baseURL, lastSeenCursor)
116
+
log.Printf("[Input %d] Reconnecting with cursor: %d", id, lastSeenCursor)
117
+
} else {
118
+
log.Printf("[Input %d] Connecting fresh...", id)
119
+
}
120
+
121
+
conn, _, err := websocket.DefaultDialer.Dial(connectURL, nil)
122
+
if err != nil {
123
+
log.Printf("[Input %d] Connect failed: %v. Retrying...", id, err)
124
+
time.Sleep(ReconnectDelay)
125
+
continue
126
+
}
127
+
128
+
log.Printf("[Input %d] Connected.", id)
129
+
130
+
for {
131
+
_, msg, err := conn.ReadMessage()
132
+
if err != nil {
133
+
log.Printf("[Input %d] Read error: %v", id, err)
134
+
break
135
+
}
136
+
137
+
var genericEvent map[string]interface{}
138
+
if err := json.Unmarshal(msg, &genericEvent); err != nil {
139
+
continue
140
+
}
141
+
142
+
if t, ok := genericEvent["time_us"].(float64); ok {
143
+
lastSeenCursor = int64(t)
144
+
}
145
+
146
+
nowUS := time.Now().UnixMicro()
147
+
genericEvent["time_us"] = nowUS
148
+
149
+
finalBytes, err := json.Marshal(genericEvent)
150
+
if err != nil {
151
+
continue
152
+
}
153
+
154
+
history.Add(finalBytes, nowUS)
155
+
156
+
hub.broadcast <- BufferedEvent{RelayTimeUS: nowUS, RawJSON: finalBytes}
157
+
}
158
+
conn.Close()
159
+
time.Sleep(ReconnectDelay)
160
+
}
161
+
}
162
+
163
+
func serveWs(w http.ResponseWriter, r *http.Request) {
164
+
conn, err := upgrader.Upgrade(w, r, nil)
165
+
if err != nil {
166
+
return
167
+
}
168
+
169
+
compress := r.URL.Query().Get("compress") == "true"
170
+
171
+
var clientCursor int64 = 0
172
+
cursorStr := r.URL.Query().Get("cursor")
173
+
if cursorStr != "" {
174
+
fmt.Sscanf(cursorStr, "%d", &clientCursor)
175
+
}
176
+
177
+
client := &Client{
178
+
hub: hub,
179
+
conn: conn,
180
+
send: make(chan BufferedEvent, 2048),
181
+
compress: compress,
182
+
lastSentUS: 0,
183
+
}
184
+
185
+
if compress {
186
+
enc, _ := zstd.NewWriter(nil, zstd.WithEncoderDict(zstdDict))
187
+
client.encoder = enc
188
+
}
189
+
190
+
client.hub.register <- client
191
+
192
+
go client.writePump()
193
+
194
+
if clientCursor > 0 {
195
+
log.Printf("Client requested replay from %d", clientCursor)
196
+
missedEvents := history.GetSince(clientCursor)
197
+
for _, evt := range missedEvents {
198
+
client.send <- evt
199
+
}
200
+
}
201
+
202
+
go client.readPump()
203
+
}
204
+
205
+
type Client struct {
206
+
hub *Hub
207
+
conn *websocket.Conn
208
+
send chan BufferedEvent
209
+
compress bool
210
+
encoder *zstd.Encoder
211
+
lastSentUS int64
212
+
}
213
+
214
+
type Hub struct {
215
+
clients map[*Client]bool
216
+
broadcast chan BufferedEvent
217
+
register chan *Client
218
+
unregister chan *Client
219
+
mu sync.RWMutex
220
+
}
221
+
222
+
func newHub() *Hub {
223
+
return &Hub{
224
+
clients: make(map[*Client]bool),
225
+
broadcast: make(chan BufferedEvent, 10000),
226
+
register: make(chan *Client),
227
+
unregister: make(chan *Client),
228
+
}
229
+
}
230
+
231
+
func (h *Hub) run() {
232
+
for {
233
+
select {
234
+
case client := <-h.register:
235
+
h.mu.Lock()
236
+
h.clients[client] = true
237
+
h.mu.Unlock()
238
+
239
+
case client := <-h.unregister:
240
+
h.mu.Lock()
241
+
if _, ok := h.clients[client]; ok {
242
+
delete(h.clients, client)
243
+
close(client.send)
244
+
if client.encoder != nil {
245
+
client.encoder.Close()
246
+
}
247
+
}
248
+
h.mu.Unlock()
249
+
250
+
case msg := <-h.broadcast:
251
+
h.mu.RLock()
252
+
for client := range h.clients {
253
+
select {
254
+
case client.send <- msg:
255
+
default:
256
+
go func(c *Client) {
257
+
h.unregister <- c
258
+
c.conn.Close()
259
+
}(client)
260
+
}
261
+
}
262
+
h.mu.RUnlock()
263
+
}
264
+
}
265
+
}
266
+
267
+
func (c *Client) writePump() {
268
+
defer c.conn.Close()
269
+
270
+
for msg := range c.send {
271
+
if msg.RelayTimeUS <= c.lastSentUS {
272
+
continue
273
+
}
274
+
275
+
c.lastSentUS = msg.RelayTimeUS
276
+
277
+
if c.compress {
278
+
compressed := c.encoder.EncodeAll(msg.RawJSON, nil)
279
+
if err := c.conn.WriteMessage(websocket.BinaryMessage, compressed); err != nil {
280
+
return
281
+
}
282
+
} else {
283
+
if err := c.conn.WriteMessage(websocket.TextMessage, msg.RawJSON); err != nil {
284
+
return
285
+
}
286
+
}
287
+
}
288
+
}
22
289
23
-
flag.Parse()
290
+
func (c *Client) readPump() {
291
+
defer func() {
292
+
c.hub.unregister <- c
293
+
c.conn.Close()
294
+
}()
295
+
for {
296
+
if _, _, err := c.conn.ReadMessage(); err != nil {
297
+
break
298
+
}
299
+
}
300
+
}
24
301
25
-
fmt.Println(js) // prints: [hi hello what]
302
+
func downloadDictionary() ([]byte, error) {
303
+
resp, err := http.Get(DictionaryURL)
304
+
if err != nil {
305
+
return nil, err
306
+
}
307
+
defer resp.Body.Close()
308
+
return io.ReadAll(resp.Body)
26
309
}
+7
-2
go.mod
+7
-2
go.mod
···
3
3
go 1.25.4
4
4
5
5
require (
6
-
github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc
6
+
github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635
7
7
github.com/ericvolp12/jwt-go-secp256k1 v0.0.2
8
8
github.com/gin-contrib/cors v1.7.6
9
9
github.com/gin-gonic/gin v1.11.0
···
21
21
22
22
require (
23
23
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
24
+
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
25
+
github.com/dustin/go-humanize v1.0.1 // indirect
24
26
github.com/gogo/protobuf v1.3.2 // indirect
27
+
github.com/google/flatbuffers v25.2.10+incompatible // indirect
25
28
github.com/hashicorp/golang-lru v1.0.2 // indirect
26
29
github.com/ipfs/bbloom v0.0.4 // indirect
27
30
github.com/ipfs/go-block-format v0.2.0 // indirect
···
39
42
github.com/ipfs/go-merkledag v0.11.0 // indirect
40
43
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
41
44
github.com/ipfs/go-verifcid v0.0.3 // indirect
42
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect
45
+
github.com/ipld/go-car v0.6.2 // indirect
43
46
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
44
47
github.com/ipld/go-ipld-prime v0.21.0 // indirect
45
48
github.com/jbenet/goprocess v0.1.4 // indirect
···
59
62
60
63
require (
61
64
github.com/beorn7/perks v1.0.1 // indirect
65
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1
62
66
github.com/bytedance/sonic v1.14.0 // indirect
63
67
github.com/bytedance/sonic/loader v0.3.0 // indirect
64
68
github.com/cespare/xxhash/v2 v2.3.0 // indirect
65
69
github.com/cloudwego/base64x v0.1.6 // indirect
70
+
github.com/dgraph-io/badger/v4 v4.8.0
66
71
github.com/earthboundkid/versioninfo/v2 v2.24.1 // indirect
67
72
github.com/felixge/httpsnoop v1.0.4 // indirect
68
73
github.com/gabriel-vasile/mimetype v1.4.9 // indirect
+14
go.sum
+14
go.sum
···
6
6
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
7
7
github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc h1:2t+uAvfzJiCsTMwn5fW85t/IGa0+2I7BXS2ORastK4o=
8
8
github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc/go.mod h1:Pm2I1+iDXn/hLbF7XCg/DsZi6uDCiOo7hZGWprSM7k0=
9
+
github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635 h1:kNeRrgGJH2g5OvjLqtaQ744YXqduliZYpFkJ/ld47c0=
10
+
github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635/go.mod h1:Pm2I1+iDXn/hLbF7XCg/DsZi6uDCiOo7hZGWprSM7k0=
11
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU=
12
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs=
9
13
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
10
14
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
11
15
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
···
24
28
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
25
29
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc=
26
30
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40=
31
+
github.com/dgraph-io/badger/v4 v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs=
32
+
github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
33
+
github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM=
34
+
github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI=
35
+
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
36
+
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
27
37
github.com/earthboundkid/versioninfo/v2 v2.24.1 h1:SJTMHaoUx3GzjjnUO1QzP3ZXK6Ee/nbWyCm58eY3oUg=
28
38
github.com/earthboundkid/versioninfo/v2 v2.24.1/go.mod h1:VcWEooDEuyUJnMfbdTh0uFN4cfEIg+kHMuWB2CDCLjw=
29
39
github.com/ericvolp12/jwt-go-secp256k1 v0.0.2 h1:puGwrNTY2vCt8eakkSEq2yeNxUD3zb2kPhv1OsF1hPs=
···
63
73
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
64
74
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
65
75
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
76
+
github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q=
77
+
github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
66
78
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
67
79
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
68
80
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
···
136
148
github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw=
137
149
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI=
138
150
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA=
151
+
github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc=
152
+
github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8=
139
153
github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc=
140
154
github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s=
141
155
github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=
+4
readme.md
+4
readme.md
···
29
29
### `/cmd/backstream`
30
30
experimental backfiller that kinda (but not really) conforms to the jetstream event shape. designed to be ingested by consumers expecting jetstream
31
31
32
+
### `/cmd/aturilist`
33
+
experimental listRecords replacement. is not backfilled. uses the official jetstream go client, which means it suffers from this [bug](https://github.com/bluesky-social/jetstream/pull/45)
34
+
32
35
## Packages
33
36
34
37
### `/auth`
···
54
57
55
58
## todo
56
59
60
+
- clean up /cmd/appview/main.go , its a mess
57
61
- appview-side query caches
58
62
- notification service
59
63
- bookmarks service
+83
-16
shims/lex/app/bsky/feed/defs/embed.go
+83
-16
shims/lex/app/bsky/feed/defs/embed.go
···
41
41
return embedImage, nil
42
42
}
43
43
if feedPost.Embed.EmbedVideo != nil {
44
-
return nil, nil
44
+
//return nil, nil
45
+
videocdn := "https://video.bsky.app" // todo move this
46
+
embedVideo := EmbedVideoViewExtractor(ctx, aturi, feedPost.Embed.EmbedVideo, sl, cs, imgcdn, videocdn, viewer)
47
+
return embedVideo, nil
45
48
//embedType = "EmbedVideo"
46
-
return &appbsky.FeedDefs_PostView_Embed{
47
-
// EmbedImages_View *EmbedImages_View
48
-
// EmbedVideo_View *EmbedVideo_View
49
-
EmbedVideo_View: &appbsky.EmbedVideo_View{
50
-
// LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.embed.video#view"`
51
-
// Alt *string `json:"alt,omitempty" cborgen:"alt,omitempty"`
52
-
// AspectRatio *EmbedDefs_AspectRatio `json:"aspectRatio,omitempty" cborgen:"aspectRatio,omitempty"`
53
-
// Cid string `json:"cid" cborgen:"cid"`
54
-
// Playlist string `json:"playlist" cborgen:"playlist"`
55
-
// Thumbnail *string `json:"thumbnail,omitempty" cborgen:"thumbnail,omitempty"`
56
-
},
57
-
// EmbedExternal_View *EmbedExternal_View
58
-
// EmbedRecord_View *EmbedRecord_View
59
-
// EmbedRecordWithMedia_View *EmbedRecordWithMedia_View
60
-
}, nil
49
+
// return &appbsky.FeedDefs_PostView_Embed{
50
+
// // EmbedImages_View *EmbedImages_View
51
+
// // EmbedVideo_View *EmbedVideo_View
52
+
// EmbedVideo_View: &appbsky.EmbedVideo_View{
53
+
// // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.embed.video#view"`
54
+
// LexiconTypeID: "app.bsky.embed.video#view",
55
+
// // Alt *string `json:"alt,omitempty" cborgen:"alt,omitempty"`
56
+
// Alt:
57
+
// // AspectRatio *EmbedDefs_AspectRatio `json:"aspectRatio,omitempty" cborgen:"aspectRatio,omitempty"`
58
+
// // Cid string `json:"cid" cborgen:"cid"`
59
+
// // Playlist string `json:"playlist" cborgen:"playlist"`
60
+
// // Thumbnail *string `json:"thumbnail,omitempty" cborgen:"thumbnail,omitempty"`
61
+
// },
62
+
// // EmbedExternal_View *EmbedExternal_View
63
+
// // EmbedRecord_View *EmbedRecord_View
64
+
// // EmbedRecordWithMedia_View *EmbedRecordWithMedia_View
65
+
// }, nil
61
66
}
62
67
if feedPost.Embed.EmbedExternal != nil {
63
68
embedExternal := EmbedExternalViewExtractor(ctx, aturi, feedPost.Embed.EmbedExternal, sl, cs, imgcdn, viewer)
···
156
161
}
157
162
}
158
163
if feedPost.Embed.EmbedRecordWithMedia.Media.EmbedVideo != nil {
164
+
videocdn := "https://video.bsky.app" // todo move this
165
+
embedVideo := EmbedVideoViewExtractor(ctx, aturi, feedPost.Embed.EmbedVideo, sl, cs, imgcdn, videocdn, viewer)
166
+
if embedVideo != nil {
167
+
embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{
168
+
// EmbedImages_View *EmbedImages_View
169
+
// EmbedVideo_View *EmbedVideo_View
170
+
EmbedVideo_View: embedVideo.EmbedVideo_View,
171
+
// EmbedVideo_View: &appbsky.EmbedVideo_View{
172
+
173
+
// },
174
+
// EmbedExternal_View *EmbedExternal_View
175
+
}
176
+
}
159
177
// // video extractor
160
178
// embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{
161
179
// // EmbedImages_View *EmbedImages_View
···
242
260
243
261
}
244
262
263
+
func EmbedVideoViewExtractor(ctx context.Context, aturi syntax.ATURI, embedVideo *appbsky.EmbedVideo, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string, videocdn string, viewer *utils.DID) *appbsky.FeedDefs_PostView_Embed {
264
+
// u := utils.MakeImageCDN(utils.DID(aturi.Authority().String()), imgcdn, "feed_thumbnail", rawimg.Image.Ref.String())
265
+
// feed_thumbnail = u
266
+
// uf := utils.MakeImageCDN(utils.DID(aturi.Authority().String()), imgcdn, "feed_fullsize", rawimg.Image.Ref.String())
267
+
// feed_fullsize = uf
268
+
/*
269
+
uri at://did:plc:mdjhvva6vlrswsj26cftjttd/app.bsky.feed.post/3m7lci6jy4k2m
270
+
video cid "bafkreifqh5647m6rsmuxpajitmbjigkg5xdfl6p4v4losks76w77vvtau4"
271
+
playlist "https://video.bsky.app/watch/did%3Aplc%3Amdjhvva6vlrswsj26cftjttd/bafkreifqh5647m6rsmuxpajitmbjigkg5xdfl6p4v4losks76w77vvtau4/playlist.m3u8"
272
+
{videocdn}/watch/{uri encoded did}/{video cid}/playlist.m3u8
273
+
thumbnail "https://video.bsky.app/watch/did%3Aplc%3Amdjhvva6vlrswsj26cftjttd/bafkreifqh5647m6rsmuxpajitmbjigkg5xdfl6p4v4losks76w77vvtau4/thumbnail.jpg"
274
+
{videocdn}/watch/{uri encoded did}/{video cid}/thumbnail.jpg
275
+
*/
276
+
if embedVideo == nil || embedVideo.Video == nil {
277
+
return nil
278
+
}
279
+
didstring := aturi.Authority().String()
280
+
did := utils.DID(didstring)
281
+
playlist := utils.MakeVideoCDN(did, videocdn, "playlist.m3u8", embedVideo.Video.Ref.String())
282
+
thumbnail := utils.MakeVideoCDN(did, videocdn, "thumbnail.jpg", embedVideo.Video.Ref.String())
283
+
return &appbsky.FeedDefs_PostView_Embed{
284
+
// EmbedImages_View *EmbedImages_View
285
+
// EmbedVideo_View *EmbedVideo_View
286
+
EmbedVideo_View: &appbsky.EmbedVideo_View{
287
+
// LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.embed.video#view"`
288
+
LexiconTypeID: "app.bsky.embed.video#view",
289
+
// Alt *string `json:"alt,omitempty" cborgen:"alt,omitempty"`
290
+
Alt: embedVideo.Alt,
291
+
// AspectRatio *EmbedDefs_AspectRatio `json:"aspectRatio,omitempty" cborgen:"aspectRatio,omitempty"`
292
+
AspectRatio: embedVideo.AspectRatio,
293
+
// Cid string `json:"cid" cborgen:"cid"`
294
+
Cid: embedVideo.Video.Ref.String(),
295
+
// Playlist string `json:"playlist" cborgen:"playlist"`
296
+
Playlist: playlist,
297
+
// Thumbnail *string `json:"thumbnail,omitempty" cborgen:"thumbnail,omitempty"`
298
+
Thumbnail: &thumbnail,
299
+
},
300
+
// EmbedExternal_View *EmbedExternal_View
301
+
// EmbedRecord_View *EmbedRecord_View
302
+
// EmbedRecordWithMedia_View *EmbedRecordWithMedia_View
303
+
}
304
+
305
+
}
306
+
245
307
func EmbedExternalViewExtractor(ctx context.Context, aturi syntax.ATURI, embedExternal *appbsky.EmbedExternal, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string, viewer *utils.DID) *appbsky.FeedDefs_PostView_Embed {
246
308
// todo: gif embeds needs special handling i think? maybe?
247
309
//return nil, nil
···
326
388
}
327
389
if postView.Embed.EmbedVideo_View != nil {
328
390
//has = "video"
391
+
embeds = []*appbsky.EmbedRecord_ViewRecord_Embeds_Elem{
392
+
{
393
+
EmbedVideo_View: postView.Embed.EmbedVideo_View,
394
+
},
395
+
}
329
396
}
330
397
if postView.Embed.EmbedExternal_View != nil {
331
398
embeds = []*appbsky.EmbedRecord_ViewRecord_Embeds_Elem{
+20
shims/lex/app/bsky/feed/defs/postview.go
+20
shims/lex/app/bsky/feed/defs/postview.go
···
76
76
subj, ok := like[".subject.uri"]
77
77
if ok {
78
78
likeCount = int64(subj.Records)
79
+
} else {
80
+
likeCount = int64(0)
79
81
}
82
+
} else {
83
+
likeCount = int64(0)
80
84
}
81
85
}
82
86
if links != nil &&
···
86
90
subj, ok := like[".subject.uri"]
87
91
if ok {
88
92
repostCount = int64(subj.Records)
93
+
} else {
94
+
repostCount = int64(0)
89
95
}
96
+
} else {
97
+
repostCount = int64(0)
90
98
}
91
99
}
92
100
if links != nil &&
···
96
104
subj, ok := like[".reply.parent.uri"]
97
105
if ok {
98
106
replyCount = int64(subj.Records)
107
+
} else {
108
+
replyCount = int64(0)
99
109
}
110
+
} else {
111
+
replyCount = int64(0)
100
112
}
101
113
}
102
114
if links != nil &&
···
106
118
subj, ok := like[".embed.record.uri"]
107
119
if ok {
108
120
quoteCount_noEmbed = int64(subj.Records)
121
+
} else {
122
+
quoteCount_noEmbed = int64(0)
109
123
}
124
+
} else {
125
+
quoteCount_noEmbed = int64(0)
110
126
}
111
127
}
112
128
if links != nil &&
···
116
132
subj, ok := like[".embed.record.record.uri"]
117
133
if ok {
118
134
quoteCount_withEmbed = int64(subj.Records)
135
+
} else {
136
+
quoteCount_withEmbed = int64(0)
119
137
}
138
+
} else {
139
+
quoteCount_withEmbed = int64(0)
120
140
}
121
141
}
122
142
quoteCount = quoteCount_noEmbed + quoteCount_withEmbed
+21
-9
shims/lex/app/bsky/unspecced/getpostthreadv2/query.go
+21
-9
shims/lex/app/bsky/unspecced/getpostthreadv2/query.go
···
243
243
depth int
244
244
}
245
245
246
-
func HandleGetPostThreadV2V3(c *gin.Context, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string) {
246
+
func HandleGetPostThreadV2V3(c *gin.Context, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string, existingGraph *ThreadGraph) *ThreadGraph {
247
247
ctx := c.Request.Context()
248
248
249
249
rawdid := c.GetString("user_did")
···
258
258
threadAnchorURIraw := c.Query("anchor")
259
259
if threadAnchorURIraw == "" {
260
260
c.JSON(http.StatusBadRequest, gin.H{"error": "Missing feed param"})
261
-
return
261
+
return existingGraph
262
262
}
263
263
264
264
// "Whether to include parents above the anchor.
···
314
314
315
315
threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw)
316
316
if err != nil {
317
-
return
317
+
return existingGraph
318
318
}
319
319
320
-
threadGraph, err := ThreadGrapher(ctx, cs, sl, threadAnchorURI)
321
-
if err != nil {
322
-
c.JSON(http.StatusBadRequest, gin.H{"error": ("failed to graph the thread: " + err.Error())})
323
-
return
320
+
var workingGraph *ThreadGraph
321
+
322
+
if existingGraph != nil {
323
+
workingGraph = existingGraph
324
+
// update the existing graph to fit our needs in our subtree
325
+
workingGraph.UpdateGraphTo(threadAnchorURI)
326
+
} else {
327
+
newGraph, err := ThreadGrapher(ctx, cs, sl, threadAnchorURI)
328
+
if err != nil {
329
+
c.JSON(http.StatusBadRequest, gin.H{"error": ("failed to graph the thread: " + err.Error())})
330
+
return nil
331
+
}
332
+
workingGraph = newGraph
324
333
}
334
+
325
335
var skeletonposts []SkeletonPost
326
336
327
337
// Parent Chain
···
337
347
// root = current
338
348
break
339
349
}
340
-
parent, ok := threadGraph.ParentsMap[current]
350
+
parent, ok := workingGraph.ParentsMap[current]
341
351
if !ok {
342
352
// root = current
343
353
break
···
361
371
362
372
// Tree Replies (with OP thread priority)
363
373
// should probably be recursive
364
-
recursiveHandleV2V3TreeReplies(threadGraph, &skeletonposts, threadAnchorURI, &below, &branchingFactor, &sort, 0, 0)
374
+
recursiveHandleV2V3TreeReplies(workingGraph, &skeletonposts, threadAnchorURI, &below, &branchingFactor, &sort, 0, 0)
365
375
366
376
//maplen := len(parentsMap)
367
377
concurrentResults := MapConcurrent(
···
448
458
HasOtherReplies: false,
449
459
}
450
460
c.JSON(http.StatusOK, resp)
461
+
462
+
return workingGraph
451
463
}
452
464
453
465
func flipArray[T any](s *[]T) {
+130
-25
shims/lex/app/bsky/unspecced/getpostthreadv2/threadgrapher.go
+130
-25
shims/lex/app/bsky/unspecced/getpostthreadv2/threadgrapher.go
···
107
107
}
108
108
109
109
processingQueue := append([]syntax.ATURI{rootURI}, allRepliesATURI...)
110
-
for _, aturi := range processingQueue {
111
-
//tg.Nodes = append(tg.Nodes, aturi)
112
-
// graphinger
113
-
emptystrarray := &[]string{}
114
-
var localRepliesATURI []syntax.ATURI
115
-
limit := 100
116
-
var cursor *string
117
-
shouldContinue := true
118
-
for shouldContinue {
119
-
results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor)
120
-
if err != nil {
121
-
log.Println("[ThreadGrapher] [parent graphing] exit by no replies")
122
-
return nil, fmt.Errorf("failed to get backlinks: %w", err)
123
-
}
124
-
if results.Records != nil {
125
-
for _, record := range results.Records {
126
-
aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey)
127
-
if err == nil {
128
-
localRepliesATURI = append(localRepliesATURI, aturi)
110
+
// for _, aturi := range processingQueue {
111
+
// //tg.Nodes = append(tg.Nodes, aturi)
112
+
// // graphinger
113
+
// emptystrarray := &[]string{}
114
+
// var localRepliesATURI []syntax.ATURI
115
+
// limit := 100
116
+
// var cursor *string
117
+
// shouldContinue := true
118
+
// for shouldContinue {
119
+
// results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor)
120
+
// if err != nil {
121
+
// log.Println("[ThreadGrapher] [parent graphing] exit by no replies")
122
+
// return nil, fmt.Errorf("failed to get backlinks: %w", err)
123
+
// }
124
+
// if results.Records != nil {
125
+
// for _, record := range results.Records {
126
+
// aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey)
127
+
// if err == nil {
128
+
// localRepliesATURI = append(localRepliesATURI, aturi)
129
+
// }
130
+
// }
131
+
// }
132
+
// if results.Cursor != nil {
133
+
// cursor = results.Cursor
134
+
// } else {
135
+
// shouldContinue = false
136
+
// }
137
+
// }
138
+
// for _, reply := range localRepliesATURI {
139
+
// tg.ParentsMap[reply] = aturi
140
+
// tg.ChildrenMap[aturi] = append(tg.ChildrenMap[aturi], reply)
141
+
// }
142
+
// }
143
+
144
+
type concurrentStruct struct {
145
+
aturi syntax.ATURI
146
+
replies []syntax.ATURI
147
+
}
148
+
149
+
localRepliesATURIConcurrent := MapConcurrent(
150
+
ctx,
151
+
processingQueue,
152
+
50,
153
+
func(ctx context.Context, aturi syntax.ATURI, idx int) (*concurrentStruct, error) {
154
+
//tg.Nodes = append(tg.Nodes, aturi)
155
+
// graphinger
156
+
emptystrarray := &[]string{}
157
+
var localRepliesATURI []syntax.ATURI
158
+
limit := 100
159
+
var cursor *string
160
+
shouldContinue := true
161
+
for shouldContinue {
162
+
results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor)
163
+
if err != nil {
164
+
log.Println("[ThreadGrapher] [parent graphing] exit by no replies")
165
+
return nil, fmt.Errorf("failed to get backlinks: %w", err)
166
+
}
167
+
if results.Records != nil {
168
+
for _, record := range results.Records {
169
+
aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey)
170
+
if err == nil {
171
+
localRepliesATURI = append(localRepliesATURI, aturi)
172
+
}
129
173
}
130
174
}
175
+
if results.Cursor != nil {
176
+
cursor = results.Cursor
177
+
} else {
178
+
shouldContinue = false
179
+
}
131
180
}
132
-
if results.Cursor != nil {
133
-
cursor = results.Cursor
134
-
} else {
135
-
shouldContinue = false
136
-
}
181
+
return &concurrentStruct{
182
+
aturi: aturi,
183
+
replies: localRepliesATURI,
184
+
}, nil
185
+
},
186
+
)
187
+
188
+
localRepliesATURI := make([]*concurrentStruct, 0, len(localRepliesATURIConcurrent))
189
+
for _, r := range localRepliesATURIConcurrent {
190
+
if /*r != nil &&*/ r.Err == nil && r.Value != nil /*&& r.Value.aturi != nil*/ && r.Value.replies != nil {
191
+
localRepliesATURI = append(localRepliesATURI, r.Value)
137
192
}
138
-
for _, reply := range localRepliesATURI {
193
+
}
194
+
for _, replyStruct := range localRepliesATURI {
195
+
aturi := replyStruct.aturi
196
+
for _, reply := range replyStruct.replies {
139
197
tg.ParentsMap[reply] = aturi
140
198
tg.ChildrenMap[aturi] = append(tg.ChildrenMap[aturi], reply)
141
199
}
···
143
201
144
202
return tg, nil
145
203
}
204
+
205
+
// ToBytes serializes the ThreadGraph into a JSON byte slice.
206
+
// It acquires a Read Lock to ensure thread safety during serialization.
207
+
func (g *ThreadGraph) ToBytes() ([]byte, error) {
208
+
g.mu.RLock()
209
+
defer g.mu.RUnlock()
210
+
211
+
// sync.RWMutex is unexported, so json.Marshal will automatically skip it.
212
+
// syntax.ATURI implements TextMarshaler, so it works as a map key automatically.
213
+
return json.Marshal(g)
214
+
}
215
+
216
+
// ThreadGraphFromBytes deserializes a byte slice back into a ThreadGraph.
217
+
// It ensures maps are initialized even if the JSON data was empty.
218
+
func ThreadGraphFromBytes(data []byte) (*ThreadGraph, error) {
219
+
// Initialize with NewThreadGraph to ensure maps are allocated
220
+
tg := NewThreadGraph()
221
+
222
+
if err := json.Unmarshal(data, tg); err != nil {
223
+
return nil, fmt.Errorf("failed to deserialize ThreadGraph: %w", err)
224
+
}
225
+
226
+
// Safety check: specific to Go's JSON unmarshal behavior.
227
+
// If the JSON contained "null" for the maps, they might be nil again.
228
+
// We re-initialize them to avoid panics during concurrent writes later.
229
+
if tg.ParentsMap == nil {
230
+
tg.ParentsMap = make(map[syntax.ATURI]syntax.ATURI)
231
+
}
232
+
if tg.ChildrenMap == nil {
233
+
tg.ChildrenMap = make(map[syntax.ATURI][]syntax.ATURI)
234
+
}
235
+
236
+
// The Mutex (tg.mu) is zero-valued (unlocked) by default, which is exactly what we want.
237
+
return tg, nil
238
+
}
239
+
240
+
func (g *ThreadGraph) UpdateGraphTo(anchor syntax.ATURI) {
241
+
// path from anchor to root never needs to be updated
242
+
// all we need is to update all subtrees of the anchor
243
+
// so we should first do a
244
+
// recursiveHandleUpdateGraphTo(g, anchor)
245
+
// i dont think we should do a recursive thing
246
+
// it cant be optimized well, constellation queries will be sequential
247
+
// how about, grab the entire tree again, prune branches not part of the tree
248
+
// you will get a list of posts that are either new or a part of the subtree
249
+
//
250
+
}
+5
shims/utils/utils.go
+5
shims/utils/utils.go
···
39
39
func MakeImageCDN(did DID, imgcdn string, kind string, cid string) string {
40
40
return imgcdn + "/img/" + kind + "/plain/" + string(did) + "/" + cid + "@jpeg"
41
41
}
42
+
43
+
func MakeVideoCDN(did DID, videocdn string, kind string, cid string) string {
44
+
//{videocdn}/watch/{uri encoded did}/{video cid}/thumbnail.jpg
45
+
return videocdn + "/watch/" + string(did) + "/" + cid + "/" + kind
46
+
}