bluesky appview implementation using microcosm and other services server.reddwarf.app
appview bluesky reddwarf microcosm

Compare changes

Choose any two refs to compare.

Changed files
+1456 -65
cmd
appview
aturilist
jetrelay
shims
lex
app
bsky
feed
unspecced
getpostthreadv2
utils
+2
.gitignore
··· 1 + cmd/aturilist/badger_data 2 + cmd/backstream/temp
+149 -3
cmd/appview/main.go
··· 16 16 17 17 did "github.com/whyrusleeping/go-did" 18 18 "tangled.org/whey.party/red-dwarf-server/auth" 19 + aturilist "tangled.org/whey.party/red-dwarf-server/cmd/aturilist/client" 19 20 "tangled.org/whey.party/red-dwarf-server/microcosm/constellation" 20 21 "tangled.org/whey.party/red-dwarf-server/microcosm/slingshot" 21 22 appbskyactordefs "tangled.org/whey.party/red-dwarf-server/shims/lex/app/bsky/actor/defs" ··· 44 45 SPACEDUST_URL string 45 46 SLINGSHOT_URL string 46 47 CONSTELLATION_URL string 48 + ATURILIST_URL string 47 49 ) 48 50 49 51 func initURLs(prod bool) { ··· 52 54 SPACEDUST_URL = "wss://spacedust.whey.party/subscribe" 53 55 SLINGSHOT_URL = "https://slingshot.whey.party" 54 56 CONSTELLATION_URL = "https://constellation.whey.party" 57 + ATURILIST_URL = "http://localhost:7155" 55 58 } else { 56 59 JETSTREAM_URL = "ws://localhost:6008/subscribe" 57 60 SPACEDUST_URL = "ws://localhost:9998/subscribe" 58 61 SLINGSHOT_URL = "http://localhost:7729" 59 62 CONSTELLATION_URL = "http://localhost:7728" 63 + ATURILIST_URL = "http://localhost:7155" 60 64 } 61 65 } 62 66 ··· 68 72 ) 69 73 70 74 func main() { 71 - log.Println("red-dwarf-server started") 75 + log.Println("red-dwarf-server AppView Service started") 72 76 prod := flag.Bool("prod", false, "use production URLs instead of localhost") 73 77 flag.Parse() 74 78 ··· 78 82 mailbox := sticket.New() 79 83 sl := slingshot.NewSlingshot(SLINGSHOT_URL) 80 84 cs := constellation.NewConstellation(CONSTELLATION_URL) 85 + al := aturilist.NewClient(ATURILIST_URL) 81 86 // spacedust is type definitions only 82 87 // jetstream types is probably available from jetstream/pkg/models 83 88 ··· 86 91 router_raw := gin.New() 87 92 router_raw.Use(gin.Logger()) 88 93 router_raw.Use(gin.Recovery()) 89 - router_raw.Use(cors.Default()) 94 + //router_raw.Use(cors.Default()) 95 + router_raw.Use(cors.New(cors.Config{ 96 + AllowAllOrigins: true, 97 + AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}, 98 + // You must explicitly allow the custom ATProto headers here 99 + AllowHeaders: []string{ 100 + "Origin", 101 + "Content-Length", 102 + "Content-Type", 103 + "Authorization", 104 + "Accept", 105 + "Accept-Language", 106 + "atproto-accept-labelers", // <--- The specific fix for your error 107 + "atproto-proxy", // Good to have for future compatibility 108 + }, 109 + ExposeHeaders: []string{"Content-Length", "Link"}, 110 + AllowCredentials: true, 111 + MaxAge: 12 * time.Hour, 112 + })) 90 113 91 114 router_raw.GET("/.well-known/did.json", GetWellKnownDID) 92 115 ··· 628 651 // V2V2 still doesnt work. should probably make the handler from scratch to fully use the thread grapher. 629 652 // also the thread grapher is still sequental. pls fix that 630 653 //appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V2(c, sl, cs, BSKYIMAGECDN_URL) 631 - appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V3(c, sl, cs, BSKYIMAGECDN_URL) 654 + 655 + var existingGraph *appbskyunspeccedgetpostthreadv2.ThreadGraph 656 + // var kvkey string 657 + // threadAnchorURIraw := c.Query("anchor") 658 + // if threadAnchorURIraw != "" { 659 + // threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw) 660 + // if err == nil { 661 + // kvkey = "ThreadGraph" + threadAnchorURI.String() 662 + // val, ok := kv.Get(kvkey) 663 + // if ok { 664 + // parsed, err := appbskyunspeccedgetpostthreadv2.ThreadGraphFromBytes(val) 665 + // if err != nil { 666 + // existingGraph = parsed 667 + // } 668 + // } 669 + // } 670 + // } 671 + 672 + returnedGraph := appbskyunspeccedgetpostthreadv2.HandleGetPostThreadV2V3(c, sl, cs, BSKYIMAGECDN_URL, existingGraph) 673 + _ = returnedGraph 674 + // bytes, err := returnedGraph.ToBytes() 675 + // if err == nil && kvkey != "" { 676 + // kv.Set(kvkey, bytes, 1*time.Minute) 677 + // } 678 + }) 679 + 680 + router.GET("/xrpc/app.bsky.feed.getAuthorFeed", 681 + func(c *gin.Context) { 682 + 683 + rawdid := c.GetString("user_did") 684 + log.Println("getFeed router_unsafe user_did: " + rawdid) 685 + var viewer *utils.DID 686 + didval, errdid := utils.NewDID(rawdid) 687 + if errdid != nil { 688 + viewer = nil 689 + } else { 690 + viewer = &didval 691 + } 692 + 693 + actorDidParam := c.Query("actor") 694 + if actorDidParam == "" { 695 + c.JSON(http.StatusBadRequest, gin.H{"error": "Missing actor param"}) 696 + return 697 + } 698 + cursorRawParam := c.Query("cursor") 699 + 700 + listResp, err := al.ListRecords(ctx, actorDidParam, "app.bsky.feed.post", cursorRawParam, true) 701 + if err != nil { 702 + log.Fatalf("Failed to list: %v", err) 703 + } 704 + 705 + concurrentResults := MapConcurrent( 706 + ctx, 707 + listResp.Aturis, 708 + 20, 709 + func(ctx context.Context, raw string, idx int) (*appbsky.FeedDefs_FeedViewPost, error) { 710 + post, _, err := appbskyfeeddefs.PostView(ctx, raw, sl, cs, BSKYIMAGECDN_URL, viewer, 2) 711 + if err != nil { 712 + return nil, err 713 + } 714 + if post == nil { 715 + return nil, fmt.Errorf("post not found") 716 + } 717 + 718 + return &appbsky.FeedDefs_FeedViewPost{ 719 + // FeedContext *string `json:"feedContext,omitempty" cborgen:"feedContext,omitempty"` 720 + // Post *FeedDefs_PostView `json:"post" cborgen:"post"` 721 + Post: post, 722 + // Reason *FeedDefs_FeedViewPost_Reason `json:"reason,omitempty" cborgen:"reason,omitempty"` 723 + // Reason: &appbsky.FeedDefs_FeedViewPost_Reason{ 724 + // // FeedDefs_ReasonRepost *FeedDefs_ReasonRepost 725 + // FeedDefs_ReasonRepost: &appbsky.FeedDefs_ReasonRepost{ 726 + // // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.feed.defs#reasonRepost"` 727 + // LexiconTypeID: "app.bsky.feed.defs#reasonRepost", 728 + // // By *ActorDefs_ProfileViewBasic `json:"by" cborgen:"by"` 729 + // // Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty"` 730 + // // IndexedAt string `json:"indexedAt" cborgen:"indexedAt"` 731 + // // Uri *string `json:"uri,omitempty" cborgen:"uri,omitempty"` 732 + // Uri: &raw.Reason.FeedDefs_SkeletonReasonRepost.Repost, 733 + // }, 734 + // // FeedDefs_ReasonPin *FeedDefs_ReasonPin 735 + // FeedDefs_ReasonPin: &appbsky.FeedDefs_ReasonPin{ 736 + // // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.feed.defs#reasonPin"` 737 + // LexiconTypeID: "app.bsky.feed.defs#reasonPin", 738 + // }, 739 + // }, 740 + // Reply *FeedDefs_ReplyRef `json:"reply,omitempty" cborgen:"reply,omitempty"` 741 + // // reqId: Unique identifier per request that may be passed back alongside interactions. 742 + // ReqId *string `json:"reqId,omitempty" cborgen:"reqId,omitempty"` 743 + }, nil 744 + }, 745 + ) 746 + 747 + // build final slice 748 + out := make([]*appbsky.FeedDefs_FeedViewPost, 0, len(concurrentResults)) 749 + for _, r := range concurrentResults { 750 + if r.Err == nil && r.Value != nil && r.Value.Post != nil { 751 + out = append(out, r.Value) 752 + } 753 + } 754 + 755 + c.JSON(http.StatusOK, &appbsky.FeedGetAuthorFeed_Output{ 756 + Cursor: &listResp.Cursor, 757 + Feed: out, 758 + }) 632 759 }) 633 760 634 761 // weird stuff ··· 663 790 } 664 791 }(clientUUID) 665 792 } 793 + c.String(http.StatusOK, ` ____ __________ ____ _ _____ ____ ______ 794 + / __ \/ ____/ __ \ / __ \ | / / | / __ \/ ____/ 795 + / /_/ / __/ / / / / / / / / | /| / / /| | / /_/ / /_ 796 + / _, _/ /___/ /_/ / / /_/ /| |/ |/ / ___ |/ _, _/ __/ 797 + /_/ |_/_____/_____/ /_____/ |__/|__/_/ |_/_/ |_/_/ 798 + _____ __________ _ ____________ 799 + / ___// ____/ __ \ | / / ____/ __ \ 800 + \__ \/ __/ / /_/ / | / / __/ / /_/ / 801 + ___/ / /___/ _, _/| |/ / /___/ _, _/ 802 + /____/_____/_/ |_| |___/_____/_/ |_| 803 + 804 + This is an AT Protocol Application View (AppView) for any application that supports app.bsky.* xrpc methods. 805 + 806 + Most API routes are under /xrpc/ 807 + 808 + Code: https://tangled.org/whey.party/red-dwarf-server 809 + Protocol: https://atproto.com 810 + Try it on: https://new.reddwarf.app 811 + `) 666 812 }) 667 813 router_raw.Run(":7152") 668 814 }
+230
cmd/aturilist/client/client.go
··· 1 + package aturilist 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "net/url" 11 + "time" 12 + ) 13 + 14 + // Constants for the XRPC methods 15 + const ( 16 + MethodListRecords = "app.reddwarf.aturilist.listRecords" 17 + MethodCountRecords = "app.reddwarf.aturilist.countRecords" 18 + MethodIndexRecord = "app.reddwarf.aturilist.indexRecord" 19 + MethodValidateRecord = "app.reddwarf.aturilist.validateRecord" 20 + MethodQueryCollectionRkey = "app.reddwarf.aturilist.queryCollectionRkey" 21 + DefaultProductionHost = "https://aturilist.reddwarf.app" 22 + ) 23 + 24 + // Client is the API client for the Red Dwarf AtURI List Service. 25 + type Client struct { 26 + Host string 27 + HTTPClient *http.Client 28 + // AuthToken is the JWT used for the Authorization header 29 + AuthToken string 30 + } 31 + 32 + // NewClient creates a new client. If host is empty, it defaults to production. 33 + func NewClient(host string) *Client { 34 + if host == "" { 35 + host = DefaultProductionHost 36 + } 37 + return &Client{ 38 + Host: host, 39 + HTTPClient: &http.Client{ 40 + Timeout: 10 * time.Second, 41 + }, 42 + } 43 + } 44 + 45 + // --- Response Models --- 46 + 47 + type ListRecordsResponse struct { 48 + Aturis []string `json:"aturis"` 49 + Count int `json:"count"` 50 + Cursor string `json:"cursor,omitempty"` 51 + } 52 + 53 + type CountRecordsResponse struct { 54 + Repo string `json:"repo"` 55 + Collection string `json:"collection"` 56 + Count int `json:"count"` 57 + } 58 + 59 + type QueryCollectionRkeyResponse struct { 60 + Collection string `json:"collection"` 61 + RKey string `json:"rkey"` 62 + DIDs []string `json:"dids"` 63 + Count int `json:"count"` 64 + } 65 + 66 + type ErrorResponse struct { 67 + Error string `json:"error"` 68 + } 69 + 70 + // --- Request Models --- 71 + 72 + type RecordRequest struct { 73 + Repo string `json:"repo"` 74 + Collection string `json:"collection"` 75 + RKey string `json:"rkey"` 76 + } 77 + 78 + // --- Methods --- 79 + 80 + // ListRecords retrieves a list of AT URIs. 81 + // Set reverse=true to get newest records first. 82 + func (c *Client) ListRecords(ctx context.Context, repo, collection, cursor string, reverse bool) (*ListRecordsResponse, error) { 83 + params := url.Values{} 84 + params.Set("repo", repo) 85 + params.Set("collection", collection) 86 + 87 + if cursor != "" { 88 + params.Set("cursor", cursor) 89 + } 90 + 91 + if reverse { 92 + params.Set("reverse", "true") 93 + } 94 + 95 + var resp ListRecordsResponse 96 + if err := c.doRequest(ctx, http.MethodGet, MethodListRecords, params, nil, &resp); err != nil { 97 + return nil, err 98 + } 99 + 100 + return &resp, nil 101 + } 102 + 103 + // CountRecords returns the total number of records indexed for a collection. 104 + func (c *Client) CountRecords(ctx context.Context, repo, collection string) (*CountRecordsResponse, error) { 105 + params := url.Values{} 106 + params.Set("repo", repo) 107 + params.Set("collection", collection) 108 + 109 + var resp CountRecordsResponse 110 + if err := c.doRequest(ctx, http.MethodGet, MethodCountRecords, params, nil, &resp); err != nil { 111 + return nil, err 112 + } 113 + 114 + return &resp, nil 115 + } 116 + 117 + // IndexRecord triggers a manual index of a specific record. 118 + // This endpoint is rate-limited on the server. 119 + func (c *Client) IndexRecord(ctx context.Context, repo, collection, rkey string) error { 120 + reqBody := RecordRequest{ 121 + Repo: repo, 122 + Collection: collection, 123 + RKey: rkey, 124 + } 125 + 126 + // Server returns 200 OK on success, body is empty or status only. 127 + return c.doRequest(ctx, http.MethodPost, MethodIndexRecord, nil, reqBody, nil) 128 + } 129 + 130 + // ValidateRecord checks if a specific record exists in the local DB. 131 + // Returns true if exists, false if 404, error otherwise. 132 + func (c *Client) ValidateRecord(ctx context.Context, repo, collection, rkey string) (bool, error) { 133 + reqBody := RecordRequest{ 134 + Repo: repo, 135 + Collection: collection, 136 + RKey: rkey, 137 + } 138 + 139 + err := c.doRequest(ctx, http.MethodPost, MethodValidateRecord, nil, reqBody, nil) 140 + if err != nil { 141 + // Parse standard error to see if it was a 404 142 + if clientErr, ok := err.(*ClientError); ok && clientErr.StatusCode == 404 { 143 + return false, nil 144 + } 145 + return false, err 146 + } 147 + 148 + return true, nil 149 + } 150 + 151 + // QueryCollectionRkey returns a list of DIDs that have a specific collection and rkey pair. 152 + func (c *Client) QueryCollectionRkey(ctx context.Context, collection, rkey string) (*QueryCollectionRkeyResponse, error) { 153 + params := url.Values{} 154 + params.Set("collection", collection) 155 + params.Set("rkey", rkey) 156 + 157 + var resp QueryCollectionRkeyResponse 158 + if err := c.doRequest(ctx, http.MethodGet, MethodQueryCollectionRkey, params, nil, &resp); err != nil { 159 + return nil, err 160 + } 161 + 162 + return &resp, nil 163 + } 164 + 165 + // --- Internal Helpers --- 166 + 167 + type ClientError struct { 168 + StatusCode int 169 + Message string 170 + } 171 + 172 + func (e *ClientError) Error() string { 173 + return fmt.Sprintf("api error (status %d): %s", e.StatusCode, e.Message) 174 + } 175 + 176 + func (c *Client) doRequest(ctx context.Context, method, xrpcMethod string, params url.Values, body interface{}, dest interface{}) error { 177 + u, err := url.Parse(fmt.Sprintf("%s/xrpc/%s", c.Host, xrpcMethod)) 178 + if err != nil { 179 + return fmt.Errorf("invalid url: %w", err) 180 + } 181 + 182 + if len(params) > 0 { 183 + u.RawQuery = params.Encode() 184 + } 185 + 186 + var bodyReader io.Reader 187 + if body != nil { 188 + jsonBytes, err := json.Marshal(body) 189 + if err != nil { 190 + return fmt.Errorf("failed to marshal body: %w", err) 191 + } 192 + bodyReader = bytes.NewBuffer(jsonBytes) 193 + } 194 + 195 + req, err := http.NewRequestWithContext(ctx, method, u.String(), bodyReader) 196 + if err != nil { 197 + return fmt.Errorf("failed to create request: %w", err) 198 + } 199 + 200 + req.Header.Set("Content-Type", "application/json") 201 + if c.AuthToken != "" { 202 + req.Header.Set("Authorization", "Bearer "+c.AuthToken) 203 + } 204 + 205 + resp, err := c.HTTPClient.Do(req) 206 + if err != nil { 207 + return fmt.Errorf("request failed: %w", err) 208 + } 209 + defer resp.Body.Close() 210 + 211 + // Handle non-200 responses 212 + if resp.StatusCode < 200 || resp.StatusCode >= 300 { 213 + var errResp ErrorResponse 214 + // Try to decode server error message 215 + if decodeErr := json.NewDecoder(resp.Body).Decode(&errResp); decodeErr == nil && errResp.Error != "" { 216 + return &ClientError{StatusCode: resp.StatusCode, Message: errResp.Error} 217 + } 218 + // Fallback if JSON decode fails or empty 219 + return &ClientError{StatusCode: resp.StatusCode, Message: resp.Status} 220 + } 221 + 222 + // Decode response if destination provided 223 + if dest != nil { 224 + if err := json.NewDecoder(resp.Body).Decode(dest); err != nil { 225 + return fmt.Errorf("failed to decode response: %w", err) 226 + } 227 + } 228 + 229 + return nil 230 + }
+498
cmd/aturilist/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "flag" 8 + "fmt" 9 + "log" 10 + "log/slog" 11 + "os" 12 + "strings" 13 + "sync" 14 + "time" 15 + 16 + "github.com/bluesky-social/indigo/api/agnostic" 17 + "github.com/bluesky-social/indigo/atproto/syntax" 18 + "github.com/bluesky-social/jetstream/pkg/client" 19 + "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 20 + "github.com/bluesky-social/jetstream/pkg/models" 21 + "github.com/dgraph-io/badger/v4" 22 + "github.com/gin-gonic/gin" 23 + 24 + "tangled.org/whey.party/red-dwarf-server/auth" 25 + "tangled.org/whey.party/red-dwarf-server/microcosm" 26 + "tangled.org/whey.party/red-dwarf-server/microcosm/slingshot" 27 + ) 28 + 29 + type Server struct { 30 + db *badger.DB 31 + logger *slog.Logger 32 + 33 + backfillTracker map[string]*sync.WaitGroup 34 + backfillMutex sync.Mutex 35 + } 36 + 37 + var ( 38 + JETSTREAM_URL string 39 + SPACEDUST_URL string 40 + SLINGSHOT_URL string 41 + CONSTELLATION_URL string 42 + ) 43 + 44 + func initURLs(prod bool) { 45 + if !prod { 46 + JETSTREAM_URL = "wss://jetstream.whey.party/subscribe" 47 + SPACEDUST_URL = "wss://spacedust.whey.party/subscribe" 48 + SLINGSHOT_URL = "https://slingshot.whey.party" 49 + CONSTELLATION_URL = "https://constellation.whey.party" 50 + } else { 51 + JETSTREAM_URL = "ws://localhost:6008/subscribe" 52 + SPACEDUST_URL = "ws://localhost:9998/subscribe" 53 + SLINGSHOT_URL = "http://localhost:7729" 54 + CONSTELLATION_URL = "http://localhost:7728" 55 + } 56 + } 57 + 58 + const ( 59 + BSKYIMAGECDN_URL = "https://cdn.bsky.app" 60 + BSKYVIDEOCDN_URL = "https://video.bsky.app" 61 + serviceWebDID = "did:web:aturilist.reddwarf.app" 62 + serviceWebHost = "https://aturilist.reddwarf.app" 63 + ) 64 + 65 + func main() { 66 + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) 67 + log.Println("red-dwarf-server AtURI List Service started") 68 + 69 + prod := flag.Bool("prod", false, "use production URLs instead of localhost") 70 + dbPath := flag.String("db", "./badger_data", "path to badger db") 71 + flag.Parse() 72 + 73 + initURLs(*prod) 74 + 75 + db, err := badger.Open(badger.DefaultOptions(*dbPath)) 76 + if err != nil { 77 + logger.Error("Failed to open BadgerDB", "error", err) 78 + os.Exit(1) 79 + } 80 + defer db.Close() 81 + 82 + srv := &Server{ 83 + db: db, 84 + logger: logger, 85 + } 86 + 87 + auther, err := auth.NewAuth( 88 + 100_000, 89 + time.Hour*12, 90 + 5, 91 + serviceWebDID, 92 + ) 93 + if err != nil { 94 + log.Fatalf("Failed to create Auth: %v", err) 95 + } 96 + 97 + ctx := context.Background() 98 + sl := slingshot.NewSlingshot(SLINGSHOT_URL) 99 + 100 + config := client.DefaultClientConfig() 101 + config.WebsocketURL = JETSTREAM_URL 102 + config.Compress = true 103 + 104 + handler := &JetstreamHandler{srv: srv} 105 + scheduler := sequential.NewScheduler("my_app", logger, handler.HandleEvent) 106 + 107 + c, err := client.NewClient(config, logger, scheduler) 108 + if err != nil { 109 + logger.Error("failed to create client", "error", err) 110 + return 111 + } 112 + 113 + cursor := time.Now().Add(-5 * time.Minute).UnixMicro() 114 + 115 + go func() { 116 + logger.Info("Connecting to Jetstream...") 117 + for { 118 + if err := c.ConnectAndRead(ctx, &cursor); err != nil { 119 + logger.Error("jetstream connection disconnected", "error", err) 120 + } 121 + 122 + select { 123 + case <-ctx.Done(): 124 + return 125 + default: 126 + logger.Info("Reconnecting to Jetstream in 5 seconds...", "cursor", cursor) 127 + time.Sleep(5 * time.Second) 128 + } 129 + } 130 + }() 131 + 132 + router := gin.New() 133 + router.Use(auther.AuthenticateGinRequestViaJWT) 134 + 135 + router.GET("/xrpc/app.reddwarf.aturilist.listRecords", srv.handleListRecords) 136 + 137 + router.GET("/xrpc/app.reddwarf.aturilist.countRecords", srv.handleCountRecords) 138 + 139 + router.POST("/xrpc/app.reddwarf.aturilist.indexRecord", func(c *gin.Context) { 140 + srv.handleIndexRecord(c, sl) 141 + }) 142 + 143 + router.POST("/xrpc/app.reddwarf.aturilist.validateRecord", srv.handleValidateRecord) 144 + 145 + router.GET("/xrpc/app.reddwarf.aturilist.queryCollectionRkey", srv.handleQueryCollectionRkey) 146 + 147 + // router.GET("/xrpc/app.reddwarf.aturilist.requestBackfill", ) 148 + 149 + router.Run(":7155") 150 + } 151 + 152 + type JetstreamHandler struct { 153 + srv *Server 154 + } 155 + 156 + func (h *JetstreamHandler) HandleEvent(ctx context.Context, event *models.Event) error { 157 + if event != nil { 158 + if event.Commit != nil { 159 + isDelete := event.Commit.Operation == models.CommitOperationDelete 160 + 161 + h.srv.processRecord(event.Did, event.Commit.Collection, event.Commit.RKey, isDelete) 162 + 163 + } 164 + } 165 + return nil 166 + } 167 + 168 + func makeKey(repo, collection, rkey string) []byte { 169 + return []byte(fmt.Sprintf("%s|%s|%s", repo, collection, rkey)) 170 + } 171 + 172 + func parseKey(key []byte) (repo, collection, rkey string, err error) { 173 + parts := strings.Split(string(key), "|") 174 + if len(parts) != 3 { 175 + return "", "", "", errors.New("invalid key format") 176 + } 177 + return parts[0], parts[1], parts[2], nil 178 + } 179 + 180 + func makeCollectionRkeyKey(collection, rkey string) []byte { 181 + return []byte(fmt.Sprintf("cr|%s|%s|", collection, rkey)) 182 + } 183 + 184 + func parseCollectionRkeyKey(key []byte) (collection, rkey string, err error) { 185 + parts := strings.Split(string(key), "|") 186 + if len(parts) < 3 || parts[0] != "cr" { 187 + return "", "", errors.New("invalid collection+rkey key format") 188 + } 189 + return parts[1], parts[2], nil 190 + } 191 + 192 + func (s *Server) processRecord(repo, collection, rkey string, isDelete bool) { 193 + key := makeKey(repo, collection, rkey) 194 + crKey := makeCollectionRkeyKey(collection, rkey) 195 + 196 + err := s.db.Update(func(txn *badger.Txn) error { 197 + if isDelete { 198 + if err := txn.Delete(key); err != nil { 199 + return err 200 + } 201 + return s.removeDidFromCollectionRkeyIndex(txn, crKey, repo) 202 + } 203 + if err := txn.Set(key, []byte(time.Now().Format(time.RFC3339))); err != nil { 204 + return err 205 + } 206 + return s.addDidToCollectionRkeyIndex(txn, crKey, repo) 207 + }) 208 + 209 + if err != nil { 210 + s.logger.Error("Failed to update DB", "repo", repo, "rkey", rkey, "err", err) 211 + } 212 + } 213 + 214 + func (s *Server) addDidToCollectionRkeyIndex(txn *badger.Txn, crKey []byte, did string) error { 215 + item, err := txn.Get(crKey) 216 + if err == badger.ErrKeyNotFound { 217 + var dids []string 218 + dids = append(dids, did) 219 + didsJSON, _ := json.Marshal(dids) 220 + return txn.Set(crKey, didsJSON) 221 + } else if err != nil { 222 + return err 223 + } 224 + 225 + var dids []string 226 + err = item.Value(func(val []byte) error { 227 + return json.Unmarshal(val, &dids) 228 + }) 229 + if err != nil { 230 + return err 231 + } 232 + 233 + for _, existingDid := range dids { 234 + if existingDid == did { 235 + return nil 236 + } 237 + } 238 + 239 + dids = append(dids, did) 240 + didsJSON, _ := json.Marshal(dids) 241 + return txn.Set(crKey, didsJSON) 242 + } 243 + 244 + func (s *Server) removeDidFromCollectionRkeyIndex(txn *badger.Txn, crKey []byte, did string) error { 245 + item, err := txn.Get(crKey) 246 + if err == badger.ErrKeyNotFound { 247 + return nil 248 + } else if err != nil { 249 + return err 250 + } 251 + 252 + var dids []string 253 + err = item.Value(func(val []byte) error { 254 + return json.Unmarshal(val, &dids) 255 + }) 256 + if err != nil { 257 + return err 258 + } 259 + 260 + var newDids []string 261 + for _, existingDid := range dids { 262 + if existingDid != did { 263 + newDids = append(newDids, existingDid) 264 + } 265 + } 266 + 267 + if len(newDids) == 0 { 268 + return txn.Delete(crKey) 269 + } 270 + 271 + didsJSON, _ := json.Marshal(newDids) 272 + return txn.Set(crKey, didsJSON) 273 + } 274 + 275 + func (s *Server) handleListRecords(c *gin.Context) { 276 + repo := c.Query("repo") 277 + collection := c.Query("collection") 278 + cursor := c.Query("cursor") 279 + reverse := c.Query("reverse") == "true" 280 + limit := 50 281 + 282 + if repo == "" || collection == "" { 283 + c.JSON(400, gin.H{"error": "repo and collection required"}) 284 + return 285 + } 286 + 287 + prefixStr := fmt.Sprintf("%s|%s|", repo, collection) 288 + prefix := []byte(prefixStr) 289 + 290 + var aturis []string 291 + var lastRkey string 292 + 293 + err := s.db.View(func(txn *badger.Txn) error { 294 + opts := badger.DefaultIteratorOptions 295 + opts.PrefetchValues = false 296 + opts.Reverse = reverse 297 + 298 + it := txn.NewIterator(opts) 299 + defer it.Close() 300 + 301 + var startKey []byte 302 + if cursor != "" { 303 + startKey = makeKey(repo, collection, cursor) 304 + } else { 305 + if reverse { 306 + startKey = append([]byte(prefixStr), 0xFF) 307 + } else { 308 + startKey = prefix 309 + } 310 + } 311 + 312 + it.Seek(startKey) 313 + 314 + if cursor != "" && it.Valid() { 315 + if string(it.Item().Key()) == string(startKey) { 316 + it.Next() 317 + } 318 + } 319 + 320 + for ; it.ValidForPrefix(prefix); it.Next() { 321 + if len(aturis) >= limit { 322 + break 323 + } 324 + item := it.Item() 325 + k := item.Key() 326 + _, _, rkey, err := parseKey(k) 327 + if err == nil { 328 + aturis = append(aturis, fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey)) 329 + lastRkey = rkey 330 + } 331 + } 332 + return nil 333 + }) 334 + 335 + if err != nil { 336 + c.JSON(500, gin.H{"error": err.Error()}) 337 + return 338 + } 339 + 340 + resp := gin.H{ 341 + "aturis": aturis, 342 + "count": len(aturis), 343 + } 344 + 345 + if lastRkey != "" && len(aturis) == limit { 346 + resp["cursor"] = lastRkey 347 + } 348 + 349 + c.JSON(200, resp) 350 + } 351 + 352 + func (s *Server) handleCountRecords(c *gin.Context) { 353 + repo := c.Query("repo") 354 + collection := c.Query("collection") 355 + 356 + if repo == "" || collection == "" { 357 + c.JSON(400, gin.H{"error": "repo and collection required"}) 358 + return 359 + } 360 + 361 + prefix := []byte(fmt.Sprintf("%s|%s|", repo, collection)) 362 + count := 0 363 + 364 + err := s.db.View(func(txn *badger.Txn) error { 365 + opts := badger.DefaultIteratorOptions 366 + opts.PrefetchValues = false 367 + it := txn.NewIterator(opts) 368 + defer it.Close() 369 + 370 + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { 371 + count++ 372 + } 373 + return nil 374 + }) 375 + 376 + if err != nil { 377 + c.JSON(500, gin.H{"error": err.Error()}) 378 + return 379 + } 380 + 381 + c.JSON(200, gin.H{ 382 + "repo": repo, 383 + "collection": collection, 384 + "count": count, 385 + }) 386 + } 387 + 388 + func (s *Server) handleIndexRecord(c *gin.Context, sl *microcosm.MicrocosmClient) { 389 + var req struct { 390 + Collection string `json:"collection"` 391 + Repo string `json:"repo"` 392 + RKey string `json:"rkey"` 393 + } 394 + 395 + if err := c.BindJSON(&req); err != nil { 396 + req.Collection = c.PostForm("collection") 397 + req.Repo = c.PostForm("repo") 398 + req.RKey = c.PostForm("rkey") 399 + } 400 + 401 + if req.Collection == "" || req.Repo == "" || req.RKey == "" { 402 + c.JSON(400, gin.H{"error": "invalid parameters"}) 403 + return 404 + } 405 + 406 + recordResponse, err := agnostic.RepoGetRecord(c.Request.Context(), sl, "", req.Collection, req.Repo, req.RKey) 407 + if err != nil { 408 + s.processRecord(req.Repo, req.Collection, req.RKey, true) 409 + 410 + c.Status(200) 411 + return 412 + } 413 + 414 + uri := recordResponse.Uri 415 + aturi, err := syntax.ParseATURI(uri) 416 + if err != nil { 417 + c.JSON(400, gin.H{"error": "failed to parse aturi from remote"}) 418 + return 419 + } 420 + 421 + s.processRecord(aturi.Authority().String(), string(aturi.Collection()), string(aturi.RecordKey()), false) 422 + c.Status(200) 423 + } 424 + 425 + func (s *Server) handleValidateRecord(c *gin.Context) { 426 + var req struct { 427 + Collection string `json:"collection"` 428 + Repo string `json:"repo"` 429 + RKey string `json:"rkey"` 430 + } 431 + if err := c.BindJSON(&req); err != nil { 432 + c.JSON(400, gin.H{"error": "invalid json"}) 433 + return 434 + } 435 + 436 + key := makeKey(req.Repo, req.Collection, req.RKey) 437 + exists := false 438 + 439 + err := s.db.View(func(txn *badger.Txn) error { 440 + _, err := txn.Get(key) 441 + if err == nil { 442 + exists = true 443 + } else if err == badger.ErrKeyNotFound { 444 + exists = false 445 + return nil 446 + } 447 + return err 448 + }) 449 + 450 + if err != nil { 451 + c.JSON(500, gin.H{"error": err.Error()}) 452 + return 453 + } 454 + 455 + if exists { 456 + c.Status(200) 457 + } else { 458 + c.Status(404) 459 + } 460 + } 461 + 462 + func (s *Server) handleQueryCollectionRkey(c *gin.Context) { 463 + collection := c.Query("collection") 464 + rkey := c.Query("rkey") 465 + 466 + if collection == "" || rkey == "" { 467 + c.JSON(400, gin.H{"error": "collection and rkey required"}) 468 + return 469 + } 470 + 471 + crKey := makeCollectionRkeyKey(collection, rkey) 472 + var dids []string 473 + 474 + err := s.db.View(func(txn *badger.Txn) error { 475 + item, err := txn.Get(crKey) 476 + if err == badger.ErrKeyNotFound { 477 + return nil 478 + } else if err != nil { 479 + return err 480 + } 481 + 482 + return item.Value(func(val []byte) error { 483 + return json.Unmarshal(val, &dids) 484 + }) 485 + }) 486 + 487 + if err != nil { 488 + c.JSON(500, gin.H{"error": err.Error()}) 489 + return 490 + } 491 + 492 + c.JSON(200, gin.H{ 493 + "collection": collection, 494 + "rkey": rkey, 495 + "dids": dids, 496 + "count": len(dids), 497 + }) 498 + }
+293 -10
cmd/jetrelay/main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "flag" 4 + "context" 5 + "encoding/json" 5 6 "fmt" 7 + "io" 8 + "log" 9 + "net/http" 10 + "sort" 11 + "sync" 12 + "time" 13 + 14 + "github.com/gorilla/websocket" 15 + "github.com/klauspost/compress/zstd" 6 16 ) 7 17 8 - type multiFlag []string 18 + const ( 19 + ServerPort = ":3878" 20 + DictionaryURL = "https://raw.githubusercontent.com/bluesky-social/jetstream/main/pkg/models/zstd_dictionary" 21 + BufferSize = 100000 22 + ReconnectDelay = 5 * time.Second 23 + ) 9 24 10 - func (m *multiFlag) String() string { 11 - return fmt.Sprint(*m) 25 + var SourceJetstreams = []string{ 26 + "ws://localhost:6008/subscribe", // local jetstream 27 + "ws://localhost:3877/subscribe", // local backstream 28 + } 29 + 30 + type Event struct { 31 + Kind string `json:"kind"` 32 + TimeUS int64 `json:"time_us"` 33 + Commit json.RawMessage `json:"commit,omitempty"` 34 + } 35 + 36 + type BufferedEvent struct { 37 + RelayTimeUS int64 38 + RawJSON []byte 12 39 } 13 40 14 - func (m *multiFlag) Set(value string) error { 15 - *m = append(*m, value) 41 + type History struct { 42 + events []BufferedEvent 43 + mu sync.RWMutex 44 + } 45 + 46 + func (h *History) Add(jsonBytes []byte, relayTime int64) { 47 + h.mu.Lock() 48 + defer h.mu.Unlock() 49 + 50 + h.events = append(h.events, BufferedEvent{ 51 + RelayTimeUS: relayTime, 52 + RawJSON: jsonBytes, 53 + }) 54 + 55 + if len(h.events) > BufferSize { 56 + h.events = h.events[len(h.events)-BufferSize:] 57 + } 58 + } 59 + 60 + func (h *History) GetSince(cursor int64) []BufferedEvent { 61 + h.mu.RLock() 62 + defer h.mu.RUnlock() 63 + 64 + idx := sort.Search(len(h.events), func(i int) bool { 65 + return h.events[i].RelayTimeUS > cursor 66 + }) 67 + 68 + if idx < len(h.events) { 69 + result := make([]BufferedEvent, len(h.events)-idx) 70 + copy(result, h.events[idx:]) 71 + return result 72 + } 16 73 return nil 17 74 } 18 75 76 + var ( 77 + history = &History{events: make([]BufferedEvent, 0, BufferSize)} 78 + zstdDict []byte 79 + hub *Hub 80 + upgrader = websocket.Upgrader{ 81 + CheckOrigin: func(r *http.Request) bool { return true }, 82 + } 83 + ) 84 + 19 85 func main() { 20 - var js multiFlag 21 - flag.Var(&js, "j", "jetstream instances 'write multiple to input more than one'") 86 + log.Println("Initializing Relay...") 87 + 88 + var err error 89 + zstdDict, err = downloadDictionary() 90 + if err != nil { 91 + log.Fatalf("Failed to load dictionary: %v", err) 92 + } 93 + 94 + hub = newHub() 95 + go hub.run() 96 + 97 + ctx := context.Background() 98 + for i, url := range SourceJetstreams { 99 + go runUpstreamConsumer(ctx, i, url) 100 + } 101 + 102 + http.HandleFunc("/subscribe", serveWs) 103 + log.Printf("๐Ÿ”ฅ Relay Active on %s", ServerPort) 104 + if err := http.ListenAndServe(ServerPort, nil); err != nil { 105 + log.Fatal(err) 106 + } 107 + } 108 + 109 + func runUpstreamConsumer(ctx context.Context, id int, baseURL string) { 110 + var lastSeenCursor int64 = 0 111 + 112 + for { 113 + connectURL := baseURL 114 + if lastSeenCursor > 0 { 115 + connectURL = fmt.Sprintf("%s?cursor=%d", baseURL, lastSeenCursor) 116 + log.Printf("[Input %d] Reconnecting with cursor: %d", id, lastSeenCursor) 117 + } else { 118 + log.Printf("[Input %d] Connecting fresh...", id) 119 + } 120 + 121 + conn, _, err := websocket.DefaultDialer.Dial(connectURL, nil) 122 + if err != nil { 123 + log.Printf("[Input %d] Connect failed: %v. Retrying...", id, err) 124 + time.Sleep(ReconnectDelay) 125 + continue 126 + } 127 + 128 + log.Printf("[Input %d] Connected.", id) 129 + 130 + for { 131 + _, msg, err := conn.ReadMessage() 132 + if err != nil { 133 + log.Printf("[Input %d] Read error: %v", id, err) 134 + break 135 + } 136 + 137 + var genericEvent map[string]interface{} 138 + if err := json.Unmarshal(msg, &genericEvent); err != nil { 139 + continue 140 + } 141 + 142 + if t, ok := genericEvent["time_us"].(float64); ok { 143 + lastSeenCursor = int64(t) 144 + } 145 + 146 + nowUS := time.Now().UnixMicro() 147 + genericEvent["time_us"] = nowUS 148 + 149 + finalBytes, err := json.Marshal(genericEvent) 150 + if err != nil { 151 + continue 152 + } 153 + 154 + history.Add(finalBytes, nowUS) 155 + 156 + hub.broadcast <- BufferedEvent{RelayTimeUS: nowUS, RawJSON: finalBytes} 157 + } 158 + conn.Close() 159 + time.Sleep(ReconnectDelay) 160 + } 161 + } 162 + 163 + func serveWs(w http.ResponseWriter, r *http.Request) { 164 + conn, err := upgrader.Upgrade(w, r, nil) 165 + if err != nil { 166 + return 167 + } 168 + 169 + compress := r.URL.Query().Get("compress") == "true" 170 + 171 + var clientCursor int64 = 0 172 + cursorStr := r.URL.Query().Get("cursor") 173 + if cursorStr != "" { 174 + fmt.Sscanf(cursorStr, "%d", &clientCursor) 175 + } 176 + 177 + client := &Client{ 178 + hub: hub, 179 + conn: conn, 180 + send: make(chan BufferedEvent, 2048), 181 + compress: compress, 182 + lastSentUS: 0, 183 + } 184 + 185 + if compress { 186 + enc, _ := zstd.NewWriter(nil, zstd.WithEncoderDict(zstdDict)) 187 + client.encoder = enc 188 + } 189 + 190 + client.hub.register <- client 191 + 192 + go client.writePump() 193 + 194 + if clientCursor > 0 { 195 + log.Printf("Client requested replay from %d", clientCursor) 196 + missedEvents := history.GetSince(clientCursor) 197 + for _, evt := range missedEvents { 198 + client.send <- evt 199 + } 200 + } 201 + 202 + go client.readPump() 203 + } 204 + 205 + type Client struct { 206 + hub *Hub 207 + conn *websocket.Conn 208 + send chan BufferedEvent 209 + compress bool 210 + encoder *zstd.Encoder 211 + lastSentUS int64 212 + } 213 + 214 + type Hub struct { 215 + clients map[*Client]bool 216 + broadcast chan BufferedEvent 217 + register chan *Client 218 + unregister chan *Client 219 + mu sync.RWMutex 220 + } 221 + 222 + func newHub() *Hub { 223 + return &Hub{ 224 + clients: make(map[*Client]bool), 225 + broadcast: make(chan BufferedEvent, 10000), 226 + register: make(chan *Client), 227 + unregister: make(chan *Client), 228 + } 229 + } 230 + 231 + func (h *Hub) run() { 232 + for { 233 + select { 234 + case client := <-h.register: 235 + h.mu.Lock() 236 + h.clients[client] = true 237 + h.mu.Unlock() 238 + 239 + case client := <-h.unregister: 240 + h.mu.Lock() 241 + if _, ok := h.clients[client]; ok { 242 + delete(h.clients, client) 243 + close(client.send) 244 + if client.encoder != nil { 245 + client.encoder.Close() 246 + } 247 + } 248 + h.mu.Unlock() 249 + 250 + case msg := <-h.broadcast: 251 + h.mu.RLock() 252 + for client := range h.clients { 253 + select { 254 + case client.send <- msg: 255 + default: 256 + go func(c *Client) { 257 + h.unregister <- c 258 + c.conn.Close() 259 + }(client) 260 + } 261 + } 262 + h.mu.RUnlock() 263 + } 264 + } 265 + } 266 + 267 + func (c *Client) writePump() { 268 + defer c.conn.Close() 269 + 270 + for msg := range c.send { 271 + if msg.RelayTimeUS <= c.lastSentUS { 272 + continue 273 + } 274 + 275 + c.lastSentUS = msg.RelayTimeUS 276 + 277 + if c.compress { 278 + compressed := c.encoder.EncodeAll(msg.RawJSON, nil) 279 + if err := c.conn.WriteMessage(websocket.BinaryMessage, compressed); err != nil { 280 + return 281 + } 282 + } else { 283 + if err := c.conn.WriteMessage(websocket.TextMessage, msg.RawJSON); err != nil { 284 + return 285 + } 286 + } 287 + } 288 + } 22 289 23 - flag.Parse() 290 + func (c *Client) readPump() { 291 + defer func() { 292 + c.hub.unregister <- c 293 + c.conn.Close() 294 + }() 295 + for { 296 + if _, _, err := c.conn.ReadMessage(); err != nil { 297 + break 298 + } 299 + } 300 + } 24 301 25 - fmt.Println(js) // prints: [hi hello what] 302 + func downloadDictionary() ([]byte, error) { 303 + resp, err := http.Get(DictionaryURL) 304 + if err != nil { 305 + return nil, err 306 + } 307 + defer resp.Body.Close() 308 + return io.ReadAll(resp.Body) 26 309 }
+7 -2
go.mod
··· 3 3 go 1.25.4 4 4 5 5 require ( 6 - github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc 6 + github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635 7 7 github.com/ericvolp12/jwt-go-secp256k1 v0.0.2 8 8 github.com/gin-contrib/cors v1.7.6 9 9 github.com/gin-gonic/gin v1.11.0 ··· 21 21 22 22 require ( 23 23 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect 24 + github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect 25 + github.com/dustin/go-humanize v1.0.1 // indirect 24 26 github.com/gogo/protobuf v1.3.2 // indirect 27 + github.com/google/flatbuffers v25.2.10+incompatible // indirect 25 28 github.com/hashicorp/golang-lru v1.0.2 // indirect 26 29 github.com/ipfs/bbloom v0.0.4 // indirect 27 30 github.com/ipfs/go-block-format v0.2.0 // indirect ··· 39 42 github.com/ipfs/go-merkledag v0.11.0 // indirect 40 43 github.com/ipfs/go-metrics-interface v0.0.1 // indirect 41 44 github.com/ipfs/go-verifcid v0.0.3 // indirect 42 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect 45 + github.com/ipld/go-car v0.6.2 // indirect 43 46 github.com/ipld/go-codec-dagpb v1.6.0 // indirect 44 47 github.com/ipld/go-ipld-prime v0.21.0 // indirect 45 48 github.com/jbenet/goprocess v0.1.4 // indirect ··· 59 62 60 63 require ( 61 64 github.com/beorn7/perks v1.0.1 // indirect 65 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 62 66 github.com/bytedance/sonic v1.14.0 // indirect 63 67 github.com/bytedance/sonic/loader v0.3.0 // indirect 64 68 github.com/cespare/xxhash/v2 v2.3.0 // indirect 65 69 github.com/cloudwego/base64x v0.1.6 // indirect 70 + github.com/dgraph-io/badger/v4 v4.8.0 66 71 github.com/earthboundkid/versioninfo/v2 v2.24.1 // indirect 67 72 github.com/felixge/httpsnoop v1.0.4 // indirect 68 73 github.com/gabriel-vasile/mimetype v1.4.9 // indirect
+14
go.sum
··· 6 6 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 7 7 github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc h1:2t+uAvfzJiCsTMwn5fW85t/IGa0+2I7BXS2ORastK4o= 8 8 github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc/go.mod h1:Pm2I1+iDXn/hLbF7XCg/DsZi6uDCiOo7hZGWprSM7k0= 9 + github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635 h1:kNeRrgGJH2g5OvjLqtaQ744YXqduliZYpFkJ/ld47c0= 10 + github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635/go.mod h1:Pm2I1+iDXn/hLbF7XCg/DsZi6uDCiOo7hZGWprSM7k0= 11 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU= 12 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs= 9 13 github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ= 10 14 github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA= 11 15 github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= ··· 24 28 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= 25 29 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc= 26 30 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= 31 + github.com/dgraph-io/badger/v4 v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs= 32 + github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w= 33 + github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM= 34 + github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= 35 + github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= 36 + github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= 27 37 github.com/earthboundkid/versioninfo/v2 v2.24.1 h1:SJTMHaoUx3GzjjnUO1QzP3ZXK6Ee/nbWyCm58eY3oUg= 28 38 github.com/earthboundkid/versioninfo/v2 v2.24.1/go.mod h1:VcWEooDEuyUJnMfbdTh0uFN4cfEIg+kHMuWB2CDCLjw= 29 39 github.com/ericvolp12/jwt-go-secp256k1 v0.0.2 h1:puGwrNTY2vCt8eakkSEq2yeNxUD3zb2kPhv1OsF1hPs= ··· 63 73 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= 64 74 github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= 65 75 github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= 76 + github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q= 77 + github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= 66 78 github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= 67 79 github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= 68 80 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= ··· 136 148 github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw= 137 149 github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI= 138 150 github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA= 151 + github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc= 152 + github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= 139 153 github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= 140 154 github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= 141 155 github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=
+4
readme.md
··· 29 29 ### `/cmd/backstream` 30 30 experimental backfiller that kinda (but not really) conforms to the jetstream event shape. designed to be ingested by consumers expecting jetstream 31 31 32 + ### `/cmd/aturilist` 33 + experimental listRecords replacement. is not backfilled. uses the official jetstream go client, which means it suffers from this [bug](https://github.com/bluesky-social/jetstream/pull/45) 34 + 32 35 ## Packages 33 36 34 37 ### `/auth` ··· 54 57 55 58 ## todo 56 59 60 + - clean up /cmd/appview/main.go , its a mess 57 61 - appview-side query caches 58 62 - notification service 59 63 - bookmarks service
+83 -16
shims/lex/app/bsky/feed/defs/embed.go
··· 41 41 return embedImage, nil 42 42 } 43 43 if feedPost.Embed.EmbedVideo != nil { 44 - return nil, nil 44 + //return nil, nil 45 + videocdn := "https://video.bsky.app" // todo move this 46 + embedVideo := EmbedVideoViewExtractor(ctx, aturi, feedPost.Embed.EmbedVideo, sl, cs, imgcdn, videocdn, viewer) 47 + return embedVideo, nil 45 48 //embedType = "EmbedVideo" 46 - return &appbsky.FeedDefs_PostView_Embed{ 47 - // EmbedImages_View *EmbedImages_View 48 - // EmbedVideo_View *EmbedVideo_View 49 - EmbedVideo_View: &appbsky.EmbedVideo_View{ 50 - // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.embed.video#view"` 51 - // Alt *string `json:"alt,omitempty" cborgen:"alt,omitempty"` 52 - // AspectRatio *EmbedDefs_AspectRatio `json:"aspectRatio,omitempty" cborgen:"aspectRatio,omitempty"` 53 - // Cid string `json:"cid" cborgen:"cid"` 54 - // Playlist string `json:"playlist" cborgen:"playlist"` 55 - // Thumbnail *string `json:"thumbnail,omitempty" cborgen:"thumbnail,omitempty"` 56 - }, 57 - // EmbedExternal_View *EmbedExternal_View 58 - // EmbedRecord_View *EmbedRecord_View 59 - // EmbedRecordWithMedia_View *EmbedRecordWithMedia_View 60 - }, nil 49 + // return &appbsky.FeedDefs_PostView_Embed{ 50 + // // EmbedImages_View *EmbedImages_View 51 + // // EmbedVideo_View *EmbedVideo_View 52 + // EmbedVideo_View: &appbsky.EmbedVideo_View{ 53 + // // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.embed.video#view"` 54 + // LexiconTypeID: "app.bsky.embed.video#view", 55 + // // Alt *string `json:"alt,omitempty" cborgen:"alt,omitempty"` 56 + // Alt: 57 + // // AspectRatio *EmbedDefs_AspectRatio `json:"aspectRatio,omitempty" cborgen:"aspectRatio,omitempty"` 58 + // // Cid string `json:"cid" cborgen:"cid"` 59 + // // Playlist string `json:"playlist" cborgen:"playlist"` 60 + // // Thumbnail *string `json:"thumbnail,omitempty" cborgen:"thumbnail,omitempty"` 61 + // }, 62 + // // EmbedExternal_View *EmbedExternal_View 63 + // // EmbedRecord_View *EmbedRecord_View 64 + // // EmbedRecordWithMedia_View *EmbedRecordWithMedia_View 65 + // }, nil 61 66 } 62 67 if feedPost.Embed.EmbedExternal != nil { 63 68 embedExternal := EmbedExternalViewExtractor(ctx, aturi, feedPost.Embed.EmbedExternal, sl, cs, imgcdn, viewer) ··· 156 161 } 157 162 } 158 163 if feedPost.Embed.EmbedRecordWithMedia.Media.EmbedVideo != nil { 164 + videocdn := "https://video.bsky.app" // todo move this 165 + embedVideo := EmbedVideoViewExtractor(ctx, aturi, feedPost.Embed.EmbedVideo, sl, cs, imgcdn, videocdn, viewer) 166 + if embedVideo != nil { 167 + embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{ 168 + // EmbedImages_View *EmbedImages_View 169 + // EmbedVideo_View *EmbedVideo_View 170 + EmbedVideo_View: embedVideo.EmbedVideo_View, 171 + // EmbedVideo_View: &appbsky.EmbedVideo_View{ 172 + 173 + // }, 174 + // EmbedExternal_View *EmbedExternal_View 175 + } 176 + } 159 177 // // video extractor 160 178 // embedmediaview = &appbsky.EmbedRecordWithMedia_View_Media{ 161 179 // // EmbedImages_View *EmbedImages_View ··· 242 260 243 261 } 244 262 263 + func EmbedVideoViewExtractor(ctx context.Context, aturi syntax.ATURI, embedVideo *appbsky.EmbedVideo, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string, videocdn string, viewer *utils.DID) *appbsky.FeedDefs_PostView_Embed { 264 + // u := utils.MakeImageCDN(utils.DID(aturi.Authority().String()), imgcdn, "feed_thumbnail", rawimg.Image.Ref.String()) 265 + // feed_thumbnail = u 266 + // uf := utils.MakeImageCDN(utils.DID(aturi.Authority().String()), imgcdn, "feed_fullsize", rawimg.Image.Ref.String()) 267 + // feed_fullsize = uf 268 + /* 269 + uri at://did:plc:mdjhvva6vlrswsj26cftjttd/app.bsky.feed.post/3m7lci6jy4k2m 270 + video cid "bafkreifqh5647m6rsmuxpajitmbjigkg5xdfl6p4v4losks76w77vvtau4" 271 + playlist "https://video.bsky.app/watch/did%3Aplc%3Amdjhvva6vlrswsj26cftjttd/bafkreifqh5647m6rsmuxpajitmbjigkg5xdfl6p4v4losks76w77vvtau4/playlist.m3u8" 272 + {videocdn}/watch/{uri encoded did}/{video cid}/playlist.m3u8 273 + thumbnail "https://video.bsky.app/watch/did%3Aplc%3Amdjhvva6vlrswsj26cftjttd/bafkreifqh5647m6rsmuxpajitmbjigkg5xdfl6p4v4losks76w77vvtau4/thumbnail.jpg" 274 + {videocdn}/watch/{uri encoded did}/{video cid}/thumbnail.jpg 275 + */ 276 + if embedVideo == nil || embedVideo.Video == nil { 277 + return nil 278 + } 279 + didstring := aturi.Authority().String() 280 + did := utils.DID(didstring) 281 + playlist := utils.MakeVideoCDN(did, videocdn, "playlist.m3u8", embedVideo.Video.Ref.String()) 282 + thumbnail := utils.MakeVideoCDN(did, videocdn, "thumbnail.jpg", embedVideo.Video.Ref.String()) 283 + return &appbsky.FeedDefs_PostView_Embed{ 284 + // EmbedImages_View *EmbedImages_View 285 + // EmbedVideo_View *EmbedVideo_View 286 + EmbedVideo_View: &appbsky.EmbedVideo_View{ 287 + // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.embed.video#view"` 288 + LexiconTypeID: "app.bsky.embed.video#view", 289 + // Alt *string `json:"alt,omitempty" cborgen:"alt,omitempty"` 290 + Alt: embedVideo.Alt, 291 + // AspectRatio *EmbedDefs_AspectRatio `json:"aspectRatio,omitempty" cborgen:"aspectRatio,omitempty"` 292 + AspectRatio: embedVideo.AspectRatio, 293 + // Cid string `json:"cid" cborgen:"cid"` 294 + Cid: embedVideo.Video.Ref.String(), 295 + // Playlist string `json:"playlist" cborgen:"playlist"` 296 + Playlist: playlist, 297 + // Thumbnail *string `json:"thumbnail,omitempty" cborgen:"thumbnail,omitempty"` 298 + Thumbnail: &thumbnail, 299 + }, 300 + // EmbedExternal_View *EmbedExternal_View 301 + // EmbedRecord_View *EmbedRecord_View 302 + // EmbedRecordWithMedia_View *EmbedRecordWithMedia_View 303 + } 304 + 305 + } 306 + 245 307 func EmbedExternalViewExtractor(ctx context.Context, aturi syntax.ATURI, embedExternal *appbsky.EmbedExternal, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string, viewer *utils.DID) *appbsky.FeedDefs_PostView_Embed { 246 308 // todo: gif embeds needs special handling i think? maybe? 247 309 //return nil, nil ··· 326 388 } 327 389 if postView.Embed.EmbedVideo_View != nil { 328 390 //has = "video" 391 + embeds = []*appbsky.EmbedRecord_ViewRecord_Embeds_Elem{ 392 + { 393 + EmbedVideo_View: postView.Embed.EmbedVideo_View, 394 + }, 395 + } 329 396 } 330 397 if postView.Embed.EmbedExternal_View != nil { 331 398 embeds = []*appbsky.EmbedRecord_ViewRecord_Embeds_Elem{
+20
shims/lex/app/bsky/feed/defs/postview.go
··· 76 76 subj, ok := like[".subject.uri"] 77 77 if ok { 78 78 likeCount = int64(subj.Records) 79 + } else { 80 + likeCount = int64(0) 79 81 } 82 + } else { 83 + likeCount = int64(0) 80 84 } 81 85 } 82 86 if links != nil && ··· 86 90 subj, ok := like[".subject.uri"] 87 91 if ok { 88 92 repostCount = int64(subj.Records) 93 + } else { 94 + repostCount = int64(0) 89 95 } 96 + } else { 97 + repostCount = int64(0) 90 98 } 91 99 } 92 100 if links != nil && ··· 96 104 subj, ok := like[".reply.parent.uri"] 97 105 if ok { 98 106 replyCount = int64(subj.Records) 107 + } else { 108 + replyCount = int64(0) 99 109 } 110 + } else { 111 + replyCount = int64(0) 100 112 } 101 113 } 102 114 if links != nil && ··· 106 118 subj, ok := like[".embed.record.uri"] 107 119 if ok { 108 120 quoteCount_noEmbed = int64(subj.Records) 121 + } else { 122 + quoteCount_noEmbed = int64(0) 109 123 } 124 + } else { 125 + quoteCount_noEmbed = int64(0) 110 126 } 111 127 } 112 128 if links != nil && ··· 116 132 subj, ok := like[".embed.record.record.uri"] 117 133 if ok { 118 134 quoteCount_withEmbed = int64(subj.Records) 135 + } else { 136 + quoteCount_withEmbed = int64(0) 119 137 } 138 + } else { 139 + quoteCount_withEmbed = int64(0) 120 140 } 121 141 } 122 142 quoteCount = quoteCount_noEmbed + quoteCount_withEmbed
+21 -9
shims/lex/app/bsky/unspecced/getpostthreadv2/query.go
··· 243 243 depth int 244 244 } 245 245 246 - func HandleGetPostThreadV2V3(c *gin.Context, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string) { 246 + func HandleGetPostThreadV2V3(c *gin.Context, sl *microcosm.MicrocosmClient, cs *microcosm.MicrocosmClient, imgcdn string, existingGraph *ThreadGraph) *ThreadGraph { 247 247 ctx := c.Request.Context() 248 248 249 249 rawdid := c.GetString("user_did") ··· 258 258 threadAnchorURIraw := c.Query("anchor") 259 259 if threadAnchorURIraw == "" { 260 260 c.JSON(http.StatusBadRequest, gin.H{"error": "Missing feed param"}) 261 - return 261 + return existingGraph 262 262 } 263 263 264 264 // "Whether to include parents above the anchor. ··· 314 314 315 315 threadAnchorURI, err := syntax.ParseATURI(threadAnchorURIraw) 316 316 if err != nil { 317 - return 317 + return existingGraph 318 318 } 319 319 320 - threadGraph, err := ThreadGrapher(ctx, cs, sl, threadAnchorURI) 321 - if err != nil { 322 - c.JSON(http.StatusBadRequest, gin.H{"error": ("failed to graph the thread: " + err.Error())}) 323 - return 320 + var workingGraph *ThreadGraph 321 + 322 + if existingGraph != nil { 323 + workingGraph = existingGraph 324 + // update the existing graph to fit our needs in our subtree 325 + workingGraph.UpdateGraphTo(threadAnchorURI) 326 + } else { 327 + newGraph, err := ThreadGrapher(ctx, cs, sl, threadAnchorURI) 328 + if err != nil { 329 + c.JSON(http.StatusBadRequest, gin.H{"error": ("failed to graph the thread: " + err.Error())}) 330 + return nil 331 + } 332 + workingGraph = newGraph 324 333 } 334 + 325 335 var skeletonposts []SkeletonPost 326 336 327 337 // Parent Chain ··· 337 347 // root = current 338 348 break 339 349 } 340 - parent, ok := threadGraph.ParentsMap[current] 350 + parent, ok := workingGraph.ParentsMap[current] 341 351 if !ok { 342 352 // root = current 343 353 break ··· 361 371 362 372 // Tree Replies (with OP thread priority) 363 373 // should probably be recursive 364 - recursiveHandleV2V3TreeReplies(threadGraph, &skeletonposts, threadAnchorURI, &below, &branchingFactor, &sort, 0, 0) 374 + recursiveHandleV2V3TreeReplies(workingGraph, &skeletonposts, threadAnchorURI, &below, &branchingFactor, &sort, 0, 0) 365 375 366 376 //maplen := len(parentsMap) 367 377 concurrentResults := MapConcurrent( ··· 448 458 HasOtherReplies: false, 449 459 } 450 460 c.JSON(http.StatusOK, resp) 461 + 462 + return workingGraph 451 463 } 452 464 453 465 func flipArray[T any](s *[]T) {
+130 -25
shims/lex/app/bsky/unspecced/getpostthreadv2/threadgrapher.go
··· 107 107 } 108 108 109 109 processingQueue := append([]syntax.ATURI{rootURI}, allRepliesATURI...) 110 - for _, aturi := range processingQueue { 111 - //tg.Nodes = append(tg.Nodes, aturi) 112 - // graphinger 113 - emptystrarray := &[]string{} 114 - var localRepliesATURI []syntax.ATURI 115 - limit := 100 116 - var cursor *string 117 - shouldContinue := true 118 - for shouldContinue { 119 - results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor) 120 - if err != nil { 121 - log.Println("[ThreadGrapher] [parent graphing] exit by no replies") 122 - return nil, fmt.Errorf("failed to get backlinks: %w", err) 123 - } 124 - if results.Records != nil { 125 - for _, record := range results.Records { 126 - aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey) 127 - if err == nil { 128 - localRepliesATURI = append(localRepliesATURI, aturi) 110 + // for _, aturi := range processingQueue { 111 + // //tg.Nodes = append(tg.Nodes, aturi) 112 + // // graphinger 113 + // emptystrarray := &[]string{} 114 + // var localRepliesATURI []syntax.ATURI 115 + // limit := 100 116 + // var cursor *string 117 + // shouldContinue := true 118 + // for shouldContinue { 119 + // results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor) 120 + // if err != nil { 121 + // log.Println("[ThreadGrapher] [parent graphing] exit by no replies") 122 + // return nil, fmt.Errorf("failed to get backlinks: %w", err) 123 + // } 124 + // if results.Records != nil { 125 + // for _, record := range results.Records { 126 + // aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey) 127 + // if err == nil { 128 + // localRepliesATURI = append(localRepliesATURI, aturi) 129 + // } 130 + // } 131 + // } 132 + // if results.Cursor != nil { 133 + // cursor = results.Cursor 134 + // } else { 135 + // shouldContinue = false 136 + // } 137 + // } 138 + // for _, reply := range localRepliesATURI { 139 + // tg.ParentsMap[reply] = aturi 140 + // tg.ChildrenMap[aturi] = append(tg.ChildrenMap[aturi], reply) 141 + // } 142 + // } 143 + 144 + type concurrentStruct struct { 145 + aturi syntax.ATURI 146 + replies []syntax.ATURI 147 + } 148 + 149 + localRepliesATURIConcurrent := MapConcurrent( 150 + ctx, 151 + processingQueue, 152 + 50, 153 + func(ctx context.Context, aturi syntax.ATURI, idx int) (*concurrentStruct, error) { 154 + //tg.Nodes = append(tg.Nodes, aturi) 155 + // graphinger 156 + emptystrarray := &[]string{} 157 + var localRepliesATURI []syntax.ATURI 158 + limit := 100 159 + var cursor *string 160 + shouldContinue := true 161 + for shouldContinue { 162 + results, err := constellation.GetBacklinks(ctx, cs, aturi.String(), "app.bsky.feed.post:reply.parent.uri", *emptystrarray, &limit, cursor) 163 + if err != nil { 164 + log.Println("[ThreadGrapher] [parent graphing] exit by no replies") 165 + return nil, fmt.Errorf("failed to get backlinks: %w", err) 166 + } 167 + if results.Records != nil { 168 + for _, record := range results.Records { 169 + aturi, err := syntax.ParseATURI("at://" + record.Did + "/" + record.Collection + "/" + record.Rkey) 170 + if err == nil { 171 + localRepliesATURI = append(localRepliesATURI, aturi) 172 + } 129 173 } 130 174 } 175 + if results.Cursor != nil { 176 + cursor = results.Cursor 177 + } else { 178 + shouldContinue = false 179 + } 131 180 } 132 - if results.Cursor != nil { 133 - cursor = results.Cursor 134 - } else { 135 - shouldContinue = false 136 - } 181 + return &concurrentStruct{ 182 + aturi: aturi, 183 + replies: localRepliesATURI, 184 + }, nil 185 + }, 186 + ) 187 + 188 + localRepliesATURI := make([]*concurrentStruct, 0, len(localRepliesATURIConcurrent)) 189 + for _, r := range localRepliesATURIConcurrent { 190 + if /*r != nil &&*/ r.Err == nil && r.Value != nil /*&& r.Value.aturi != nil*/ && r.Value.replies != nil { 191 + localRepliesATURI = append(localRepliesATURI, r.Value) 137 192 } 138 - for _, reply := range localRepliesATURI { 193 + } 194 + for _, replyStruct := range localRepliesATURI { 195 + aturi := replyStruct.aturi 196 + for _, reply := range replyStruct.replies { 139 197 tg.ParentsMap[reply] = aturi 140 198 tg.ChildrenMap[aturi] = append(tg.ChildrenMap[aturi], reply) 141 199 } ··· 143 201 144 202 return tg, nil 145 203 } 204 + 205 + // ToBytes serializes the ThreadGraph into a JSON byte slice. 206 + // It acquires a Read Lock to ensure thread safety during serialization. 207 + func (g *ThreadGraph) ToBytes() ([]byte, error) { 208 + g.mu.RLock() 209 + defer g.mu.RUnlock() 210 + 211 + // sync.RWMutex is unexported, so json.Marshal will automatically skip it. 212 + // syntax.ATURI implements TextMarshaler, so it works as a map key automatically. 213 + return json.Marshal(g) 214 + } 215 + 216 + // ThreadGraphFromBytes deserializes a byte slice back into a ThreadGraph. 217 + // It ensures maps are initialized even if the JSON data was empty. 218 + func ThreadGraphFromBytes(data []byte) (*ThreadGraph, error) { 219 + // Initialize with NewThreadGraph to ensure maps are allocated 220 + tg := NewThreadGraph() 221 + 222 + if err := json.Unmarshal(data, tg); err != nil { 223 + return nil, fmt.Errorf("failed to deserialize ThreadGraph: %w", err) 224 + } 225 + 226 + // Safety check: specific to Go's JSON unmarshal behavior. 227 + // If the JSON contained "null" for the maps, they might be nil again. 228 + // We re-initialize them to avoid panics during concurrent writes later. 229 + if tg.ParentsMap == nil { 230 + tg.ParentsMap = make(map[syntax.ATURI]syntax.ATURI) 231 + } 232 + if tg.ChildrenMap == nil { 233 + tg.ChildrenMap = make(map[syntax.ATURI][]syntax.ATURI) 234 + } 235 + 236 + // The Mutex (tg.mu) is zero-valued (unlocked) by default, which is exactly what we want. 237 + return tg, nil 238 + } 239 + 240 + func (g *ThreadGraph) UpdateGraphTo(anchor syntax.ATURI) { 241 + // path from anchor to root never needs to be updated 242 + // all we need is to update all subtrees of the anchor 243 + // so we should first do a 244 + // recursiveHandleUpdateGraphTo(g, anchor) 245 + // i dont think we should do a recursive thing 246 + // it cant be optimized well, constellation queries will be sequential 247 + // how about, grab the entire tree again, prune branches not part of the tree 248 + // you will get a list of posts that are either new or a part of the subtree 249 + // 250 + }
+5
shims/utils/utils.go
··· 39 39 func MakeImageCDN(did DID, imgcdn string, kind string, cid string) string { 40 40 return imgcdn + "/img/" + kind + "/plain/" + string(did) + "/" + cid + "@jpeg" 41 41 } 42 + 43 + func MakeVideoCDN(did DID, videocdn string, kind string, cid string) string { 44 + //{videocdn}/watch/{uri encoded did}/{video cid}/thumbnail.jpg 45 + return videocdn + "/watch/" + string(did) + "/" + cid + "/" + kind 46 + }