1package main
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/hex"
7 "fmt"
8 "os"
9 "sync"
10 "time"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 appbsky "github.com/bluesky-social/indigo/api/bsky"
14 "github.com/bluesky-social/indigo/carstore"
15 lexutil "github.com/bluesky-social/indigo/lex/util"
16 "github.com/bluesky-social/indigo/repo"
17 "github.com/bluesky-social/indigo/testing"
18 "github.com/bluesky-social/indigo/util/cliutil"
19 "github.com/bluesky-social/indigo/xrpc"
20
21 "github.com/ipfs/go-cid"
22 "github.com/ipfs/go-datastore"
23 blockstore "github.com/ipfs/go-ipfs-blockstore"
24 cbor "github.com/ipfs/go-ipld-cbor"
25
26 _ "github.com/joho/godotenv/autoload"
27
28 "github.com/carlmjohnson/versioninfo"
29 "github.com/ipld/go-car"
30 cli "github.com/urfave/cli/v2"
31)
32
33func main() {
34 run(os.Args)
35}
36
37func run(args []string) {
38 app := cli.App{
39 Name: "stress",
40 Usage: "load generation tool for PDS instances",
41 Version: versioninfo.Short(),
42 }
43
44 app.Commands = []*cli.Command{
45 postingCmd,
46 genRepoCmd,
47 }
48
49 app.RunAndExitOnError()
50}
51
52var postingCmd = &cli.Command{
53 Name: "posting",
54 Flags: []cli.Flag{
55 &cli.BoolFlag{
56 Name: "quiet",
57 },
58 &cli.IntFlag{
59 Name: "count",
60 Value: 100,
61 },
62 &cli.IntFlag{
63 Name: "concurrent",
64 Value: 1,
65 },
66 &cli.StringFlag{
67 Name: "pds-host",
68 Usage: "method, hostname, and port of PDS instance",
69 Value: "http://localhost:4849",
70 EnvVars: []string{"ATP_PDS_HOST"},
71 },
72 &cli.StringFlag{
73 Name: "invite",
74 },
75 },
76 Action: func(cctx *cli.Context) error {
77 xrpcc, err := cliutil.GetXrpcClient(cctx, false)
78 if err != nil {
79 return err
80 }
81
82 count := cctx.Int("count")
83 concurrent := cctx.Int("concurrent")
84 quiet := cctx.Bool("quiet")
85 ctx := context.TODO()
86
87 buf := make([]byte, 6)
88 rand.Read(buf)
89 id := hex.EncodeToString(buf)
90
91 var invite *string
92 if inv := cctx.String("invite"); inv != "" {
93 invite = &inv
94 }
95
96 cfg, err := comatproto.ServerDescribeServer(ctx, xrpcc)
97 if err != nil {
98 return err
99 }
100
101 domain := cfg.AvailableUserDomains[0]
102 fmt.Println("domain: ", domain)
103
104 email := fmt.Sprintf("user-%s@test.com", id)
105 pass := "password"
106 resp, err := comatproto.ServerCreateAccount(ctx, xrpcc, &comatproto.ServerCreateAccount_Input{
107 Email: &email,
108 Handle: "user-" + id + domain,
109 Password: &pass,
110 InviteCode: invite,
111 })
112 if err != nil {
113 return err
114 }
115
116 xrpcc.Auth = &xrpc.AuthInfo{
117 AccessJwt: resp.AccessJwt,
118 RefreshJwt: resp.RefreshJwt,
119 Handle: resp.Handle,
120 Did: resp.Did,
121 }
122
123 var wg sync.WaitGroup
124 for con := 0; con < concurrent; con++ {
125 wg.Add(1)
126 go func(worker int) {
127 defer wg.Done()
128 for i := 0; i < count; i++ {
129 buf := make([]byte, 100)
130 rand.Read(buf)
131
132 res, err := comatproto.RepoCreateRecord(context.TODO(), xrpcc, &comatproto.RepoCreateRecord_Input{
133 Collection: "app.bsky.feed.post",
134 Repo: xrpcc.Auth.Did,
135 Record: &lexutil.LexiconTypeDecoder{Val: &appbsky.FeedPost{
136 Text: hex.EncodeToString(buf),
137 CreatedAt: time.Now().Format(time.RFC3339),
138 }},
139 })
140 if err != nil {
141 fmt.Printf("errored on worker %d loop %d: %s\n", worker, i, err)
142 return
143 }
144
145 if !quiet {
146 fmt.Println(res.Cid, res.Uri)
147 }
148 }
149 }(con)
150 }
151
152 wg.Wait()
153
154 return nil
155 },
156}
157
158var genRepoCmd = &cli.Command{
159 Name: "gen-repo",
160 Flags: []cli.Flag{
161 &cli.IntFlag{
162 Name: "len",
163 Value: 50,
164 },
165 &cli.StringFlag{
166 Name: "pds-host",
167 Usage: "method, hostname, and port of PDS instance",
168 Value: "http://localhost:4849",
169 EnvVars: []string{"ATP_PDS_HOST"},
170 },
171 },
172 ArgsUsage: "<car-file-path>",
173 Action: func(cctx *cli.Context) error {
174 fname := cctx.Args().First()
175 if fname == "" {
176 return cli.Exit("must provide car file path", 127)
177 }
178
179 l := cctx.Int("len")
180
181 membs := blockstore.NewBlockstore(datastore.NewMapDatastore())
182
183 ctx := context.Background()
184
185 r := repo.NewRepo(ctx, "did:plc:foobar", membs)
186
187 root, err := testing.GenerateFakeRepo(r, l)
188 if err != nil {
189 return err
190 }
191
192 fi, err := os.Create(fname)
193 if err != nil {
194 return err
195 }
196 defer fi.Close()
197
198 h := &car.CarHeader{
199 Roots: []cid.Cid{root},
200 Version: 1,
201 }
202 hb, err := cbor.DumpObject(h)
203 if err != nil {
204 return err
205 }
206
207 _, err = carstore.LdWrite(fi, hb)
208 if err != nil {
209 return err
210 }
211
212 kc, _ := membs.AllKeysChan(ctx)
213 for k := range kc {
214 blk, err := membs.Get(ctx, k)
215 if err != nil {
216 return err
217 }
218
219 _, err = carstore.LdWrite(fi, k.Bytes(), blk.RawData())
220 if err != nil {
221 return err
222 }
223 }
224
225 return nil
226 },
227}