const std = @import("std");
const HTTPClient = std.http.Client;
const Io = std.Io;
const json = std.json;

const models = @import("models.zig");

const TIME_SLEEP_FACTOR: f64 = 0.98;

const log = std.log.scoped(.SpaceTraders);

/// Counting semaphore built from an `Io.Mutex` and an `Io.Condition`.
const Semaphore = struct {
    mutex: Io.Mutex = .{ .state = .unlocked },
    cond: Io.Condition = .{},
    /// It is OK to initialise this field to any value.
    permits: u64 = 0,

    /// Adds one permit and signals the condition.
    pub fn post(sem: *Semaphore, io: Io) void {
        sem.mutex.lockUncancelable(io);
        defer sem.mutex.unlock(io);
        sem.permits += 1;
        sem.cond.signal(io);
    }

    /// Overwrites the permit count with `permits` and signals the condition.
    pub fn set(sem: *Semaphore, io: Io, permits: u64) void {
        sem.mutex.lockUncancelable(io);
        defer sem.mutex.unlock(io);
        sem.permits = permits;
        sem.cond.signal(io);
    }

    /// Blocks until a permit is available, then takes it.
    /// Returns whatever error the condition wait reports.
    pub fn wait(sem: *Semaphore, io: Io) !void {
        sem.mutex.lockUncancelable(io);
        defer sem.mutex.unlock(io);
        while (sem.permits == 0)
            try sem.cond.wait(io, &sem.mutex);
        sem.permits -= 1;
        // `set` signals only once even when it hands out several permits, so
        // pass the signal on while permits remain.
        if (sem.permits > 0)
            sem.cond.signal(io);
    }

    /// Returns the number of permits currently available.
    pub fn available(sem: *Semaphore, io: Io) u64 {
        sem.mutex.lockUncancelable(io);
        defer sem.mutex.unlock(io);
        return sem.permits;
    }
};

/// Fixed-window rate limiter: `points` permits per `duration` milliseconds.
/// The window starts at the first `aquire` after a reset.
pub const Limiter = struct {
    points: u64,
    /// Window length in milliseconds.
    duration: i64,
    /// Deadline of the current window, or null when no window is running.
    time: ?Io.Timestamp,
    /// Guards `time`.
    mutex: Io.Mutex = .{ .state = .unlocked },
    semaphor: Semaphore,

    /// Creates a limiter with all `points` permits available and no window running.
    pub fn init(opts: struct { points: u64 = 2, duration: i64 = 1000 }) Limiter {
        return .{
            .points = opts.points,
            .duration = opts.duration,
            .time = null,
            .semaphor = Semaphore{ .permits = opts.points },
        };
    }

    /// Refills the permits and clears the window if its deadline has passed.
    /// Returns true when a reset happened; returns false when no window is
    /// running, the deadline is still ahead, or the lock/clock call failed.
    pub fn checkReset(l: *Limiter, io: Io) bool {
        l.mutex.lock(io) catch return false;
        defer l.mutex.unlock(io);

        const deadline = l.time orelse return false;
        const now = Io.Clock.now(.real, io) catch return false;
        // Millisecond granularity: comparing whole seconds would keep the
        // window closed for up to a full extra second after the deadline.
        if (deadline.durationTo(now).toMilliseconds() <= 0) return false;

        l.semaphor.set(io, l.points);
        l.time = null;
        return true;
    }

    /// Starts a window if none is running, then blocks until a permit is taken.
    pub fn aquire(l: *Limiter, io: Io) !void {
        {
            try l.mutex.lock(io);
            defer l.mutex.unlock(io);
            if (l.time == null) {
                const now = try Io.Clock.now(.real, io);
                l.time = now.addDuration(.fromMilliseconds(l.duration));
            }
        }
        // The mutex must not be held while blocking here: `checkReset` needs
        // it to refill the permits, so holding it would deadlock every task.
        return l.semaphor.wait(io);
    }

    /// Returns the time left until the window deadline.
    /// Returns zero when no window is running, the deadline is less than a
    /// millisecond away or already passed, or the lock/clock call failed.
    pub fn timeToReset(l: *Limiter, io: Io) Io.Duration {
        l.mutex.lock(io) catch return .zero;
        defer l.mutex.unlock(io);

        const deadline = l.time orelse return .zero;
        const now = Io.Clock.now(.real, io) catch return .zero;
        const remaining = now.durationTo(deadline);
        if (remaining.toMilliseconds() <= 0) return .zero;
        return remaining;
    }

    /// Returns true when at least one permit is available.
    pub fn available(l: *Limiter, io: Io) bool {
        return l.semaphor.available(io) > 0;
    }
};

/// Two limiters combined: `static` is used first, `burst` when `static` is empty.
pub const BurstyLimiter = struct {
    static: Limiter,
    burst: Limiter,

    /// Blocks until a permit from either limiter has been taken.
    pub fn wait(bl: *BurstyLimiter, io: Io) !void {
        _ = bl.static.checkReset(io);
        _ = bl.burst.checkReset(io);

        if (!bl.static.available(io)) {
            if (bl.burst.available(io)) {
                log.debug("Using Burst", .{});
                try bl.burst.aquire(io);
                return;
            } else {
                log.warn("No request available, waiting", .{});
                var static = bl.static.checkReset(io);
                var burst = bl.burst.checkReset(io);
                while (!static and !burst and !bl.static.available(io)) {
                    const remaining = bl.static.timeToReset(io);
                    // Sleep at least one millisecond so the loop cannot spin
                    // when the deadline is already (almost) reached.
                    const pause: Io.Duration = if (remaining.toMilliseconds() < 1)
                        Io.Duration.fromMilliseconds(1)
                    else
                        remaining;
                    try io.sleep(pause, .real);
                    static = bl.static.checkReset(io);
                    burst = bl.burst.checkReset(io);
                }
                log.debug("sleep done", .{});
                if (burst) {
                    log.debug("Using Burst", .{});
                    try bl.burst.aquire(io);
                    return;
                }
            }
        }

        try bl.static.aquire(io);
        return;
    }
};

/// Selects which credential from `Auth` a request sends.
pub const AuthType = enum { account, agent, none };

/// Authorization header values, one per `AuthType` credential.
pub const Auth = struct {
    account: []const u8 = "",
    agent: []const u8 = "",
};

pub const RequestOptions = struct {
    method: std.http.Method = .GET,
    auth: AuthType = .none,
    body: Body = .empty,
    /// When true, `Client.requestRaw` frees a `.buffer` body with the client
    /// allocator before it returns.
    free_body_after_sending: bool = false,

    pub const Body = union(enum) {
        empty: void,
        buffer: []u8,
    };

    /// Maps `auth` to the authorization header value taken from `client.auth`.
    pub fn authorization(opts: *const RequestOptions, client: *const Client) HTTPClient.Request.Headers.Value {
        switch (opts.auth) {
            .account => return .{ .override = client.auth.account },
            .agent => return .{ .override = client.auth.agent },
            .none => return .{ .omit = {} },
        }
    }

    /// Frees a `.buffer` body when `free_body_after_sending` is set; otherwise does nothing.
    pub fn releaseBody(opts: *const RequestOptions, allocator: std.mem.Allocator) void {
        if (!opts.free_body_after_sending) return;
        switch (opts.body) {
            .empty => {},
            .buffer => |buffer| allocator.free(buffer),
        }
    }
};

pub const RequestError = error{
    OutOfMemory,
    InvalidResponse,
    RateLimiterError,
} || HTTPClient.RequestError || HTTPClient.Request.ReceiveHeadError || std.Uri.ParseError;

/// Future that resolves to the parsed JSON response of type `T`.
pub fn RawResponse(comptime T: type) type {
    return Io.Future(RequestError!json.Parsed(T));
}

/// `RawResponse` of `T` wrapped in `models.Wrapper`.
pub fn Response(comptime T: type) type {
    return RawResponse(models.Wrapper(T));
}

pub const Client = struct {
    allocator: std.mem.Allocator,
    io: Io,
    limiter: BurstyLimiter,
    base_url: []const u8,
    auth: Auth,
    http: HTTPClient,

    // Stats
    total_requests: std.atomic.Value(u64) = .init(0),
    successful_requests: std.atomic.Value(u64) = .init(0),
    total_latency: std.atomic.Value(u64) = .init(0),
    average_latency: std.atomic.Value(u64) = .init(0),

    /// Creates a client with a 2-per-second static limiter and a
    /// 30-per-minute burst limiter.
    pub fn init(
        allocator: std.mem.Allocator,
        io: std.Io,
        opts: struct {
            base_url: []const u8 = "https://api.spacetraders.io/v2",
            auth: Auth = .{},
        },
    ) Client {
        return .{
            .allocator = allocator,
            .io = io,
            .limiter = .{
                .static = .init(.{}),
                .burst = .init(.{ .points = 30, .duration = 60_000 }),
            },
            .base_url = opts.base_url,
            .auth = opts.auth,
            .http = .{ .allocator = allocator, .io = io },
        };
    }

    /// Releases the underlying HTTP client.
    pub fn deinit(client: *Client) void {
        client.http.deinit();
    }

    /// Issues a GET request; `path` is a format string filled in with `args`.
    pub fn get(
        cl: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        auth: AuthType,
    ) !RawResponse(T) {
        return cl.request(T, path, args, .{ .auth = auth });
    }

    /// Issues a POST request with `body` serialised as JSON.
    pub fn post(
        cl: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        body: anytype,
        auth: AuthType,
    ) !RawResponse(T) {
        return cl.sendJson(T, path, args, .POST, body, auth);
    }

    /// Issues a PATCH request with `body` serialised as JSON.
    pub fn patch(
        cl: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        body: anytype,
        auth: AuthType,
    ) !RawResponse(T) {
        return cl.sendJson(T, path, args, .PATCH, body, auth);
    }

    /// Serialises `body` to JSON and issues a `method` request carrying it.
    fn sendJson(
        cl: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        method: std.http.Method,
        body: anytype,
        auth: AuthType,
    ) !RawResponse(T) {
        const buffer = try json.Stringify.valueAlloc(cl.allocator, body, .{});
        // Until the request task exists nobody else owns the buffer, so it
        // has to be freed here if `request` fails.
        errdefer cl.allocator.free(buffer);
        log.debug("json body: {s}", .{buffer});
        return cl.request(T, path, args, .{
            .method = method,
            .auth = auth,
            .body = .{ .buffer = buffer },
            .free_body_after_sending = true,
        });
    }

    /// Formats `path` with `args`, appends it to `base_url`, and starts a
    /// concurrent task that waits for the rate limiter and then performs the
    /// request. Returns the future for that task.
    ///
    /// If this function itself returns an error the body in `opts` has not
    /// been freed; once the task is started the task frees it when
    /// `free_body_after_sending` is set.
    pub fn request(
        client: *Client,
        comptime T: type,
        comptime path: []const u8,
        args: anytype,
        opts: RequestOptions,
    ) !RawResponse(T) {
        const path_fmt = try std.fmt.allocPrint(client.allocator, path, args);
        defer client.allocator.free(path_fmt);

        const url = try std.fmt.allocPrint(client.allocator, "{s}{s}", .{ client.base_url, path_fmt });
        // The task frees `url`; if the task cannot be started, free it here.
        errdefer client.allocator.free(url);

        const Wrapper = struct {
            fn call(
                cl: *Client,
                url_param: []const u8,
                opts_param: RequestOptions,
            ) RequestError!json.Parsed(T) {
                defer cl.allocator.free(url_param);
                cl.limiter.wait(cl.io) catch {
                    // `requestRaw` is never reached, so release the body here.
                    opts_param.releaseBody(cl.allocator);
                    return error.RateLimiterError;
                };
                return Client.requestRaw(cl, T, url_param, opts_param);
            }
        };

        return client.io.concurrent(
            Wrapper.call,
            .{ client, url, opts },
        );
    }

    /// Performs the HTTP request for `url` and parses the response body as
    /// JSON into `T`, updating the client statistics along the way.
    ///
    /// When `opts.free_body_after_sending` is set the body buffer is freed
    /// before this function returns, on success and on every error path.
    /// Returns `error.InvalidResponse` when the body cannot be parsed.
    pub fn requestRaw(
        client: *Client,
        comptime T: type,
        url: []const u8,
        opts: RequestOptions,
    ) RequestError!json.Parsed(T) {
        defer opts.releaseBody(client.allocator);

        const uri = std.Uri.parse(url) catch |err| {
            log.err("Error parsing url: {} - url = {s}", .{ err, url });
            return err;
        };

        var req = try client.http.request(opts.method, uri, .{
            .headers = .{
                .authorization = opts.authorization(client),
                .user_agent = .{ .override = "SPACE/0.1" },
                .content_type = .{ .override = "application/json" },
            },
        });
        defer req.deinit();

        log.debug("requesting: {s}", .{uri.path.percent_encoded});

        const start: ?Io.Timestamp = Io.Clock.now(.real, client.io) catch null;

        switch (opts.body) {
            .empty => try req.sendBodiless(),
            .buffer => |body| try req.sendBodyComplete(body),
        }

        var redirect_buffer: [1024]u8 = undefined;
        var response = try req.receiveHead(&redirect_buffer);

        const status_code = @intFromEnum(response.head.status);
        const colour: []const u8 = if (status_code >= 200 and status_code < 300)
            "\x1b[92m"
        else
            "\x1b[1m\x1b[91m";

        const request_count = client.total_requests.fetchAdd(1, .seq_cst) + 1;
        if (start) |s| blk: {
            const now = Io.Clock.now(.real, client.io) catch break :blk;
            // The real-time clock may step backwards; skip the sample instead
            // of casting a negative value to u64.
            const latency = std.math.cast(u64, s.durationTo(now).toMilliseconds()) orelse break :blk;

            // Derive the average from the running total. An incremental
            // integer update truncates on every step and drifts towards zero.
            const latency_sum = client.total_latency.fetchAdd(latency, .seq_cst) + latency;
            const new_average = latency_sum / request_count;
            client.average_latency.store(new_average, .seq_cst);

            log.debug("latency: {} ms - average: {}ms", .{ latency, new_average });
        }

        // `url` normally starts with `base_url`, but this function is public,
        // so do not slice past the end when it does not.
        const shown_path = if (std.mem.startsWith(u8, url, client.base_url))
            url[client.base_url.len..]
        else
            url;
        log.debug(
            "\x1b[2m[path = {s}]\x1b[0m received {s}{d} {s}\x1b[0m",
            .{ shown_path, colour, response.head.status, response.head.reason },
        );

        // var header_iter = response.head.iterateHeaders();
        // while (header_iter.next()) |header| {
        //     log.debug("{s}: {s}", .{ header.name, header.value });
        // }

        var decompress_buffer: [std.compress.flate.max_window_len]u8 = undefined;
        var transfer_buffer: [64]u8 = undefined;
        var decompress: std.http.Decompress = undefined;
        const decompressed_body_reader = response.readerDecompressing(&transfer_buffer, &decompress, &decompress_buffer);

        var json_reader: json.Reader = .init(client.allocator, decompressed_body_reader);
        defer json_reader.deinit();

        const result = json.parseFromTokenSource(T, client.allocator, &json_reader, .{
            .ignore_unknown_fields = true,
        }) catch |err| {
            log.err("Error parsing response: {}", .{err});
            return RequestError.InvalidResponse;
        };

        _ = client.successful_requests.fetchAdd(1, .seq_cst);
        return result;
    }
};