+1
.gitignore
+1
.gitignore
···
1
+
cmd/aturilist/badger_data
+106
-2
cmd/appview/main.go
+106
-2
cmd/appview/main.go
···
16
16
17
17
did "github.com/whyrusleeping/go-did"
18
18
"tangled.org/whey.party/red-dwarf-server/auth"
19
+
aturilist "tangled.org/whey.party/red-dwarf-server/cmd/aturilist/client"
19
20
"tangled.org/whey.party/red-dwarf-server/microcosm/constellation"
20
21
"tangled.org/whey.party/red-dwarf-server/microcosm/slingshot"
21
22
appbskyactordefs "tangled.org/whey.party/red-dwarf-server/shims/lex/app/bsky/actor/defs"
···
44
45
SPACEDUST_URL string
45
46
SLINGSHOT_URL string
46
47
CONSTELLATION_URL string
48
+
ATURILIST_URL string
47
49
)
48
50
49
51
func initURLs(prod bool) {
···
52
54
SPACEDUST_URL = "wss://spacedust.whey.party/subscribe"
53
55
SLINGSHOT_URL = "https://slingshot.whey.party"
54
56
CONSTELLATION_URL = "https://constellation.whey.party"
57
+
ATURILIST_URL = "http://localhost:7155"
55
58
} else {
56
59
JETSTREAM_URL = "ws://localhost:6008/subscribe"
57
60
SPACEDUST_URL = "ws://localhost:9998/subscribe"
58
61
SLINGSHOT_URL = "http://localhost:7729"
59
62
CONSTELLATION_URL = "http://localhost:7728"
63
+
ATURILIST_URL = "http://localhost:7155"
60
64
}
61
65
}
62
66
···
68
72
)
69
73
70
74
func main() {
71
-
log.Println("red-dwarf-server started")
75
+
log.Println("red-dwarf-server AppView Service started")
72
76
prod := flag.Bool("prod", false, "use production URLs instead of localhost")
73
77
flag.Parse()
74
78
···
78
82
mailbox := sticket.New()
79
83
sl := slingshot.NewSlingshot(SLINGSHOT_URL)
80
84
cs := constellation.NewConstellation(CONSTELLATION_URL)
85
+
al := aturilist.NewClient(ATURILIST_URL)
81
86
// spacedust is type definitions only
82
87
// jetstream types is probably available from jetstream/pkg/models
83
88
···
86
91
router_raw := gin.New()
87
92
router_raw.Use(gin.Logger())
88
93
router_raw.Use(gin.Recovery())
89
-
router_raw.Use(cors.Default())
94
+
//router_raw.Use(cors.Default())
95
+
router_raw.Use(cors.New(cors.Config{
96
+
AllowAllOrigins: true,
97
+
AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"},
98
+
// You must explicitly allow the custom ATProto headers here
99
+
AllowHeaders: []string{
100
+
"Origin",
101
+
"Content-Length",
102
+
"Content-Type",
103
+
"Authorization",
104
+
"Accept",
105
+
"Accept-Language",
106
+
"atproto-accept-labelers", // <--- The specific fix for your error
107
+
"atproto-proxy", // Good to have for future compatibility
108
+
},
109
+
ExposeHeaders: []string{"Content-Length", "Link"},
110
+
AllowCredentials: true,
111
+
MaxAge: 12 * time.Hour,
112
+
}))
90
113
91
114
router_raw.GET("/.well-known/did.json", GetWellKnownDID)
92
115
···
652
675
// if err == nil && kvkey != "" {
653
676
// kv.Set(kvkey, bytes, 1*time.Minute)
654
677
// }
678
+
})
679
+
680
+
router.GET("/xrpc/app.bsky.feed.getAuthorFeed",
681
+
func(c *gin.Context) {
682
+
683
+
rawdid := c.GetString("user_did")
684
+
log.Println("getFeed router_unsafe user_did: " + rawdid)
685
+
var viewer *utils.DID
686
+
didval, errdid := utils.NewDID(rawdid)
687
+
if errdid != nil {
688
+
viewer = nil
689
+
} else {
690
+
viewer = &didval
691
+
}
692
+
693
+
actorDidParam := c.Query("actor")
694
+
if actorDidParam == "" {
695
+
c.JSON(http.StatusBadRequest, gin.H{"error": "Missing actor param"})
696
+
return
697
+
}
698
+
cursorRawParam := c.Query("cursor")
699
+
700
+
listResp, err := al.ListRecords(ctx, actorDidParam, "app.bsky.feed.post", cursorRawParam, true)
701
+
if err != nil {
702
+
log.Fatalf("Failed to list: %v", err)
703
+
}
704
+
705
+
concurrentResults := MapConcurrent(
706
+
ctx,
707
+
listResp.Aturis,
708
+
20,
709
+
func(ctx context.Context, raw string, idx int) (*appbsky.FeedDefs_FeedViewPost, error) {
710
+
post, _, err := appbskyfeeddefs.PostView(ctx, raw, sl, cs, BSKYIMAGECDN_URL, viewer, 2)
711
+
if err != nil {
712
+
return nil, err
713
+
}
714
+
if post == nil {
715
+
return nil, fmt.Errorf("post not found")
716
+
}
717
+
718
+
return &appbsky.FeedDefs_FeedViewPost{
719
+
// FeedContext *string `json:"feedContext,omitempty" cborgen:"feedContext,omitempty"`
720
+
// Post *FeedDefs_PostView `json:"post" cborgen:"post"`
721
+
Post: post,
722
+
// Reason *FeedDefs_FeedViewPost_Reason `json:"reason,omitempty" cborgen:"reason,omitempty"`
723
+
// Reason: &appbsky.FeedDefs_FeedViewPost_Reason{
724
+
// // FeedDefs_ReasonRepost *FeedDefs_ReasonRepost
725
+
// FeedDefs_ReasonRepost: &appbsky.FeedDefs_ReasonRepost{
726
+
// // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.feed.defs#reasonRepost"`
727
+
// LexiconTypeID: "app.bsky.feed.defs#reasonRepost",
728
+
// // By *ActorDefs_ProfileViewBasic `json:"by" cborgen:"by"`
729
+
// // Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty"`
730
+
// // IndexedAt string `json:"indexedAt" cborgen:"indexedAt"`
731
+
// // Uri *string `json:"uri,omitempty" cborgen:"uri,omitempty"`
732
+
// Uri: &raw.Reason.FeedDefs_SkeletonReasonRepost.Repost,
733
+
// },
734
+
// // FeedDefs_ReasonPin *FeedDefs_ReasonPin
735
+
// FeedDefs_ReasonPin: &appbsky.FeedDefs_ReasonPin{
736
+
// // LexiconTypeID string `json:"$type" cborgen:"$type,const=app.bsky.feed.defs#reasonPin"`
737
+
// LexiconTypeID: "app.bsky.feed.defs#reasonPin",
738
+
// },
739
+
// },
740
+
// Reply *FeedDefs_ReplyRef `json:"reply,omitempty" cborgen:"reply,omitempty"`
741
+
// // reqId: Unique identifier per request that may be passed back alongside interactions.
742
+
// ReqId *string `json:"reqId,omitempty" cborgen:"reqId,omitempty"`
743
+
}, nil
744
+
},
745
+
)
746
+
747
+
// build final slice
748
+
out := make([]*appbsky.FeedDefs_FeedViewPost, 0, len(concurrentResults))
749
+
for _, r := range concurrentResults {
750
+
if r.Err == nil && r.Value != nil && r.Value.Post != nil {
751
+
out = append(out, r.Value)
752
+
}
753
+
}
754
+
755
+
c.JSON(http.StatusOK, &appbsky.FeedGetAuthorFeed_Output{
756
+
Cursor: &listResp.Cursor,
757
+
Feed: out,
758
+
})
655
759
})
656
760
657
761
// weird stuff
+208
cmd/aturilist/client/client.go
+208
cmd/aturilist/client/client.go
···
1
+
package aturilist
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"encoding/json"
7
+
"fmt"
8
+
"io"
9
+
"net/http"
10
+
"net/url"
11
+
"time"
12
+
)
13
+
14
+
// Constants for the XRPC methods
15
+
const (
16
+
MethodListRecords = "app.reddwarf.aturilist.listRecords"
17
+
MethodCountRecords = "app.reddwarf.aturilist.countRecords"
18
+
MethodIndexRecord = "app.reddwarf.aturilist.indexRecord"
19
+
MethodValidateRecord = "app.reddwarf.aturilist.validateRecord"
20
+
DefaultProductionHost = "https://aturilist.reddwarf.app"
21
+
)
22
+
23
+
// Client is the API client for the Red Dwarf AtURI List Service.
24
+
type Client struct {
25
+
Host string
26
+
HTTPClient *http.Client
27
+
// AuthToken is the JWT used for the Authorization header
28
+
AuthToken string
29
+
}
30
+
31
+
// NewClient creates a new client. If host is empty, it defaults to production.
32
+
func NewClient(host string) *Client {
33
+
if host == "" {
34
+
host = DefaultProductionHost
35
+
}
36
+
return &Client{
37
+
Host: host,
38
+
HTTPClient: &http.Client{
39
+
Timeout: 10 * time.Second,
40
+
},
41
+
}
42
+
}
43
+
44
+
// --- Response Models ---
45
+
46
+
type ListRecordsResponse struct {
47
+
Aturis []string `json:"aturis"`
48
+
Count int `json:"count"`
49
+
Cursor string `json:"cursor,omitempty"`
50
+
}
51
+
52
+
type CountRecordsResponse struct {
53
+
Repo string `json:"repo"`
54
+
Collection string `json:"collection"`
55
+
Count int `json:"count"`
56
+
}
57
+
58
+
type ErrorResponse struct {
59
+
Error string `json:"error"`
60
+
}
61
+
62
+
// --- Request Models ---
63
+
64
+
type RecordRequest struct {
65
+
Repo string `json:"repo"`
66
+
Collection string `json:"collection"`
67
+
RKey string `json:"rkey"`
68
+
}
69
+
70
+
// --- Methods ---
71
+
72
+
// ListRecords retrieves a list of AT URIs.
73
+
// Set reverse=true to get newest records first.
74
+
func (c *Client) ListRecords(ctx context.Context, repo, collection, cursor string, reverse bool) (*ListRecordsResponse, error) {
75
+
params := url.Values{}
76
+
params.Set("repo", repo)
77
+
params.Set("collection", collection)
78
+
79
+
if cursor != "" {
80
+
params.Set("cursor", cursor)
81
+
}
82
+
83
+
if reverse {
84
+
params.Set("reverse", "true")
85
+
}
86
+
87
+
var resp ListRecordsResponse
88
+
if err := c.doRequest(ctx, http.MethodGet, MethodListRecords, params, nil, &resp); err != nil {
89
+
return nil, err
90
+
}
91
+
92
+
return &resp, nil
93
+
}
94
+
95
+
// CountRecords returns the total number of records indexed for a collection.
96
+
func (c *Client) CountRecords(ctx context.Context, repo, collection string) (*CountRecordsResponse, error) {
97
+
params := url.Values{}
98
+
params.Set("repo", repo)
99
+
params.Set("collection", collection)
100
+
101
+
var resp CountRecordsResponse
102
+
if err := c.doRequest(ctx, http.MethodGet, MethodCountRecords, params, nil, &resp); err != nil {
103
+
return nil, err
104
+
}
105
+
106
+
return &resp, nil
107
+
}
108
+
109
+
// IndexRecord triggers a manual index of a specific record.
110
+
// This endpoint is rate-limited on the server.
111
+
func (c *Client) IndexRecord(ctx context.Context, repo, collection, rkey string) error {
112
+
reqBody := RecordRequest{
113
+
Repo: repo,
114
+
Collection: collection,
115
+
RKey: rkey,
116
+
}
117
+
118
+
// Server returns 200 OK on success, body is empty or status only.
119
+
return c.doRequest(ctx, http.MethodPost, MethodIndexRecord, nil, reqBody, nil)
120
+
}
121
+
122
+
// ValidateRecord checks if a specific record exists in the local DB.
123
+
// Returns true if exists, false if 404, error otherwise.
124
+
func (c *Client) ValidateRecord(ctx context.Context, repo, collection, rkey string) (bool, error) {
125
+
reqBody := RecordRequest{
126
+
Repo: repo,
127
+
Collection: collection,
128
+
RKey: rkey,
129
+
}
130
+
131
+
err := c.doRequest(ctx, http.MethodPost, MethodValidateRecord, nil, reqBody, nil)
132
+
if err != nil {
133
+
// Parse standard error to see if it was a 404
134
+
if clientErr, ok := err.(*ClientError); ok && clientErr.StatusCode == 404 {
135
+
return false, nil
136
+
}
137
+
return false, err
138
+
}
139
+
140
+
return true, nil
141
+
}
142
+
143
+
// --- Internal Helpers ---
144
+
145
+
type ClientError struct {
146
+
StatusCode int
147
+
Message string
148
+
}
149
+
150
+
func (e *ClientError) Error() string {
151
+
return fmt.Sprintf("api error (status %d): %s", e.StatusCode, e.Message)
152
+
}
153
+
154
+
func (c *Client) doRequest(ctx context.Context, method, xrpcMethod string, params url.Values, body interface{}, dest interface{}) error {
155
+
u, err := url.Parse(fmt.Sprintf("%s/xrpc/%s", c.Host, xrpcMethod))
156
+
if err != nil {
157
+
return fmt.Errorf("invalid url: %w", err)
158
+
}
159
+
160
+
if len(params) > 0 {
161
+
u.RawQuery = params.Encode()
162
+
}
163
+
164
+
var bodyReader io.Reader
165
+
if body != nil {
166
+
jsonBytes, err := json.Marshal(body)
167
+
if err != nil {
168
+
return fmt.Errorf("failed to marshal body: %w", err)
169
+
}
170
+
bodyReader = bytes.NewBuffer(jsonBytes)
171
+
}
172
+
173
+
req, err := http.NewRequestWithContext(ctx, method, u.String(), bodyReader)
174
+
if err != nil {
175
+
return fmt.Errorf("failed to create request: %w", err)
176
+
}
177
+
178
+
req.Header.Set("Content-Type", "application/json")
179
+
if c.AuthToken != "" {
180
+
req.Header.Set("Authorization", "Bearer "+c.AuthToken)
181
+
}
182
+
183
+
resp, err := c.HTTPClient.Do(req)
184
+
if err != nil {
185
+
return fmt.Errorf("request failed: %w", err)
186
+
}
187
+
defer resp.Body.Close()
188
+
189
+
// Handle non-200 responses
190
+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
191
+
var errResp ErrorResponse
192
+
// Try to decode server error message
193
+
if decodeErr := json.NewDecoder(resp.Body).Decode(&errResp); decodeErr == nil && errResp.Error != "" {
194
+
return &ClientError{StatusCode: resp.StatusCode, Message: errResp.Error}
195
+
}
196
+
// Fallback if JSON decode fails or empty
197
+
return &ClientError{StatusCode: resp.StatusCode, Message: resp.Status}
198
+
}
199
+
200
+
// Decode response if destination provided
201
+
if dest != nil {
202
+
if err := json.NewDecoder(resp.Body).Decode(dest); err != nil {
203
+
return fmt.Errorf("failed to decode response: %w", err)
204
+
}
205
+
}
206
+
207
+
return nil
208
+
}
+424
cmd/aturilist/main.go
+424
cmd/aturilist/main.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"errors"
6
+
"flag"
7
+
"fmt"
8
+
"log"
9
+
"log/slog"
10
+
"os"
11
+
"strings"
12
+
"sync"
13
+
"time"
14
+
15
+
"github.com/bluesky-social/indigo/api/agnostic"
16
+
"github.com/bluesky-social/indigo/atproto/syntax"
17
+
"github.com/bluesky-social/jetstream/pkg/client"
18
+
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
19
+
"github.com/bluesky-social/jetstream/pkg/models"
20
+
"github.com/dgraph-io/badger/v4"
21
+
"github.com/gin-gonic/gin"
22
+
23
+
// Restored your specific imports
24
+
"tangled.org/whey.party/red-dwarf-server/auth"
25
+
"tangled.org/whey.party/red-dwarf-server/microcosm"
26
+
"tangled.org/whey.party/red-dwarf-server/microcosm/slingshot"
27
+
)
28
+
29
+
type Server struct {
30
+
db *badger.DB
31
+
logger *slog.Logger
32
+
33
+
// Locks for specific operations if needed, though Badger is thread-safe
34
+
backfillTracker map[string]*sync.WaitGroup
35
+
backfillMutex sync.Mutex
36
+
}
37
+
38
+
var (
39
+
JETSTREAM_URL string
40
+
SPACEDUST_URL string
41
+
SLINGSHOT_URL string
42
+
CONSTELLATION_URL string
43
+
)
44
+
45
+
func initURLs(prod bool) {
46
+
if !prod {
47
+
JETSTREAM_URL = "wss://jetstream.whey.party/subscribe"
48
+
SPACEDUST_URL = "wss://spacedust.whey.party/subscribe"
49
+
SLINGSHOT_URL = "https://slingshot.whey.party"
50
+
CONSTELLATION_URL = "https://constellation.whey.party"
51
+
} else {
52
+
JETSTREAM_URL = "ws://localhost:6008/subscribe"
53
+
SPACEDUST_URL = "ws://localhost:9998/subscribe"
54
+
SLINGSHOT_URL = "http://localhost:7729"
55
+
CONSTELLATION_URL = "http://localhost:7728"
56
+
}
57
+
}
58
+
59
+
const (
60
+
BSKYIMAGECDN_URL = "https://cdn.bsky.app"
61
+
BSKYVIDEOCDN_URL = "https://video.bsky.app"
62
+
serviceWebDID = "did:web:aturilist.reddwarf.app"
63
+
serviceWebHost = "https://aturilist.reddwarf.app"
64
+
)
65
+
66
+
func main() {
67
+
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
68
+
log.Println("red-dwarf-server AtURI List Service started")
69
+
70
+
prod := flag.Bool("prod", false, "use production URLs instead of localhost")
71
+
dbPath := flag.String("db", "./badger_data", "path to badger db")
72
+
flag.Parse()
73
+
74
+
initURLs(*prod)
75
+
76
+
// 1. Initialize DB
77
+
db, err := badger.Open(badger.DefaultOptions(*dbPath))
78
+
if err != nil {
79
+
logger.Error("Failed to open BadgerDB", "error", err)
80
+
os.Exit(1)
81
+
}
82
+
defer db.Close()
83
+
84
+
srv := &Server{
85
+
db: db,
86
+
logger: logger,
87
+
}
88
+
89
+
// 2. Initialize Auth
90
+
auther, err := auth.NewAuth(
91
+
100_000,
92
+
time.Hour*12,
93
+
5,
94
+
serviceWebDID, //+"#bsky_appview",
95
+
)
96
+
if err != nil {
97
+
log.Fatalf("Failed to create Auth: %v", err)
98
+
}
99
+
100
+
// 3. Initialize Clients
101
+
ctx := context.Background()
102
+
sl := slingshot.NewSlingshot(SLINGSHOT_URL)
103
+
104
+
// 4. Initialize Jetstream
105
+
config := client.DefaultClientConfig()
106
+
config.WebsocketURL = JETSTREAM_URL
107
+
config.Compress = true
108
+
109
+
handler := &JetstreamHandler{srv: srv}
110
+
scheduler := sequential.NewScheduler("my_app", logger, handler.HandleEvent)
111
+
112
+
c, err := client.NewClient(config, logger, scheduler)
113
+
if err != nil {
114
+
logger.Error("failed to create client", "error", err)
115
+
return
116
+
}
117
+
118
+
// Connect with cursor (5 minutes ago)
119
+
cursor := time.Now().Add(-5 * time.Minute).UnixMicro()
120
+
121
+
go func() {
122
+
logger.Info("Connecting to Jetstream...")
123
+
/*
124
+
If you resume a jetstream firehose from a cursor, everything works fine until you catch up to real time.
125
+
At that point, the connection drops. If you connect without a cursor (going straight to realtime), it keeps working.
126
+
*/
127
+
for {
128
+
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
129
+
logger.Error("jetstream connection disconnected", "error", err)
130
+
}
131
+
132
+
select {
133
+
case <-ctx.Done():
134
+
return // Context cancelled, exit loop
135
+
default:
136
+
logger.Info("Reconnecting to Jetstream in 5 seconds...", "cursor", cursor)
137
+
time.Sleep(5 * time.Second)
138
+
}
139
+
}
140
+
}()
141
+
142
+
// 5. Initialize Router
143
+
router := gin.New()
144
+
router.Use(auther.AuthenticateGinRequestViaJWT)
145
+
146
+
router.GET("/xrpc/app.reddwarf.aturilist.listRecords", srv.handleListRecords)
147
+
148
+
router.GET("/xrpc/app.reddwarf.aturilist.countRecords", srv.handleCountRecords)
149
+
150
+
// heavily rate limited because can be used for spam.
151
+
router.POST("/xrpc/app.reddwarf.aturilist.indexRecord", func(c *gin.Context) {
152
+
srv.handleIndexRecord(c, sl)
153
+
})
154
+
155
+
router.POST("/xrpc/app.reddwarf.aturilist.validateRecord", srv.handleValidateRecord)
156
+
157
+
// router.GET("/xrpc/app.reddwarf.aturilist.requestBackfill", )
158
+
159
+
router.Run(":7155")
160
+
}
161
+
162
+
// --- Jetstream Handler ---
163
+
164
+
type JetstreamHandler struct {
165
+
srv *Server
166
+
}
167
+
168
+
func (h *JetstreamHandler) HandleEvent(ctx context.Context, event *models.Event) error {
169
+
if event != nil {
170
+
if event.Commit != nil {
171
+
// Identify Delete operation
172
+
isDelete := event.Commit.Operation == models.CommitOperationDelete
173
+
174
+
// Process
175
+
h.srv.processRecord(event.Did, event.Commit.Collection, event.Commit.RKey, isDelete)
176
+
177
+
}
178
+
}
179
+
return nil
180
+
}
181
+
182
+
// --- DB Helpers ---
183
+
184
+
func makeKey(repo, collection, rkey string) []byte {
185
+
return []byte(fmt.Sprintf("%s|%s|%s", repo, collection, rkey))
186
+
}
187
+
188
+
func parseKey(key []byte) (repo, collection, rkey string, err error) {
189
+
parts := strings.Split(string(key), "|")
190
+
if len(parts) != 3 {
191
+
return "", "", "", errors.New("invalid key format")
192
+
}
193
+
return parts[0], parts[1], parts[2], nil
194
+
}
195
+
196
+
// processRecord handles the DB write/delete.
197
+
// isDelete=true removes the key. isDelete=false sets the key.
198
+
func (s *Server) processRecord(repo, collection, rkey string, isDelete bool) {
199
+
key := makeKey(repo, collection, rkey)
200
+
201
+
err := s.db.Update(func(txn *badger.Txn) error {
202
+
if isDelete {
203
+
return txn.Delete(key)
204
+
}
205
+
// On create/update, store current timestamp.
206
+
// You can store more data (Cid, etc) here if needed later.
207
+
return txn.Set(key, []byte(time.Now().Format(time.RFC3339)))
208
+
})
209
+
210
+
if err != nil {
211
+
s.logger.Error("Failed to update DB", "repo", repo, "rkey", rkey, "err", err)
212
+
}
213
+
}
214
+
215
+
// --- HTTP Handlers ---
216
+
217
+
func (s *Server) handleListRecords(c *gin.Context) {
218
+
repo := c.Query("repo")
219
+
collection := c.Query("collection")
220
+
cursor := c.Query("cursor")
221
+
reverse := c.Query("reverse") == "true" // 1. Check param
222
+
limit := 50
223
+
224
+
if repo == "" || collection == "" {
225
+
c.JSON(400, gin.H{"error": "repo and collection required"})
226
+
return
227
+
}
228
+
229
+
// Base prefix: "repo|collection|"
230
+
prefixStr := fmt.Sprintf("%s|%s|", repo, collection)
231
+
prefix := []byte(prefixStr)
232
+
233
+
var aturis []string
234
+
var lastRkey string
235
+
236
+
err := s.db.View(func(txn *badger.Txn) error {
237
+
// 2. Configure Iterator Options
238
+
opts := badger.DefaultIteratorOptions
239
+
opts.PrefetchValues = false
240
+
opts.Reverse = reverse // Set reverse mode
241
+
242
+
it := txn.NewIterator(opts)
243
+
defer it.Close()
244
+
245
+
// 3. Determine Start Key
246
+
var startKey []byte
247
+
if cursor != "" {
248
+
// If cursor exists, we seek to it regardless of direction
249
+
startKey = makeKey(repo, collection, cursor)
250
+
} else {
251
+
if reverse {
252
+
// REVERSE START: "repo|collection|" + 0xFF
253
+
// This seeks to the theoretical end of this prefix range
254
+
startKey = append([]byte(prefixStr), 0xFF)
255
+
} else {
256
+
// FORWARD START: "repo|collection|"
257
+
startKey = prefix
258
+
}
259
+
}
260
+
261
+
// 4. Seek and Iterate
262
+
it.Seek(startKey)
263
+
264
+
// Handle Cursor Pagination Skip
265
+
// If we provided a cursor, we likely landed exactly ON that cursor.
266
+
// We want the record *after* (or *before* in reverse) the cursor.
267
+
if cursor != "" && it.Valid() {
268
+
// Badger's Seek moves to key >= seek_key (even in reverse mode logic varies slightly,
269
+
// but practically we check if we landed on the exact cursor).
270
+
if string(it.Item().Key()) == string(startKey) {
271
+
it.Next() // Skip the cursor itself
272
+
}
273
+
}
274
+
275
+
// Iterate as long as the key still starts with our prefix
276
+
for ; it.ValidForPrefix(prefix); it.Next() {
277
+
if len(aturis) >= limit {
278
+
break
279
+
}
280
+
item := it.Item()
281
+
k := item.Key()
282
+
_, _, rkey, err := parseKey(k)
283
+
if err == nil {
284
+
aturis = append(aturis, fmt.Sprintf("at://%s/%s/%s", repo, collection, rkey))
285
+
lastRkey = rkey
286
+
}
287
+
}
288
+
return nil
289
+
})
290
+
291
+
if err != nil {
292
+
c.JSON(500, gin.H{"error": err.Error()})
293
+
return
294
+
}
295
+
296
+
resp := gin.H{
297
+
"aturis": aturis,
298
+
"count": len(aturis),
299
+
}
300
+
301
+
// Only return cursor if we hit the limit, allowing the client to request the next page
302
+
if lastRkey != "" && len(aturis) == limit {
303
+
resp["cursor"] = lastRkey
304
+
}
305
+
306
+
c.JSON(200, resp)
307
+
}
308
+
309
+
func (s *Server) handleCountRecords(c *gin.Context) {
310
+
repo := c.Query("repo")
311
+
collection := c.Query("collection")
312
+
313
+
if repo == "" || collection == "" {
314
+
c.JSON(400, gin.H{"error": "repo and collection required"})
315
+
return
316
+
}
317
+
318
+
prefix := []byte(fmt.Sprintf("%s|%s|", repo, collection))
319
+
count := 0
320
+
321
+
err := s.db.View(func(txn *badger.Txn) error {
322
+
opts := badger.DefaultIteratorOptions
323
+
opts.PrefetchValues = false
324
+
it := txn.NewIterator(opts)
325
+
defer it.Close()
326
+
327
+
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
328
+
count++
329
+
}
330
+
return nil
331
+
})
332
+
333
+
if err != nil {
334
+
c.JSON(500, gin.H{"error": err.Error()})
335
+
return
336
+
}
337
+
338
+
c.JSON(200, gin.H{
339
+
"repo": repo,
340
+
"collection": collection,
341
+
"count": count,
342
+
})
343
+
}
344
+
345
+
// handleIndexRecord now takes the Slingshot client specifically
346
+
func (s *Server) handleIndexRecord(c *gin.Context, sl *microcosm.MicrocosmClient) {
347
+
//authedUserDid := c.GetString("user_did")
348
+
// Support JSON body preferentially, fallback to Query/Form
349
+
var req struct {
350
+
Collection string `json:"collection"`
351
+
Repo string `json:"repo"`
352
+
RKey string `json:"rkey"`
353
+
}
354
+
355
+
if err := c.BindJSON(&req); err != nil {
356
+
req.Collection = c.PostForm("collection")
357
+
req.Repo = c.PostForm("repo")
358
+
req.RKey = c.PostForm("rkey")
359
+
}
360
+
361
+
if req.Collection == "" || req.Repo == "" || req.RKey == "" {
362
+
c.JSON(400, gin.H{"error": "invalid parameters"})
363
+
return
364
+
}
365
+
366
+
// Verify existence using Slingshot/Agnostic
367
+
recordResponse, err := agnostic.RepoGetRecord(c.Request.Context(), sl, "", req.Collection, req.Repo, req.RKey)
368
+
if err != nil {
369
+
// Does not exist remotely -> Delete locally
370
+
s.processRecord(req.Repo, req.Collection, req.RKey, true)
371
+
372
+
// You might want to return 200 even if deleted, to confirm "indexing done"
373
+
c.Status(200)
374
+
return
375
+
}
376
+
377
+
// Exists remotely -> Parse and Insert locally
378
+
uri := recordResponse.Uri
379
+
aturi, err := syntax.ParseATURI(uri)
380
+
if err != nil {
381
+
c.JSON(400, gin.H{"error": "failed to parse aturi from remote"})
382
+
return
383
+
}
384
+
385
+
s.processRecord(aturi.Authority().String(), string(aturi.Collection()), string(aturi.RecordKey()), false)
386
+
c.Status(200)
387
+
}
388
+
389
+
func (s *Server) handleValidateRecord(c *gin.Context) {
390
+
var req struct {
391
+
Collection string `json:"collection"`
392
+
Repo string `json:"repo"`
393
+
RKey string `json:"rkey"`
394
+
}
395
+
if err := c.BindJSON(&req); err != nil {
396
+
c.JSON(400, gin.H{"error": "invalid json"})
397
+
return
398
+
}
399
+
400
+
key := makeKey(req.Repo, req.Collection, req.RKey)
401
+
exists := false
402
+
403
+
err := s.db.View(func(txn *badger.Txn) error {
404
+
_, err := txn.Get(key)
405
+
if err == nil {
406
+
exists = true
407
+
} else if err == badger.ErrKeyNotFound {
408
+
exists = false
409
+
return nil
410
+
}
411
+
return err
412
+
})
413
+
414
+
if err != nil {
415
+
c.JSON(500, gin.H{"error": err.Error()})
416
+
return
417
+
}
418
+
419
+
if exists {
420
+
c.Status(200)
421
+
} else {
422
+
c.Status(404)
423
+
}
424
+
}
+7
-2
go.mod
+7
-2
go.mod
···
3
3
go 1.25.4
4
4
5
5
require (
6
-
github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc
6
+
github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635
7
7
github.com/ericvolp12/jwt-go-secp256k1 v0.0.2
8
8
github.com/gin-contrib/cors v1.7.6
9
9
github.com/gin-gonic/gin v1.11.0
···
21
21
22
22
require (
23
23
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
24
+
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
25
+
github.com/dustin/go-humanize v1.0.1 // indirect
24
26
github.com/gogo/protobuf v1.3.2 // indirect
27
+
github.com/google/flatbuffers v25.2.10+incompatible // indirect
25
28
github.com/hashicorp/golang-lru v1.0.2 // indirect
26
29
github.com/ipfs/bbloom v0.0.4 // indirect
27
30
github.com/ipfs/go-block-format v0.2.0 // indirect
···
39
42
github.com/ipfs/go-merkledag v0.11.0 // indirect
40
43
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
41
44
github.com/ipfs/go-verifcid v0.0.3 // indirect
42
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect
45
+
github.com/ipld/go-car v0.6.2 // indirect
43
46
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
44
47
github.com/ipld/go-ipld-prime v0.21.0 // indirect
45
48
github.com/jbenet/goprocess v0.1.4 // indirect
···
59
62
60
63
require (
61
64
github.com/beorn7/perks v1.0.1 // indirect
65
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1
62
66
github.com/bytedance/sonic v1.14.0 // indirect
63
67
github.com/bytedance/sonic/loader v0.3.0 // indirect
64
68
github.com/cespare/xxhash/v2 v2.3.0 // indirect
65
69
github.com/cloudwego/base64x v0.1.6 // indirect
70
+
github.com/dgraph-io/badger/v4 v4.8.0
66
71
github.com/earthboundkid/versioninfo/v2 v2.24.1 // indirect
67
72
github.com/felixge/httpsnoop v1.0.4 // indirect
68
73
github.com/gabriel-vasile/mimetype v1.4.9 // indirect
+14
go.sum
+14
go.sum
···
6
6
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
7
7
github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc h1:2t+uAvfzJiCsTMwn5fW85t/IGa0+2I7BXS2ORastK4o=
8
8
github.com/bluesky-social/indigo v0.0.0-20251202051123-81f317e322bc/go.mod h1:Pm2I1+iDXn/hLbF7XCg/DsZi6uDCiOo7hZGWprSM7k0=
9
+
github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635 h1:kNeRrgGJH2g5OvjLqtaQ744YXqduliZYpFkJ/ld47c0=
10
+
github.com/bluesky-social/indigo v0.0.0-20251206005924-d49b45419635/go.mod h1:Pm2I1+iDXn/hLbF7XCg/DsZi6uDCiOo7hZGWprSM7k0=
11
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU=
12
+
github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs=
9
13
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
10
14
github.com/bytedance/sonic v1.14.0/go.mod h1:WoEbx8WTcFJfzCe0hbmyTGrfjt8PzNEBdxlNUO24NhA=
11
15
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
···
24
28
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
25
29
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc=
26
30
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40=
31
+
github.com/dgraph-io/badger/v4 v4.8.0 h1:JYph1ChBijCw8SLeybvPINizbDKWZ5n/GYbz2yhN/bs=
32
+
github.com/dgraph-io/badger/v4 v4.8.0/go.mod h1:U6on6e8k/RTbUWxqKR0MvugJuVmkxSNc79ap4917h4w=
33
+
github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM=
34
+
github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI=
35
+
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
36
+
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
27
37
github.com/earthboundkid/versioninfo/v2 v2.24.1 h1:SJTMHaoUx3GzjjnUO1QzP3ZXK6Ee/nbWyCm58eY3oUg=
28
38
github.com/earthboundkid/versioninfo/v2 v2.24.1/go.mod h1:VcWEooDEuyUJnMfbdTh0uFN4cfEIg+kHMuWB2CDCLjw=
29
39
github.com/ericvolp12/jwt-go-secp256k1 v0.0.2 h1:puGwrNTY2vCt8eakkSEq2yeNxUD3zb2kPhv1OsF1hPs=
···
63
73
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
64
74
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
65
75
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
76
+
github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q=
77
+
github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
66
78
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
67
79
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
68
80
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
···
136
148
github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw=
137
149
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI=
138
150
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA=
151
+
github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc=
152
+
github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8=
139
153
github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc=
140
154
github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s=
141
155
github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=
+3
readme.md
+3
readme.md
···
29
29
### `/cmd/backstream`
30
30
experimental backfiller that kinda (but not really) conforms to the jetstream event shape. designed to be ingested by consumers expecting jetstream
31
31
32
+
### `/cmd/aturilist`
33
+
experimental listRecords replacement. is not backfilled. uses the official jetstream go client, which means it suffers from this [bug](https://github.com/bluesky-social/jetstream/pull/45)
34
+
32
35
## Packages
33
36
34
37
### `/auth`