A tool for backing up ATProto-related data to S3

Back up Tangled knot data (database and repository directories) and add optional Bugsnag error reporting

Signed-off-by: Will Andrews <will7989@hotmail.com>

+3
.env.example
··· 4 4 BUCKET_NAME="my-super-duper-bucket" 5 5 DID="the-did-to-backup" 6 6 PDS_HOST="https://your-pds.com" 7 + TANGLED_KNOT_DATABASE_DIRECTORY="/path/to/database/directory" 8 + TANGLED_KNOT_REPOSITORY_DIRECTORY="/path/to/repository/directory" 9 + BUGSNAG_API_KEY="enter-api-key-to-enable"
+4
go.mod
··· 8 8 ) 9 9 10 10 require ( 11 + github.com/bugsnag/bugsnag-go/v2 v2.6.2 // indirect 12 + github.com/bugsnag/panicwrap v1.3.4 // indirect 11 13 github.com/dustin/go-humanize v1.0.1 // indirect 12 14 github.com/go-ini/ini v1.67.0 // indirect 13 15 github.com/goccy/go-json v0.10.5 // indirect 14 16 github.com/google/uuid v1.6.0 // indirect 17 + github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect 15 18 github.com/klauspost/compress v1.18.0 // indirect 16 19 github.com/klauspost/cpuid/v2 v2.2.11 // indirect 17 20 github.com/minio/crc64nvme v1.0.2 // indirect 18 21 github.com/minio/md5-simd v1.1.2 // indirect 19 22 github.com/philhofer/fwd v1.2.0 // indirect 23 + github.com/pkg/errors v0.9.1 // indirect 20 24 github.com/rs/xid v1.6.0 // indirect 21 25 github.com/stretchr/testify v1.10.0 // indirect 22 26 github.com/tinylib/msgp v1.3.0 // indirect
+9
go.sum
··· 1 + github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= 2 + github.com/bugsnag/bugsnag-go/v2 v2.6.2 h1:gGjr8txMtPYWKovEBC+4o6tthYveuE7fjzu6XYVIApg= 3 + github.com/bugsnag/bugsnag-go/v2 v2.6.2/go.mod h1:S9njhE7l6XCiKycOZ2zp0x1zoEE5nL3HjROCSsKc/3c= 4 + github.com/bugsnag/panicwrap v1.3.4 h1:A6sXFtDGsgU/4BLf5JT0o5uYg3EeKgGx3Sfs+/uk3pU= 5 + github.com/bugsnag/panicwrap v1.3.4/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= 1 6 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= 2 7 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 3 8 github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= ··· 10 15 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= 11 16 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= 12 17 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= 18 + github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= 19 + github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= 13 20 github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= 14 21 github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= 15 22 github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= ··· 23 30 github.com/minio/minio-go/v7 v7.0.95/go.mod h1:wOOX3uxS334vImCNRVyIDdXX9OsXDm89ToynKgqUKlo= 24 31 github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM= 25 32 github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= 33 + github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= 34 + github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 26 
35 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 27 36 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 28 37 github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU=
+15 -161
main.go
··· 1 1 package main 2 2 3 3 import ( 4 - "archive/zip" 5 4 "context" 6 - "encoding/json" 7 - "fmt" 8 - "io" 9 5 "log/slog" 10 - "net/http" 11 6 "os" 12 7 8 + "github.com/bugsnag/bugsnag-go/v2" 13 9 "github.com/joho/godotenv" 14 10 "github.com/minio/minio-go/v7" 15 11 "github.com/minio/minio-go/v7/pkg/credentials" ··· 26 22 } 27 23 } 28 24 25 + configureBugsnag() 26 + 29 27 minioClient, err := createMinioClient() 30 28 if err != nil { 31 29 slog.Error("create minio client", "error", err) 30 + bugsnag.Notify(err) 32 31 return 33 32 } 34 33 ··· 37 36 err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{}) 38 37 if err != nil { 39 38 slog.Error("create bucket", "error", err) 40 - return 41 - } 42 - 43 - err = backupRepo(ctx, minioClient, bucketName) 44 - if err != nil { 45 - slog.Error("backup repo", "error", err) 39 + bugsnag.Notify(err) 46 40 return 47 41 } 48 42 49 - err = backupBlobs(ctx, minioClient, bucketName) 50 - if err != nil { 51 - slog.Error("backup blobs", "error", err) 52 - return 53 - } 43 + backupPDS(ctx, minioClient, bucketName) 54 44 } 55 45 56 46 func createMinioClient() (*minio.Client, error) { ··· 65 55 }) 66 56 } 67 57 68 - func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { 69 - pdsHost := os.Getenv("PDS_HOST") 70 - did := os.Getenv("DID") 71 - 72 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) 73 - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 74 - if err != nil { 75 - return fmt.Errorf("create get repo request: %w", err) 76 - } 77 - 78 - req.Header.Add("ACCEPT", "application/vnd.ipld.car") 79 - resp, err := http.DefaultClient.Do(req) 80 - if err != nil { 81 - return fmt.Errorf("get repo: %w", err) 82 - } 83 - 84 - defer resp.Body.Close() 85 - 86 - _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 87 - if err != nil { 88 - return fmt.Errorf("stream repo to bucket: %w", err) 89 - } 90 - 91 - 
return nil 92 - } 93 - 94 - func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { 95 - cids, err := getAllBlobCIDs(ctx) 96 - if err != nil { 97 - return fmt.Errorf("get all blob CIDs: %w", err) 98 - } 99 - 100 - reader, writer := io.Pipe() 101 - defer reader.Close() 102 - 103 - zipWriter := zip.NewWriter(writer) 104 - 105 - go func() { 106 - defer writer.Close() 107 - defer zipWriter.Close() 108 - 109 - for _, cid := range cids { 110 - slog.Info("processing cid", "cid", cid) 111 - blob, err := getBlob(ctx, cid) 112 - if err != nil { 113 - slog.Error("failed to get blob", "cid", cid, "error", err) 114 - continue 115 - } 116 - 117 - zipFile, err := zipWriter.Create(cid) 118 - if err != nil { 119 - slog.Error("create new file in zipwriter", "cid", cid, "error", err) 120 - blob.Close() 121 - continue 122 - } 123 - 124 - io.Copy(zipFile, blob) 125 - blob.Close() 126 - } 127 - }() 128 - 129 - _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) 130 - if err != nil { 131 - return fmt.Errorf("stream blobs to bucket: %w", err) 132 - } 133 - 134 - return nil 135 - } 136 - 137 - func getAllBlobCIDs(ctx context.Context) ([]string, error) { 138 - cursor := "" 139 - limit := 100 140 - var cids []string 141 - for { 142 - res, err := listBlobs(ctx, cursor, int64(limit)) 143 - if err != nil { 144 - return nil, fmt.Errorf("list blobs: %w", err) 145 - } 146 - if len(res.CIDs) == 0 { 147 - return cids, nil 148 - } 149 - 150 - cids = append(cids, res.CIDs...) 
151 - 152 - if len(res.CIDs) < limit { 153 - return cids, nil 154 - } 155 - 156 - cursor = res.Cursor 157 - } 158 - } 159 - 160 - type listBlobsResponse struct { 161 - Cursor string `json:"cursor"` 162 - CIDs []string `json:"cids"` 163 - } 164 - 165 - func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 166 - pdsHost := os.Getenv("PDS_HOST") 167 - did := os.Getenv("DID") 168 - 169 - // TODO: do proper url encoding of query params 170 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) 171 - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 172 - if err != nil { 173 - return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 174 - } 175 - 176 - resp, err := http.DefaultClient.Do(req) 177 - if err != nil { 178 - return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 179 - } 180 - 181 - defer resp.Body.Close() 182 - 183 - resBody, err := io.ReadAll(resp.Body) 184 - if err != nil { 185 - return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err) 186 - } 187 - 188 - var result listBlobsResponse 189 - err = json.Unmarshal(resBody, &result) 190 - if err != nil { 191 - return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err) 192 - } 193 - 194 - return result, nil 195 - } 196 - 197 - func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { 198 - pdsHost := os.Getenv("PDS_HOST") 199 - did := os.Getenv("DID") 200 - 201 - // TODO: do proper url encoding of query params 202 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) 203 - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 204 - if err != nil { 205 - return nil, fmt.Errorf("create get blob request: %w", err) 206 - } 207 - 208 - resp, err := http.DefaultClient.Do(req) 209 - if err != nil { 210 - return nil, fmt.Errorf("get blob: %w", err) 58 + func configureBugsnag() { 59 + apiKey := 
os.Getenv("BUGSNAG_API_KEY") 60 + if apiKey == "" { 61 + slog.Info("bugsnag not configured") 62 + return 211 63 } 212 - 213 - return resp.Body, nil 64 + bugsnag.Configure(bugsnag.Configuration{ 65 + APIKey: apiKey, 66 + ReleaseStage: "production", 67 + }) 214 68 }
+186
pds.go
··· 1 + package main 2 + 3 + import ( 4 + "archive/zip" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "os" 12 + 13 + "github.com/bugsnag/bugsnag-go/v2" 14 + "github.com/minio/minio-go/v7" 15 + ) 16 + 17 + func backupPDS(ctx context.Context, minioClient *minio.Client, bucketName string) { 18 + if os.Getenv("PDS_HOST") == "" || os.Getenv("DID") == "" { 19 + slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 20 + return 21 + } 22 + 23 + err := backupRepo(ctx, minioClient, bucketName) 24 + if err != nil { 25 + slog.Error("backup repo", "error", err) 26 + bugsnag.Notify(err) 27 + return 28 + } 29 + 30 + err = backupBlobs(ctx, minioClient, bucketName) 31 + if err != nil { 32 + slog.Error("backup blobs", "error", err) 33 + bugsnag.Notify(err) 34 + return 35 + } 36 + } 37 + 38 + func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { 39 + pdsHost := os.Getenv("PDS_HOST") 40 + did := os.Getenv("DID") 41 + 42 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) 43 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 44 + if err != nil { 45 + return fmt.Errorf("create get repo request: %w", err) 46 + } 47 + 48 + req.Header.Add("ACCEPT", "application/vnd.ipld.car") 49 + resp, err := http.DefaultClient.Do(req) 50 + if err != nil { 51 + return fmt.Errorf("get repo: %w", err) 52 + } 53 + 54 + defer resp.Body.Close() 55 + 56 + _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 57 + if err != nil { 58 + return fmt.Errorf("stream repo to bucket: %w", err) 59 + } 60 + 61 + return nil 62 + } 63 + 64 + func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { 65 + cids, err := getAllBlobCIDs(ctx) 66 + if err != nil { 67 + return fmt.Errorf("get all blob CIDs: %w", err) 68 + } 69 + 70 + reader, writer := io.Pipe() 71 + defer reader.Close() 72 + 73 + zipWriter := 
zip.NewWriter(writer) 74 + 75 + go func() { 76 + defer writer.Close() 77 + defer zipWriter.Close() 78 + 79 + for _, cid := range cids { 80 + slog.Info("processing cid", "cid", cid) 81 + blob, err := getBlob(ctx, cid) 82 + if err != nil { 83 + slog.Error("failed to get blob", "cid", cid, "error", err) 84 + bugsnag.Notify(err) 85 + continue 86 + } 87 + 88 + zipFile, err := zipWriter.Create(cid) 89 + if err != nil { 90 + slog.Error("create new file in zipwriter", "cid", cid, "error", err) 91 + bugsnag.Notify(err) 92 + blob.Close() 93 + continue 94 + } 95 + 96 + io.Copy(zipFile, blob) 97 + blob.Close() 98 + } 99 + }() 100 + 101 + _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) 102 + if err != nil { 103 + return fmt.Errorf("stream blobs to bucket: %w", err) 104 + } 105 + 106 + return nil 107 + } 108 + 109 + func getAllBlobCIDs(ctx context.Context) ([]string, error) { 110 + cursor := "" 111 + limit := 100 112 + var cids []string 113 + for { 114 + res, err := listBlobs(ctx, cursor, int64(limit)) 115 + if err != nil { 116 + return nil, fmt.Errorf("list blobs: %w", err) 117 + } 118 + if len(res.CIDs) == 0 { 119 + return cids, nil 120 + } 121 + 122 + cids = append(cids, res.CIDs...) 
123 + 124 + if len(res.CIDs) < limit { 125 + return cids, nil 126 + } 127 + 128 + cursor = res.Cursor 129 + } 130 + } 131 + 132 + type listBlobsResponse struct { 133 + Cursor string `json:"cursor"` 134 + CIDs []string `json:"cids"` 135 + } 136 + 137 + func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 138 + pdsHost := os.Getenv("PDS_HOST") 139 + did := os.Getenv("DID") 140 + 141 + // TODO: do proper url encoding of query params 142 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) 143 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 144 + if err != nil { 145 + return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 146 + } 147 + 148 + resp, err := http.DefaultClient.Do(req) 149 + if err != nil { 150 + return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 151 + } 152 + 153 + defer resp.Body.Close() 154 + 155 + resBody, err := io.ReadAll(resp.Body) 156 + if err != nil { 157 + return listBlobsResponse{}, fmt.Errorf("failed to read response: %w", err) 158 + } 159 + 160 + var result listBlobsResponse 161 + err = json.Unmarshal(resBody, &result) 162 + if err != nil { 163 + return listBlobsResponse{}, fmt.Errorf("failed to unmarshal response: %w", err) 164 + } 165 + 166 + return result, nil 167 + } 168 + 169 + func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { 170 + pdsHost := os.Getenv("PDS_HOST") 171 + did := os.Getenv("DID") 172 + 173 + // TODO: do proper url encoding of query params 174 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) 175 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 176 + if err != nil { 177 + return nil, fmt.Errorf("create get blob request: %w", err) 178 + } 179 + 180 + resp, err := http.DefaultClient.Do(req) 181 + if err != nil { 182 + return nil, fmt.Errorf("get blob: %w", err) 183 + } 184 + 185 + return resp.Body, nil 186 + 
}
+98
tangled_knot.go
··· 1 + package main 2 + 3 + import ( 4 + "archive/tar" 5 + "compress/gzip" 6 + "context" 7 + "io" 8 + "log/slog" 9 + "os" 10 + "path/filepath" 11 + 12 + "github.com/bugsnag/bugsnag-go/v2" 13 + "github.com/minio/minio-go/v7" 14 + ) 15 + 16 + func backupTangledKnot(ctx context.Context, minioClient *minio.Client, bucketName string) { 17 + if os.Getenv("BACKUP_TANGLED_KNOT") != "true" { 18 + return 19 + } 20 + 21 + backupKnotDB(ctx, minioClient, bucketName) 22 + } 23 + 24 + func backupKnotDB(ctx context.Context, minioClient *minio.Client, bucketName string) { 25 + dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY") 26 + 27 + pipeReader, pipeWriter := io.Pipe() 28 + defer pipeReader.Close() 29 + 30 + go compress(dir, pipeWriter) 31 + 32 + _, err := minioClient.PutObject(ctx, bucketName, "knot-db.zip", pipeReader, -1, minio.PutObjectOptions{}) 33 + if err != nil { 34 + slog.Error("stream knot DB to bucket: %w") 35 + bugsnag.Notify(err) 36 + } 37 + } 38 + 39 + func backupKnotRepos(ctx context.Context, minioClient *minio.Client, bucketName string) { 40 + dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY") 41 + 42 + pipeReader, pipeWriter := io.Pipe() 43 + defer pipeReader.Close() 44 + 45 + go compress(dir, pipeWriter) 46 + 47 + _, err := minioClient.PutObject(ctx, bucketName, "knot-repos.zip", pipeReader, -1, minio.PutObjectOptions{}) 48 + if err != nil { 49 + slog.Error("stream knot repos to bucket: %w") 50 + bugsnag.Notify(err) 51 + } 52 + } 53 + 54 + func compress(src string, writer io.WriteCloser) error { 55 + zipWriter := gzip.NewWriter(writer) 56 + tarWriter := tar.NewWriter(zipWriter) 57 + 58 + defer writer.Close() 59 + defer zipWriter.Close() 60 + defer tarWriter.Close() 61 + 62 + filepath.Walk(src, func(file string, fi os.FileInfo, err error) error { 63 + header, err := tar.FileInfoHeader(fi, file) 64 + if err != nil { 65 + return err 66 + } 67 + 68 + // must provide real name 69 + // (see https://golang.org/src/archive/tar/common.go?#L626) 70 + header.Name = 
filepath.ToSlash(file) 71 + 72 + if err := tarWriter.WriteHeader(header); err != nil { 73 + return err 74 + } 75 + // if not a dir, write file content 76 + if !fi.IsDir() { 77 + data, err := os.Open(file) 78 + if err != nil { 79 + return err 80 + } 81 + if _, err := io.Copy(tarWriter, data); err != nil { 82 + return err 83 + } 84 + } 85 + return nil 86 + }) 87 + 88 + // produce tar 89 + if err := tarWriter.Close(); err != nil { 90 + return err 91 + } 92 + // produce gzip 93 + if err := zipWriter.Close(); err != nil { 94 + return err 95 + } 96 + // 97 + return nil 98 + }