A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
at did-resolver 291 lines 6.9 kB view raw
1package plc 2 3import ( 4 "bufio" 5 "context" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "strconv" 11 "time" 12 13 "github.com/goccy/go-json" 14) 15 16// Client is a client for the PLC directory 17type Client struct { 18 baseURL string 19 httpClient *http.Client 20 rateLimiter *RateLimiter 21 logger Logger 22} 23 24// Logger is a simple logging interface 25type Logger interface { 26 Printf(format string, v ...interface{}) 27 Println(v ...interface{}) 28} 29 30// defaultLogger uses standard log package 31type defaultLogger struct{} 32 33func (d defaultLogger) Printf(format string, v ...interface{}) { 34 log.Printf(format, v...) 35} 36 37func (d defaultLogger) Println(v ...interface{}) { 38 log.Println(v...) 39} 40 41// ClientOption is a functional option for configuring the Client 42type ClientOption func(*Client) 43 44// WithLogger sets a custom logger 45func WithLogger(logger Logger) ClientOption { 46 return func(c *Client) { 47 c.logger = logger 48 } 49} 50 51// WithTimeout sets a custom HTTP timeout 52func WithTimeout(timeout time.Duration) ClientOption { 53 return func(c *Client) { 54 c.httpClient.Timeout = timeout 55 } 56} 57 58// WithRateLimit sets a custom rate limit (requests per period) 59func WithRateLimit(requestsPerPeriod int, period time.Duration) ClientOption { 60 return func(c *Client) { 61 if c.rateLimiter != nil { 62 c.rateLimiter.Stop() 63 } 64 c.rateLimiter = NewRateLimiter(requestsPerPeriod, period) 65 } 66} 67 68// NewClient creates a new PLC directory client 69// Default: 90 requests per minute, 60 second timeout 70func NewClient(baseURL string, opts ...ClientOption) *Client { 71 c := &Client{ 72 baseURL: baseURL, 73 httpClient: &http.Client{ 74 Timeout: 60 * time.Second, 75 }, 76 rateLimiter: NewRateLimiter(90, time.Minute), 77 logger: defaultLogger{}, 78 } 79 80 for _, opt := range opts { 81 opt(c) 82 } 83 84 return c 85} 86 87// Close closes the client and cleans up resources 88func (c *Client) Close() { 89 if c.rateLimiter != nil { 90 c.rateLimiter.Stop() 91 } 92} 93 94// Export fetches export data from PLC directory with rate limiting and retry 95func (c *Client) Export(ctx context.Context, opts ExportOptions) ([]PLCOperation, error) { 96 return c.exportWithRetry(ctx, opts, 5) 97} 98 99// exportWithRetry implements retry logic with exponential backoff for rate limits 100func (c *Client) exportWithRetry(ctx context.Context, opts ExportOptions, maxRetries int) ([]PLCOperation, error) { 101 var lastErr error 102 backoff := 1 * time.Second 103 104 for attempt := 1; attempt <= maxRetries; attempt++ { 105 // Wait for rate limiter token 106 if err := c.rateLimiter.Wait(ctx); err != nil { 107 return nil, err 108 } 109 110 operations, retryAfter, err := c.doExport(ctx, opts) 111 112 if err == nil { 113 return operations, nil 114 } 115 116 lastErr = err 117 118 // Check if it's a rate limit error (429) 119 if retryAfter > 0 { 120 c.logger.Printf("Rate limited by PLC directory, waiting %v before retry %d/%d", 121 retryAfter, attempt, maxRetries) 122 123 select { 124 case <-time.After(retryAfter): 125 continue 126 case <-ctx.Done(): 127 return nil, ctx.Err() 128 } 129 } 130 131 // Other errors - exponential backoff 132 if attempt < maxRetries { 133 c.logger.Printf("Request failed (attempt %d/%d): %v, retrying in %v", 134 attempt, maxRetries, err, backoff) 135 136 select { 137 case <-time.After(backoff): 138 backoff *= 2 // Exponential backoff 139 case <-ctx.Done(): 140 return nil, ctx.Err() 141 } 142 } 143 } 144 145 return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr) 146} 147 148// doExport performs the actual HTTP request 149func (c *Client) doExport(ctx context.Context, opts ExportOptions) ([]PLCOperation, time.Duration, error) { 150 url := fmt.Sprintf("%s/export", c.baseURL) 151 152 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 153 if err != nil { 154 return nil, 0, err 155 } 156 157 // Add query parameters 158 q := req.URL.Query() 159 if opts.Count > 0 { 160 q.Add("count", fmt.Sprintf("%d", opts.Count)) 161 } 162 if opts.After != "" { 163 q.Add("after", opts.After) 164 } 165 req.URL.RawQuery = q.Encode() 166 167 resp, err := c.httpClient.Do(req) 168 if err != nil { 169 return nil, 0, fmt.Errorf("request failed: %w", err) 170 } 171 defer resp.Body.Close() 172 173 // Handle rate limiting (429) 174 if resp.StatusCode == http.StatusTooManyRequests { 175 retryAfter := parseRetryAfter(resp) 176 return nil, retryAfter, fmt.Errorf("rate limited (429)") 177 } 178 179 if resp.StatusCode != http.StatusOK { 180 body, _ := io.ReadAll(resp.Body) 181 return nil, 0, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body)) 182 } 183 184 var operations []PLCOperation 185 186 // PLC export returns newline-delimited JSON 187 scanner := bufio.NewScanner(resp.Body) 188 buf := make([]byte, 0, 64*1024) 189 scanner.Buffer(buf, 1024*1024) 190 191 lineCount := 0 192 for scanner.Scan() { 193 lineCount++ 194 line := scanner.Bytes() 195 196 if len(line) == 0 { 197 continue 198 } 199 200 var op PLCOperation 201 if err := json.Unmarshal(line, &op); err != nil { 202 c.logger.Printf("Warning: failed to parse operation on line %d: %v", lineCount, err) 203 continue 204 } 205 206 // CRITICAL: Store the original raw JSON bytes 207 op.RawJSON = make([]byte, len(line)) 208 copy(op.RawJSON, line) 209 210 operations = append(operations, op) 211 } 212 213 if err := scanner.Err(); err != nil { 214 return nil, 0, fmt.Errorf("error reading response: %w", err) 215 } 216 217 return operations, 0, nil 218} 219 220// parseRetryAfter parses the Retry-After header 221func parseRetryAfter(resp *http.Response) time.Duration { 222 retryAfter := resp.Header.Get("Retry-After") 223 if retryAfter == "" { 224 // Default to 5 minutes if no header 225 return 5 * time.Minute 226 } 227 228 // Try parsing as seconds 229 if seconds, err := strconv.Atoi(retryAfter); err == nil { 230 return time.Duration(seconds) * time.Second 231 } 232 233 // Try parsing as HTTP date 234 if t, err := http.ParseTime(retryAfter); err == nil { 235 return time.Until(t) 236 } 237 238 // Default 239 return 5 * time.Minute 240} 241 242// GetDID fetches a specific DID document from PLC 243func (c *Client) GetDID(ctx context.Context, did string) (*DIDDocument, error) { 244 // Wait for rate limiter 245 if err := c.rateLimiter.Wait(ctx); err != nil { 246 return nil, err 247 } 248 249 url := fmt.Sprintf("%s/%s", c.baseURL, did) 250 251 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 252 if err != nil { 253 return nil, err 254 } 255 256 resp, err := c.httpClient.Do(req) 257 if err != nil { 258 return nil, err 259 } 260 defer resp.Body.Close() 261 262 if resp.StatusCode == http.StatusTooManyRequests { 263 retryAfter := parseRetryAfter(resp) 264 return nil, fmt.Errorf("rate limited, retry after %v", retryAfter) 265 } 266 267 if resp.StatusCode != http.StatusOK { 268 body, _ := io.ReadAll(resp.Body) 269 return nil, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body)) 270 } 271 272 var doc DIDDocument 273 if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil { 274 return nil, err 275 } 276 277 return &doc, nil 278} 279 280// GetStats returns basic stats about the client 281func (c *Client) GetStats() map[string]interface{} { 282 return map[string]interface{}{ 283 "base_url": c.baseURL, 284 "timeout": c.httpClient.Timeout, 285 } 286} 287 288// GetBaseURL returns the PLC directory base URL 289func (c *Client) GetBaseURL() string { 290 return c.baseURL 291}