at main 4.5 kB view raw
1package pds 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "net" 8 "net/http" 9 "time" 10) 11 12type Client struct { 13 httpClient *http.Client 14} 15 16func NewClient(timeout time.Duration) *Client { 17 return &Client{ 18 httpClient: &http.Client{ 19 Timeout: timeout, 20 }, 21 } 22} 23 24// ListReposResponse represents the response from com.atproto.sync.listRepos 25type ListReposResponse struct { 26 Repos []Repo `json:"repos"` 27 Cursor *string `json:"cursor,omitempty"` 28} 29 30// Repo represents a repository in the list 31type Repo struct { 32 DID string `json:"did"` 33 Head string `json:"head,omitempty"` 34 Rev string `json:"rev,omitempty"` 35 Active *bool `json:"active,omitempty"` 36 Status *string `json:"status,omitempty"` 37} 38 39// ListRepos fetches all repositories from a PDS with pagination 40func (c *Client) ListRepos(ctx context.Context, endpoint string) ([]Repo, error) { 41 var allRepos []Repo 42 var cursor *string 43 44 for { 45 // Build URL 46 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000", endpoint) 47 if cursor != nil { 48 url += fmt.Sprintf("&cursor=%s", *cursor) 49 } 50 51 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 52 if err != nil { 53 return nil, err 54 } 55 56 resp, err := c.httpClient.Do(req) 57 if err != nil { 58 return nil, err 59 } 60 61 if resp.StatusCode != http.StatusOK { 62 resp.Body.Close() 63 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 64 } 65 66 var result ListReposResponse 67 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 68 resp.Body.Close() 69 return nil, err 70 } 71 resp.Body.Close() 72 73 // Collect repos 74 allRepos = append(allRepos, result.Repos...) 75 76 // Check if there are more pages 77 if result.Cursor == nil || *result.Cursor == "" { 78 break 79 } 80 cursor = result.Cursor 81 } 82 83 return allRepos, nil 84} 85 86// DescribeServer fetches com.atproto.server.describeServer 87// Returns: description, responseTime, usedIP, error 88func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, time.Duration, string, error) { 89 startTime := time.Now() 90 url := fmt.Sprintf("%s/xrpc/com.atproto.server.describeServer", endpoint) 91 92 // Track which IP was used 93 var usedIP string 94 transport := &http.Transport{ 95 DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { 96 conn, err := (&net.Dialer{ 97 Timeout: 30 * time.Second, 98 KeepAlive: 30 * time.Second, 99 }).DialContext(ctx, network, addr) 100 101 if err == nil && conn != nil { 102 if remoteAddr := conn.RemoteAddr(); remoteAddr != nil { 103 if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok { 104 usedIP = tcpAddr.IP.String() 105 } 106 } 107 } 108 return conn, err 109 }, 110 } 111 112 client := &http.Client{ 113 Timeout: c.httpClient.Timeout, 114 Transport: transport, 115 } 116 117 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 118 if err != nil { 119 return nil, 0, "", err 120 } 121 122 resp, err := client.Do(req) 123 responseTime := time.Since(startTime) 124 125 if err != nil { 126 return nil, responseTime, usedIP, err 127 } 128 defer resp.Body.Close() 129 130 if resp.StatusCode != http.StatusOK { 131 return nil, responseTime, usedIP, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 132 } 133 134 var desc ServerDescription 135 if err := json.NewDecoder(resp.Body).Decode(&desc); err != nil { 136 return nil, responseTime, usedIP, err 137 } 138 139 return &desc, responseTime, usedIP, nil 140} 141 142// CheckHealth performs a basic health check, ensuring the endpoint returns JSON with a "version" 143// Returns: available, responseTime, version, error 144func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, string, error) { 145 startTime := time.Now() 146 147 url := fmt.Sprintf("%s/xrpc/_health", endpoint) 148 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 149 if err != nil { 150 return false, 0, "", err 151 } 152 153 resp, err := c.httpClient.Do(req) 154 duration := time.Since(startTime) 155 156 if err != nil { 157 return false, duration, "", err 158 } 159 defer resp.Body.Close() 160 161 if resp.StatusCode != http.StatusOK { 162 return false, duration, "", fmt.Errorf("health check returned status %d", resp.StatusCode) 163 } 164 165 // Decode the JSON response and check for "version" 166 var healthResponse struct { 167 Version string `json:"version"` 168 } 169 170 if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err != nil { 171 return false, duration, "", fmt.Errorf("failed to decode health JSON: %w", err) 172 } 173 174 if healthResponse.Version == "" { 175 return false, duration, "", fmt.Errorf("health JSON response missing 'version' field") 176 } 177 178 // All checks passed 179 return true, duration, healthResponse.Version, nil 180}