tangled
alpha
login
or
join now
moll.dev
/
core
forked from
tangled.org/core
this repo has no description
0
fork
atom
overview
issues
pulls
pipelines
cmd/jstest: test tool for jetstream client
anirudh.fi
10 months ago
61d24980
e49f5e4d
+150
1 changed file
expand all
collapse all
unified
split
cmd
jstest
main.go
+150
cmd/jstest/main.go
···
1
1
+
package main
2
2
+
3
3
+
import (
4
4
+
"context"
5
5
+
"flag"
6
6
+
"log/slog"
7
7
+
"os"
8
8
+
"os/signal"
9
9
+
"strings"
10
10
+
"syscall"
11
11
+
"time"
12
12
+
13
13
+
"github.com/bluesky-social/jetstream/pkg/client"
14
14
+
"github.com/bluesky-social/jetstream/pkg/models"
15
15
+
"github.com/sotangled/tangled/jetstream"
16
16
+
)
17
17
+
18
18
+
// Simple in-memory implementation of DB interface
19
19
+
type MemoryDB struct {
20
20
+
lastTimeUs int64
21
21
+
}
22
22
+
23
23
+
func (m *MemoryDB) GetLastTimeUs() (int64, error) {
24
24
+
if m.lastTimeUs == 0 {
25
25
+
return time.Now().UnixMicro(), nil
26
26
+
}
27
27
+
return m.lastTimeUs, nil
28
28
+
}
29
29
+
30
30
+
func (m *MemoryDB) SaveLastTimeUs(ts int64) error {
31
31
+
m.lastTimeUs = ts
32
32
+
return nil
33
33
+
}
34
34
+
35
35
+
func (m *MemoryDB) UpdateLastTimeUs(ts int64) error {
36
36
+
m.lastTimeUs = ts
37
37
+
return nil
38
38
+
}
39
39
+
40
40
+
func main() {
41
41
+
// Setup logger
42
42
+
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
43
43
+
Level: slog.LevelInfo,
44
44
+
}))
45
45
+
46
46
+
// Create in-memory DB
47
47
+
db := &MemoryDB{}
48
48
+
49
49
+
// Get query URL from flag
50
50
+
var queryURL string
51
51
+
flag.StringVar(&queryURL, "query-url", "", "Jetstream query URL containing DIDs")
52
52
+
flag.Parse()
53
53
+
54
54
+
if queryURL == "" {
55
55
+
logger.Error("No query URL provided, use --query-url flag")
56
56
+
os.Exit(1)
57
57
+
}
58
58
+
59
59
+
// Extract wantedDids parameters
60
60
+
didParams := strings.Split(queryURL, "&wantedDids=")
61
61
+
dids := make([]string, 0, len(didParams)-1)
62
62
+
for i, param := range didParams {
63
63
+
if i == 0 {
64
64
+
// Skip the first part (the base URL with cursor)
65
65
+
continue
66
66
+
}
67
67
+
dids = append(dids, param)
68
68
+
}
69
69
+
70
70
+
// Extract collections
71
71
+
collections := []string{"sh.tangled.publicKey", "sh.tangled.knot.member"}
72
72
+
73
73
+
// Create client configuration
74
74
+
cfg := client.DefaultClientConfig()
75
75
+
cfg.WebsocketURL = "wss://jetstream2.us-west.bsky.network/subscribe"
76
76
+
cfg.WantedCollections = collections
77
77
+
78
78
+
// Create jetstream client
79
79
+
jsClient, err := jetstream.NewJetstreamClient(
80
80
+
cfg.WebsocketURL,
81
81
+
"tangled-jetstream",
82
82
+
collections,
83
83
+
cfg,
84
84
+
logger,
85
85
+
db,
86
86
+
false,
87
87
+
)
88
88
+
if err != nil {
89
89
+
logger.Error("Failed to create jetstream client", "error", err)
90
90
+
os.Exit(1)
91
91
+
}
92
92
+
93
93
+
// Update DIDs
94
94
+
jsClient.UpdateDids(dids)
95
95
+
96
96
+
// Create a context that will be canceled on SIGINT or SIGTERM
97
97
+
ctx, cancel := context.WithCancel(context.Background())
98
98
+
defer cancel()
99
99
+
100
100
+
// Setup signal handling with a buffered channel
101
101
+
sigCh := make(chan os.Signal, 1)
102
102
+
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
103
103
+
104
104
+
// Process function for events
105
105
+
processFunc := func(ctx context.Context, event *models.Event) error {
106
106
+
// Log the event details
107
107
+
logger.Info("Received event",
108
108
+
"collection", event.Commit.Collection,
109
109
+
"did", event.Did,
110
110
+
"rkey", event.Commit.RKey,
111
111
+
"action", event.Kind,
112
112
+
"time_us", event.TimeUS,
113
113
+
)
114
114
+
115
115
+
// Save the last time_us
116
116
+
if err := db.UpdateLastTimeUs(event.TimeUS); err != nil {
117
117
+
logger.Error("Failed to update last time_us", "error", err)
118
118
+
}
119
119
+
120
120
+
return nil
121
121
+
}
122
122
+
123
123
+
// Start jetstream
124
124
+
if err := jsClient.StartJetstream(ctx, processFunc); err != nil {
125
125
+
logger.Error("Failed to start jetstream", "error", err)
126
126
+
os.Exit(1)
127
127
+
}
128
128
+
129
129
+
// Wait for signal instead of context.Done()
130
130
+
sig := <-sigCh
131
131
+
logger.Info("Received signal, shutting down", "signal", sig)
132
132
+
cancel() // Cancel context after receiving signal
133
133
+
134
134
+
// Shutdown gracefully with a timeout
135
135
+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
136
136
+
defer shutdownCancel()
137
137
+
138
138
+
done := make(chan struct{})
139
139
+
go func() {
140
140
+
jsClient.Shutdown()
141
141
+
close(done)
142
142
+
}()
143
143
+
144
144
+
select {
145
145
+
case <-done:
146
146
+
logger.Info("Jetstream client shut down gracefully")
147
147
+
case <-shutdownCtx.Done():
148
148
+
logger.Warn("Shutdown timed out, forcing exit")
149
149
+
}
150
150
+
}