A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
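
The Go client below handles the HTTP side of syncing: it rate-limits requests to the directory, retries with exponential backoff (honoring Retry-After on 429 responses), and parses the newline-delimited JSON stream from the export endpoint while keeping each operation's original bytes. The companion types it relies on (RateLimiter, ExportOptions, PLCOperation, DIDDocument) are assumed to be defined elsewhere in the plc package.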
package plc

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"strconv"
	"time"

	"github.com/goccy/go-json"
)

// Client is a client for the PLC directory
type Client struct {
	baseURL     string
	httpClient  *http.Client
	rateLimiter *RateLimiter
	logger      Logger
}

// Logger is a simple logging interface
type Logger interface {
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

// defaultLogger uses standard log package
type defaultLogger struct{}

func (d defaultLogger) Printf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

func (d defaultLogger) Println(v ...interface{}) {
	log.Println(v...)
}

// ClientOption is a functional option for configuring the Client
type ClientOption func(*Client)

// WithLogger sets a custom logger
func WithLogger(logger Logger) ClientOption {
	return func(c *Client) {
		c.logger = logger
	}
}

// WithTimeout sets a custom HTTP timeout
func WithTimeout(timeout time.Duration) ClientOption {
	return func(c *Client) {
		c.httpClient.Timeout = timeout
	}
}

// WithRateLimit sets a custom rate limit (requests per period)
func WithRateLimit(requestsPerPeriod int, period time.Duration) ClientOption {
	return func(c *Client) {
		if c.rateLimiter != nil {
			c.rateLimiter.Stop()
		}
		c.rateLimiter = NewRateLimiter(requestsPerPeriod, period)
	}
}

// NewClient creates a new PLC directory client
// Default: 90 requests per minute, 60 second timeout
func NewClient(baseURL string, opts ...ClientOption) *Client {
	c := &Client{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: 60 * time.Second,
		},
		rateLimiter: NewRateLimiter(90, time.Minute),
		logger:      defaultLogger{},
	}

	for _, opt := range opts {
		opt(c)
	}

	return c
}

// Close closes the client and cleans up resources
func (c *Client) Close() {
	if c.rateLimiter != nil {
		c.rateLimiter.Stop()
	}
}

// Export fetches operations from the PLC directory's export endpoint,
// with rate limiting and retries
func (c *Client) Export(ctx context.Context, opts ExportOptions) ([]PLCOperation, error) {
	return c.exportWithRetry(ctx, opts, 5)
}

// exportWithRetry implements retry logic with exponential backoff for rate limits
func (c *Client) exportWithRetry(ctx context.Context, opts ExportOptions, maxRetries int) ([]PLCOperation, error) {
	var lastErr error
	backoff := 1 * time.Second

	for attempt := 1; attempt <= maxRetries; attempt++ {
		// Wait for rate limiter token
		if err := c.rateLimiter.Wait(ctx); err != nil {
			return nil, err
		}

		operations, retryAfter, err := c.doExport(ctx, opts)

		if err == nil {
			return operations, nil
		}

		lastErr = err

		// Rate limit error (429): honor Retry-After, but skip the wait
		// when this was the final attempt
		if retryAfter > 0 {
			if attempt < maxRetries {
				c.logger.Printf("Rate limited by PLC directory, waiting %v before retry %d/%d",
					retryAfter, attempt, maxRetries)

				select {
				case <-time.After(retryAfter):
				case <-ctx.Done():
					return nil, ctx.Err()
				}
			}
			continue
		}

		// Other errors - exponential backoff
		if attempt < maxRetries {
			c.logger.Printf("Request failed (attempt %d/%d): %v, retrying in %v",
				attempt, maxRetries, err, backoff)

			select {
			case <-time.After(backoff):
				backoff *= 2 // Exponential backoff
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		}
	}

	return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr)
}

// doExport performs the actual HTTP request
func (c *Client) doExport(ctx context.Context, opts ExportOptions) ([]PLCOperation, time.Duration, error) {
	url := fmt.Sprintf("%s/export", c.baseURL)

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return nil, 0, err
	}

	// Add query parameters
	q := req.URL.Query()
	if opts.Count > 0 {
		q.Add("count", fmt.Sprintf("%d", opts.Count))
	}
	if opts.After != "" {
		q.Add("after", opts.After)
	}
	req.URL.RawQuery = q.Encode()

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, 0, fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	// Handle rate limiting (429)
	if resp.StatusCode == http.StatusTooManyRequests {
		retryAfter := parseRetryAfter(resp)
		return nil, retryAfter, fmt.Errorf("rate limited (429)")
	}

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return nil, 0, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
	}

	var operations []PLCOperation

	// PLC export returns newline-delimited JSON
	scanner := bufio.NewScanner(resp.Body)
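	// Allow lines up to 1MB; bufio.Scanner's default 64KB token limit would
	// abort the scan with ErrTooLong on an unusually large operation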
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	lineCount := 0
	for scanner.Scan() {
		lineCount++
		line := scanner.Bytes()

		if len(line) == 0 {
			continue
		}

		var op PLCOperation
		if err := json.Unmarshal(line, &op); err != nil {
			c.logger.Printf("Warning: failed to parse operation on line %d: %v", lineCount, err)
			continue
		}

		// CRITICAL: Store the original raw JSON bytes
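		// (re-marshaling could reorder keys or change whitespace; keeping
		// the exact bytes preserves the encoding the directory served)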
		op.RawJSON = make([]byte, len(line))
		copy(op.RawJSON, line)

		operations = append(operations, op)
	}

	if err := scanner.Err(); err != nil {
		return nil, 0, fmt.Errorf("error reading response: %w", err)
	}

	return operations, 0, nil
}

// parseRetryAfter parses the Retry-After header
func parseRetryAfter(resp *http.Response) time.Duration {
	retryAfter := resp.Header.Get("Retry-After")
	if retryAfter == "" {
		// Default to 5 minutes if no header
		return 5 * time.Minute
	}

	// Try parsing as seconds
	if seconds, err := strconv.Atoi(retryAfter); err == nil {
		return time.Duration(seconds) * time.Second
	}

	// Try parsing as HTTP date
	if t, err := http.ParseTime(retryAfter); err == nil {
		return time.Until(t)
	}

	// Default
	return 5 * time.Minute
}

// GetDID fetches a specific DID document from PLC
func (c *Client) GetDID(ctx context.Context, did string) (*DIDDocument, error) {
	// Wait for rate limiter
	if err := c.rateLimiter.Wait(ctx); err != nil {
		return nil, err
	}

	url := fmt.Sprintf("%s/%s", c.baseURL, did)

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return nil, err
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusTooManyRequests {
		retryAfter := parseRetryAfter(resp)
		return nil, fmt.Errorf("rate limited, retry after %v", retryAfter)
	}

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
	}

	var doc DIDDocument
	if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil {
		return nil, err
	}

	return &doc, nil
}

// GetStats returns basic stats about the client
func (c *Client) GetStats() map[string]interface{} {
	return map[string]interface{}{
		"base_url": c.baseURL,
		"timeout":  c.httpClient.Timeout,
	}
}

// GetBaseURL returns the PLC directory base URL
func (c *Client) GetBaseURL() string {
	return c.baseURL
}