A SpaceTraders Agent
at main 12 kB view raw
const std = @import("std");
const HTTPClient = std.http.Client;
const Io = std.Io;
const json = std.json;

const models = @import("models.zig");

const TIME_SLEEP_FACTOR: f64 = 0.98;

const log = std.log.scoped(.SpaceTraders);

/// Counting semaphore built from an `Io.Mutex` and an `Io.Condition`.
const Semaphore = struct {
    mutex: Io.Mutex = .{ .state = .unlocked },
    cond: Io.Condition = .{},
    /// It is OK to initialise this field to any value.
    permits: u64 = 0,

    /// Adds one permit and signals the condition.
    pub fn post(sem: *Semaphore, io: Io) void {
        sem.mutex.lockUncancelable(io);
        defer sem.mutex.unlock(io);

        sem.permits += 1;
        sem.cond.signal(io);
    }

    /// Overwrites the permit count with `permits` and signals the condition.
    pub fn set(sem: *Semaphore, io: Io, permits: u64) void {
        sem.mutex.lockUncancelable(io);
        defer sem.mutex.unlock(io);

        sem.permits = permits;
        sem.cond.signal(io);
    }

    /// Blocks until a permit is available, then takes it.
    /// Returns whatever error `Io.Condition.wait` reports.
    pub fn wait(sem: *Semaphore, io: Io) !void {
        sem.mutex.lockUncancelable(io);
        defer sem.mutex.unlock(io);

        while (sem.permits == 0)
            try sem.cond.wait(io, &sem.mutex);

        sem.permits -= 1;
        // `set` signals only once even when it grants several permits, so
        // pass the wake-up on while permits remain.
        if (sem.permits > 0)
            sem.cond.signal(io);
    }

    /// Returns the current number of permits.
    pub fn available(sem: *Semaphore, io: Io) u64 {
        sem.mutex.lockUncancelable(io);
        defer sem.mutex.unlock(io);

        return sem.permits;
    }
};

/// Fixed-window rate limiter: `points` permits per window of `duration`
/// milliseconds. The window starts on the first `aquire` after a reset and is
/// refilled by `checkReset` once its deadline has passed.
pub const Limiter = struct {
    /// Permits granted per window.
    points: u64,
    /// Window length in milliseconds.
    duration: i64,
    /// Deadline of the current window, or null when no window is open.
    time: ?Io.Timestamp,

    /// Guards `time`.
    mutex: Io.Mutex = .{ .state = .unlocked },
    semaphor: Semaphore,

    /// Creates a limiter with all `points` permits available and no open window.
    pub fn init(opts: struct { points: u64 = 2, duration: i64 = 1000 }) Limiter {
        return .{
            .points = opts.points,
            .duration = opts.duration,
            .time = null,
            .semaphor = Semaphore{ .permits = opts.points },
        };
    }

    /// Refills the permits and closes the window if its deadline has been
    /// reached. Returns true when a reset happened; returns false when no
    /// window is open, the deadline is still ahead, or the lock/clock failed.
    pub fn checkReset(l: *Limiter, io: Io) bool {
        l.mutex.lock(io) catch return false;
        defer l.mutex.unlock(io);

        const deadline = l.time orelse return false;
        const now = Io.Clock.now(.real, io) catch return false;

        // Compare at millisecond granularity (the unit of `duration`).
        // Comparing whole seconds would delay every reset by up to a second.
        if (deadline.durationTo(now).toMilliseconds() < 0) return false;

        l.semaphor.set(io, l.points);
        l.time = null;
        return true;
    }

    /// Takes one permit, opening a new window if none is open. Blocks while no
    /// permit is available.
    pub fn aquire(l: *Limiter, io: Io) !void {
        {
            try l.mutex.lock(io);
            defer l.mutex.unlock(io);

            if (l.time == null) {
                const now = try Io.Clock.now(.real, io);
                l.time = now.addDuration(.fromMilliseconds(l.duration));
            }
        }

        // The mutex is released before blocking: `checkReset` needs it to
        // refill the semaphore, so holding it here would deadlock the limiter
        // as soon as one caller has to wait for a permit.
        return l.semaphor.wait(io);
    }

    /// Returns the time remaining until the current window's deadline, or
    /// `.zero` when no window is open, the deadline has passed, or the clock
    /// cannot be read.
    pub fn timeToReset(l: *Limiter, io: Io) Io.Duration {
        l.mutex.lockUncancelable(io);
        defer l.mutex.unlock(io);

        const deadline = l.time orelse return .zero;
        const now = Io.Clock.now(.real, io) catch return .zero;

        // Measured from now to the deadline, so the value is positive while
        // the window is still open.
        const remaining = now.durationTo(deadline);
        return if (remaining.toMilliseconds() < 0) .zero else remaining;
    }

    /// Returns true when at least one permit can be taken right now.
    pub fn available(l: *Limiter, io: Io) bool {
        return l.semaphor.available(io) > 0;
    }
};

/// Combines a steady (`static`) limiter with a `burst` limiter that is used
/// only when the steady one has no permits left.
pub const BurstyLimiter = struct {
    static: Limiter,
    burst: Limiter,

    /// Takes one permit, preferring the static limiter and falling back to the
    /// burst limiter. When both are exhausted, sleeps until one of them resets
    /// or a static permit becomes available.
    pub fn wait(bl: *BurstyLimiter, io: Io) !void {
        _ = bl.static.checkReset(io);
        _ = bl.burst.checkReset(io);

        if (bl.static.available(io)) return bl.static.aquire(io);

        if (bl.burst.available(io)) {
            log.debug("Using Burst", .{});
            return bl.burst.aquire(io);
        }

        log.warn("No request available, waiting", .{});

        var static = bl.static.checkReset(io);
        var burst = bl.burst.checkReset(io);

        while (!static and !burst and !bl.static.available(io)) {
            try io.sleep(bl.static.timeToReset(io), .real);
            static = bl.static.checkReset(io);
            burst = bl.burst.checkReset(io);
        }

        log.debug("sleep done", .{});

        if (burst) {
            log.debug("Using Burst", .{});
            return bl.burst.aquire(io);
        }

        return bl.static.aquire(io);
    }
};

/// Selects which credential from `Auth` is sent with a request.
pub const AuthType = enum { account, agent, none };

/// Authorization header values, used verbatim.
pub const Auth = struct {
    account: []const u8 = "",
    agent: []const u8 = "",
};

/// Per-request settings.
pub const RequestOptions = struct {
    method: std.http.Method = .GET,
    auth: AuthType = .none,
    body: Body = .empty,

    /// When true, a `.buffer` body is freed with the client's allocator once
    /// the request has been handed to `Client.requestRaw` (on success and on
    /// failure) or the rate limiter has failed.
    free_body_after_sending: bool = false,

    pub const Body = union(enum) {
        empty: void,
        buffer: []u8,
    };

    /// Returns the authorization header setting matching `opts.auth`.
    pub fn authorization(opts: *const RequestOptions, client: *const Client) HTTPClient.Request.Headers.Value {
        return switch (opts.auth) {
            .account => .{ .override = client.auth.account },
            .agent => .{ .override = client.auth.agent },
            .none => .{ .omit = {} },
        };
    }

    /// Frees the body buffer if these options own it.
    fn releaseBody(opts: *const RequestOptions, allocator: std.mem.Allocator) void {
        if (!opts.free_body_after_sending) return;
        switch (opts.body) {
            .empty => {},
            .buffer => |buffer| allocator.free(buffer),
        }
    }
};

pub const RequestError = error{
    OutOfMemory,
    InvalidResponse,
    RateLimiterError,
} || HTTPClient.RequestError || HTTPClient.Request.ReceiveHeadError || std.Uri.ParseError;

/// Future resolving to the parsed response body of type `T`.
pub fn RawResponse(comptime T: type) type {
    return Io.Future(RequestError!json.Parsed(T));
}

/// Future resolving to `T` wrapped in `models.Wrapper`.
pub fn Response(comptime T: type) type {
    return RawResponse(models.Wrapper(T));
}

/// Rate-limited JSON-over-HTTP client.
pub const Client = struct {
    allocator: std.mem.Allocator,
    io: Io,
    limiter: BurstyLimiter,

    base_url: []const u8,
    auth: Auth,

    http: HTTPClient,

    // Stats
    total_requests: std.atomic.Value(u64) = .init(0),
    successful_requests: std.atomic.Value(u64) = .init(0),
    /// Sum of all measured latencies, in milliseconds.
    total_latency: std.atomic.Value(u64) = .init(0),
    /// `total_latency / total_requests`, in milliseconds.
    average_latency: std.atomic.Value(u64) = .init(0),

    /// Creates a client with a static limiter using `Limiter.init` defaults
    /// and a burst limiter of 30 requests per 60 000 ms.
    /// `opts.base_url` and `opts.auth` are stored by reference, not copied.
    pub fn init(
        allocator: std.mem.Allocator,
        io: std.Io,
        opts: struct {
            base_url: []const u8 = "https://api.spacetraders.io/v2",
            auth: Auth = .{},
        },
    ) Client {
        return .{
            .allocator = allocator,
            .io = io,
            .limiter = .{
                .static = .init(.{}),
                .burst = .init(.{ .points = 30, .duration = 60_000 }),
            },
            .base_url = opts.base_url,
            .auth = opts.auth,
            .http = .{ .allocator = allocator, .io = io },
        };
    }

    /// Releases the underlying HTTP client.
    pub fn deinit(client: *Client) void {
        client.http.deinit();
    }

    /// Starts a GET request to `base_url ++ fmt(path, args)`.
    pub fn get(
        cl: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        auth: AuthType,
    ) !RawResponse(T) {
        return cl.request(T, path, args, .{ .auth = auth });
    }

    /// Starts a POST request whose body is `body` serialized as JSON.
    pub fn post(
        cl: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        body: anytype,
        auth: AuthType,
    ) !RawResponse(T) {
        return cl.requestWithJsonBody(T, path, args, .POST, body, auth);
    }

    /// Starts a PATCH request whose body is `body` serialized as JSON.
    pub fn patch(
        cl: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        body: anytype,
        auth: AuthType,
    ) !RawResponse(T) {
        return cl.requestWithJsonBody(T, path, args, .PATCH, body, auth);
    }

    /// Serializes `body` to JSON and starts a request that owns the buffer.
    fn requestWithJsonBody(
        cl: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        method: std.http.Method,
        body: anytype,
        auth: AuthType,
    ) !RawResponse(T) {
        const buffer = try json.Stringify.valueAlloc(cl.allocator, body, .{});
        // Ownership moves to the spawned task only once `request` succeeds.
        errdefer cl.allocator.free(buffer);

        log.debug("json body: {s}", .{buffer});
        return cl.request(T, path, args, .{
            .method = method,
            .auth = auth,
            .body = .{ .buffer = buffer },
            .free_body_after_sending = true,
        });
    }

    /// Formats `path` with `args`, appends it to `base_url`, and starts the
    /// request through `io.concurrent`. The spawned task first waits on the
    /// rate limiter and then runs `requestRaw`.
    ///
    /// If this function returns an error, no task was started and the caller
    /// keeps ownership of `opts.body`.
    pub fn request(
        client: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        opts: RequestOptions,
    ) !RawResponse(T) {
        const path_fmt = try std.fmt.allocPrint(client.allocator, path, args);
        defer client.allocator.free(path_fmt);

        const url = try std.fmt.allocPrint(client.allocator, "{s}{s}", .{ client.base_url, path_fmt });
        // The spawned task frees `url`; free it here only if spawning fails.
        errdefer client.allocator.free(url);

        const Wrapper = struct {
            fn call(
                cl: *Client,
                url_param: []const u8,
                opts_param: RequestOptions,
            ) RequestError!json.Parsed(T) {
                defer cl.allocator.free(url_param);
                cl.limiter.wait(cl.io) catch {
                    // `requestRaw` never runs on this path, so release the
                    // body here.
                    opts_param.releaseBody(cl.allocator);
                    return error.RateLimiterError;
                };
                return Client.requestRaw(cl, T, url_param, opts_param);
            }
        };

        return client.io.concurrent(
            Wrapper.call,
            .{ client, url, opts },
        );
    }

    /// Performs the HTTP request against `url` without consulting the rate
    /// limiter, updates the request statistics, and parses the response body
    /// as JSON into `T` (unknown fields are ignored).
    ///
    /// When `opts.free_body_after_sending` is set, the body buffer is freed
    /// before returning, on success and on every error path.
    ///
    /// Errors: `InvalidResponse` when the body cannot be parsed as `T`;
    /// `OutOfMemory`; URI parse errors; HTTP request/receive errors.
    pub fn requestRaw(
        client: *Client,
        comptime T: type,
        url: []const u8,
        opts: RequestOptions,
    ) RequestError!json.Parsed(T) {
        defer opts.releaseBody(client.allocator);

        const uri = std.Uri.parse(url) catch |err| {
            log.err("Error parsing url: {} - url = {s}", .{ err, url });
            return err;
        };

        var req = try client.http.request(opts.method, uri, .{
            .headers = .{
                .authorization = opts.authorization(client),
                .user_agent = .{ .override = "SPACE/0.1" },
                .content_type = .{ .override = "application/json" },
            },
        });
        defer req.deinit();

        log.debug("requesting: {s}", .{uri.path.percent_encoded});

        const start: ?Io.Timestamp = Io.Clock.now(.real, client.io) catch null;

        switch (opts.body) {
            .empty => try req.sendBodiless(),
            .buffer => |body| try req.sendBodyComplete(body),
        }

        var redirect_buffer: [1024]u8 = undefined;

        var response = try req.receiveHead(&redirect_buffer);

        const status_code = @intFromEnum(response.head.status);
        const colour = if (status_code >= 200 and status_code < 300)
            "\x1b[92m"
        else
            "\x1b[1m\x1b[91m";

        const total_reqs = client.total_requests.fetchAdd(1, .seq_cst) + 1;

        if (start) |s| blk: {
            const now = Io.Clock.now(.real, client.io) catch break :blk;
            // The real-time clock may step backwards; skip the sample rather
            // than cast a negative value to u64.
            const latency = std.math.cast(u64, s.durationTo(now).toMilliseconds()) orelse break :blk;

            // Derive the mean from the running sum. An incremental integer
            // average loses the remainder on every update and drifts to zero.
            const total_latency = client.total_latency.fetchAdd(latency, .seq_cst) + latency;
            const new_average = total_latency / total_reqs;
            client.average_latency.store(new_average, .seq_cst);

            log.debug("latency: {} ms - average: {}ms", .{ latency, new_average });
        }

        // `url` is not guaranteed to start with `base_url` when this function
        // is called directly, so only strip the prefix when it is present.
        const shown_path = if (std.mem.startsWith(u8, url, client.base_url))
            url[client.base_url.len..]
        else
            url;

        log.debug(
            "\x1b[2m[path = {s}]\x1b[0m received {s}{d} {s}\x1b[0m",
            .{ shown_path, colour, response.head.status, response.head.reason },
        );

        // var header_iter = response.head.iterateHeaders();
        // while (header_iter.next()) |header| {
        //     log.debug("{s}: {s}", .{ header.name, header.value });
        // }

        var decompress_buffer: [std.compress.flate.max_window_len]u8 = undefined;
        var transfer_buffer: [64]u8 = undefined;
        var decompress: std.http.Decompress = undefined;

        const decompressed_body_reader = response.readerDecompressing(&transfer_buffer, &decompress, &decompress_buffer);

        var json_reader: json.Reader = .init(client.allocator, decompressed_body_reader);
        defer json_reader.deinit();

        const result = json.parseFromTokenSource(T, client.allocator, &json_reader, .{
            .ignore_unknown_fields = true,
        }) catch |err| {
            log.err("Error parsing response: {}", .{err});

            // Allocation failure is not a malformed response; report it as such.
            return switch (err) {
                error.OutOfMemory => error.OutOfMemory,
                else => RequestError.InvalidResponse,
            };
        };

        _ = client.successful_requests.fetchAdd(1, .seq_cst);

        return result;
    }
};