A SpaceTraders Agent
1const std = @import("std");
2const HTTPClient = std.http.Client;
3const Io = std.Io;
4const json = std.json;
5
6const models = @import("models.zig");
7
8const TIME_SLEEP_FACTOR: f64 = 0.98;
9
10const log = std.log.scoped(.SpaceTraders);
11
12const Semaphore = struct {
13 mutex: Io.Mutex = .{ .state = .unlocked },
14 cond: Io.Condition = .{},
15 /// It is OK to initialise this field to any value.
16 permits: u64 = 0,
17
18 pub fn post(sem: *Semaphore, io: Io) void {
19 sem.mutex.lockUncancelable(io);
20 defer sem.mutex.unlock(io);
21
22 sem.permits += 1;
23 sem.cond.signal(io);
24 }
25
26 pub fn set(sem: *Semaphore, io: Io, permits: u64) void {
27 sem.mutex.lockUncancelable(io);
28 defer sem.mutex.unlock(io);
29
30 sem.permits = permits;
31 sem.cond.signal(io);
32 }
33
34 pub fn wait(sem: *Semaphore, io: Io) !void {
35 sem.mutex.lockUncancelable(io);
36 defer sem.mutex.unlock(io);
37
38 while (sem.permits == 0)
39 try sem.cond.wait(io, &sem.mutex);
40
41 sem.permits -= 1;
42 if (sem.permits > 0)
43 sem.cond.signal(io);
44 }
45
46 pub fn available(sem: *Semaphore, io: Io) u64 {
47 sem.mutex.lockUncancelable(io);
48 defer sem.mutex.unlock(io);
49
50 return sem.permits;
51 }
52};
53
54pub const Limiter = struct {
55 points: u64,
56 duration: i64,
57 time: ?Io.Timestamp,
58
59 mutex: Io.Mutex = .{ .state = .unlocked },
60 semaphor: Semaphore,
61
62 pub fn init(opts: struct { points: u54 = 2, duration: i64 = 1000 }) Limiter {
63 return .{
64 .points = opts.points,
65 .duration = opts.duration,
66 .time = null,
67 .semaphor = Semaphore{ .permits = opts.points },
68 };
69 }
70
71 pub fn checkReset(l: *Limiter, io: Io) bool {
72 l.mutex.lock(io) catch return false;
73 defer l.mutex.unlock(io);
74
75 if (l.time) |t| {
76 const dur = t.durationTo(Io.Clock.now(.real, io) catch return false);
77 if (dur.toSeconds() > 0) {
78 l.semaphor.set(io, l.points);
79 l.time = null;
80 return true;
81 }
82 }
83
84 return false;
85 }
86
87 pub fn aquire(l: *Limiter, io: Io) !void {
88 try l.mutex.lock(io);
89 defer l.mutex.unlock(io);
90
91 if (l.time == null) {
92 const now = try Io.Clock.now(.real, io);
93 l.time = now.addDuration(.fromMilliseconds(l.duration));
94 }
95
96 return l.semaphor.wait(io);
97 }
98
99 pub fn timeToReset(l: *Limiter, io: Io) Io.Duration {
100 if (l.time) |t| {
101 return t.durationTo(Io.Clock.now(.real, io) catch return .zero);
102 }
103 return .zero;
104 }
105
106 pub fn available(l: *Limiter, io: Io) bool {
107 return l.semaphor.available(io) > 0;
108 }
109};
110
111pub const BurstyLimiter = struct {
112 static: Limiter,
113 burst: Limiter,
114
115 pub fn wait(bl: *BurstyLimiter, io: Io) !void {
116 _ = bl.static.checkReset(io);
117 _ = bl.burst.checkReset(io);
118
119 if (!bl.static.available(io)) {
120 if (bl.burst.available(io)) {
121 log.debug("Using Burst", .{});
122 try bl.burst.aquire(io);
123 return;
124 } else {
125 log.warn("No request available, waiting", .{});
126
127 var static = bl.static.checkReset(io);
128 var burst = bl.burst.checkReset(io);
129
130 while (!static and !burst and !bl.static.available(io)) {
131 try io.sleep(bl.static.timeToReset(io), .real);
132 static = bl.static.checkReset(io);
133 burst = bl.burst.checkReset(io);
134 }
135
136 log.debug("sleep done", .{});
137
138 if (burst) {
139 log.debug("Using Burst", .{});
140 try bl.burst.aquire(io);
141 return;
142 }
143 }
144 }
145
146 try bl.static.aquire(io);
147 return;
148 }
149};
150
151pub const AuthType = enum { account, agent, none };
152
153pub const Auth = struct {
154 account: []const u8 = "",
155 agent: []const u8 = "",
156};
157
158pub const RequestOptions = struct {
159 method: std.http.Method = .GET,
160 auth: AuthType = .none,
161 body: Body = .empty,
162
163 free_body_after_sending: bool = false,
164
165 pub const Body = union(enum) {
166 empty: void,
167 buffer: []u8,
168 };
169
170 pub fn authorization(opts: *const RequestOptions, client: *const Client) HTTPClient.Request.Headers.Value {
171 switch (opts.auth) {
172 .account => return .{ .override = client.auth.account },
173 .agent => return .{ .override = client.auth.agent },
174 .none => return .{ .omit = {} },
175 }
176 }
177};
178
179pub const RequestError = error{
180 OutOfMemory,
181 InvalidResponse,
182 RateLimiterError,
183} || HTTPClient.RequestError || HTTPClient.Request.ReceiveHeadError || std.Uri.ParseError;
184
185pub fn RawResponse(comptime T: type) type {
186 return Io.Future(RequestError!json.Parsed(T));
187}
188
189pub fn Response(comptime T: type) type {
190 return RawResponse(models.Wrapper(T));
191}
192
193pub const Client = struct {
194 allocator: std.mem.Allocator,
195 io: Io,
196 limiter: BurstyLimiter,
197
198 base_url: []const u8,
199 auth: Auth,
200
201 http: HTTPClient,
202
203 // Stats
204 total_requests: std.atomic.Value(u64) = .init(0),
205 successful_requests: std.atomic.Value(u64) = .init(0),
206 total_latency: std.atomic.Value(u64) = .init(0),
207 average_latency: std.atomic.Value(u64) = .init(0),
208
209 pub fn init(
210 allocator: std.mem.Allocator,
211 io: std.Io,
212 opts: struct {
213 base_url: []const u8 = "https://api.spacetraders.io/v2",
214 auth: Auth = .{},
215 },
216 ) Client {
217 return .{
218 .allocator = allocator,
219 .io = io,
220 .limiter = .{
221 .static = .init(.{}),
222 .burst = .init(.{ .points = 30, .duration = 60_000 }),
223 },
224 .base_url = opts.base_url,
225 .auth = opts.auth,
226 .http = .{ .allocator = allocator, .io = io },
227 };
228 }
229
230 pub fn deinit(client: *Client) void {
231 client.http.deinit();
232 }
233
234 pub fn get(
235 cl: *Client,
236 comptime T: type,
237 comptime path: []const u8,
238 args: anytype,
239 auth: AuthType,
240 ) !RawResponse(T) {
241 return cl.request(T, path, args, .{ .auth = auth });
242 }
243
244 pub fn post(
245 cl: *Client,
246 comptime T: type,
247 comptime path: []const u8,
248 args: anytype,
249 body: anytype,
250 auth: AuthType,
251 ) !RawResponse(T) {
252 const buffer = try json.Stringify.valueAlloc(cl.allocator, body, .{});
253 log.debug("json body: {s}", .{buffer});
254 return cl.request(T, path, args, .{
255 .method = .POST,
256 .auth = auth,
257 .body = .{ .buffer = buffer },
258 .free_body_after_sending = true,
259 });
260 }
261
262 pub fn patch(
263 cl: *Client,
264 comptime T: type,
265 comptime path: []const u8,
266 args: anytype,
267 body: anytype,
268 auth: AuthType,
269 ) !RawResponse(T) {
270 const buffer = try json.Stringify.valueAlloc(cl.allocator, body, .{});
271 log.debug("json body: {s}", .{buffer});
272 return cl.request(T, path, args, .{
273 .method = .PATCH,
274 .auth = auth,
275 .body = .{ .buffer = buffer },
276 .free_body_after_sending = true,
277 });
278 }
279
280 pub fn request(
281 client: *Client,
282 comptime T: type,
283 comptime path: []const u8,
284 args: anytype,
285 opts: RequestOptions,
286 ) !RawResponse(T) {
287 const path_fmt = try std.fmt.allocPrint(client.allocator, path, args);
288 defer client.allocator.free(path_fmt);
289
290 const url = try std.fmt.allocPrint(client.allocator, "{s}{s}", .{ client.base_url, path_fmt });
291
292 const Wrapper = struct {
293 fn call(
294 cl: *Client,
295 url_param: []const u8,
296 opts_param: RequestOptions,
297 ) RequestError!json.Parsed(T) {
298 defer cl.allocator.free(url_param);
299 cl.limiter.wait(cl.io) catch return error.RateLimiterError;
300 return Client.requestRaw(cl, T, url_param, opts_param);
301 }
302 };
303
304 return client.io.concurrent(
305 Wrapper.call,
306 .{ client, url, opts },
307 );
308 }
309
310 pub fn requestRaw(
311 client: *Client,
312 comptime T: type,
313 url: []const u8,
314 opts: RequestOptions,
315 ) RequestError!json.Parsed(T) {
316 const uri = std.Uri.parse(url) catch |err| {
317 log.err("Error parsing url: {} - url = {s}", .{ err, url });
318 return err;
319 };
320
321 var req = try client.http.request(opts.method, uri, .{
322 .headers = .{
323 .authorization = opts.authorization(client),
324 .user_agent = .{ .override = "SPACE/0.1" },
325 .content_type = .{ .override = "application/json" },
326 },
327 });
328 defer req.deinit();
329
330 log.debug("requesting: {s}", .{uri.path.percent_encoded});
331
332 const start: ?Io.Timestamp = Io.Clock.now(.real, client.io) catch null;
333
334 switch (opts.body) {
335 .empty => try req.sendBodiless(),
336 .buffer => |body| {
337 try req.sendBodyComplete(body);
338 if (opts.free_body_after_sending) client.allocator.free(body);
339 },
340 }
341
342 var redirect_buffer: [1024]u8 = undefined;
343
344 var response = try req.receiveHead(&redirect_buffer);
345 const colour = blk: {
346 if (@intFromEnum(response.head.status) >= 200 and @intFromEnum(response.head.status) < 300) {
347 break :blk "\x1b[92m";
348 } else {
349 break :blk "\x1b[1m\x1b[91m";
350 }
351 };
352
353 _ = client.total_requests.fetchAdd(1, .seq_cst);
354
355 if (start) |s| blk: {
356 const latency: u64 = @intCast(s.durationTo(Io.Clock.now(.real, client.io) catch break :blk).toMilliseconds());
357 const old_average = client.average_latency.load(.seq_cst);
358 var new_average: u64 = 0;
359
360 if (old_average == 0) {
361 new_average = latency;
362 } else {
363 const total_reqs = client.total_requests.load(.seq_cst);
364 new_average = old_average * (total_reqs - 1) / total_reqs + latency / total_reqs;
365 }
366
367 client.average_latency.store(new_average, .seq_cst);
368 _ = client.total_latency.fetchAdd(latency, .seq_cst);
369
370 log.debug("latency: {} ms - average: {}ms", .{ latency, new_average });
371 }
372
373 log.debug(
374 "\x1b[2m[path = {s}]\x1b[0m received {s}{d} {s}\x1b[0m",
375 .{ url[client.base_url.len..], colour, response.head.status, response.head.reason },
376 );
377
378 // var header_iter = response.head.iterateHeaders();
379 // while (header_iter.next()) |header| {
380 // log.debug("{s}: {s}", .{ header.name, header.value });
381 // }
382
383 var decompress_buffer: [std.compress.flate.max_window_len]u8 = undefined;
384 var transfer_buffer: [64]u8 = undefined;
385 var decompress: std.http.Decompress = undefined;
386
387 const decompressed_body_reader = response.readerDecompressing(&transfer_buffer, &decompress, &decompress_buffer);
388
389 var json_reader: json.Reader = .init(client.allocator, decompressed_body_reader);
390 defer json_reader.deinit();
391
392 const result = json.parseFromTokenSource(T, client.allocator, &json_reader, .{
393 .ignore_unknown_fields = true,
394 }) catch |err| {
395 log.err("Error parsing response: {}", .{err});
396
397 return RequestError.InvalidResponse;
398 };
399
400 _ = client.successful_requests.fetchAdd(1, .seq_cst);
401
402 return result;
403 }
404};