zig async 测试用例
at main 261 lines 10 kB view raw
1const std = @import("std"); 2const Io = std.Io; 3const IpAddress = std.Io.net.IpAddress; 4 5// 配置参数 6const Config = struct { 7 num_clients: usize = 10, // 并发客户端数 8 messages_per_client: usize = 100, // 每个客户端发送的消息数 9 message_size: usize = 64, // 消息大小(字节) 10}; 11 12// 统计数据 13const Stats = struct { 14 total_requests: std.atomic.Value(usize), 15 successful_requests: std.atomic.Value(usize), 16 failed_requests: std.atomic.Value(usize), 17 total_latency_ns: std.atomic.Value(u64), 18 min_latency_ns: std.atomic.Value(u64), 19 max_latency_ns: std.atomic.Value(u64), 20 21 fn init() Stats { 22 return Stats{ 23 .total_requests = std.atomic.Value(usize).init(0), 24 .successful_requests = std.atomic.Value(usize).init(0), 25 .failed_requests = std.atomic.Value(usize).init(0), 26 .total_latency_ns = std.atomic.Value(u64).init(0), 27 .min_latency_ns = std.atomic.Value(u64).init(std.math.maxInt(u64)), 28 .max_latency_ns = std.atomic.Value(u64).init(0), 29 }; 30 } 31 32 fn updateLatency(self: *Stats, latency_ns: u64) void { 33 // 更新总延迟 34 _ = self.total_latency_ns.fetchAdd(latency_ns, .monotonic); 35 36 // 更新最小延迟 37 var current_min = self.min_latency_ns.load(.monotonic); 38 while (latency_ns < current_min) { 39 if (self.min_latency_ns.cmpxchgWeak( 40 current_min, 41 latency_ns, 42 .monotonic, 43 .monotonic, 44 )) |new_min| { 45 current_min = new_min; 46 } else { 47 break; 48 } 49 } 50 51 // 更新最大延迟 52 var current_max = self.max_latency_ns.load(.monotonic); 53 while (latency_ns > current_max) { 54 if (self.max_latency_ns.cmpxchgWeak( 55 current_max, 56 latency_ns, 57 .monotonic, 58 .monotonic, 59 )) |new_max| { 60 current_max = new_max; 61 } else { 62 break; 63 } 64 } 65 } 66 67 fn recordSuccess(self: *Stats, latency_ns: u64) void { 68 _ = self.successful_requests.fetchAdd(1, .monotonic); 69 self.updateLatency(latency_ns); 70 } 71 72 fn recordFailure(self: *Stats) void { 73 _ = self.failed_requests.fetchAdd(1, .monotonic); 74 } 75 76 fn printResults(self: *Stats, config: Config, elapsed_ns: u64) void { 77 const total = self.total_requests.load(.monotonic); 78 const successful = self.successful_requests.load(.monotonic); 79 const failed = self.failed_requests.load(.monotonic); 80 const total_latency = self.total_latency_ns.load(.monotonic); 81 const min_latency = self.min_latency_ns.load(.monotonic); 82 const max_latency = self.max_latency_ns.load(.monotonic); 83 84 const elapsed_s = @as(f64, @floatFromInt(elapsed_ns)) / 1_000_000_000.0; 85 const success_rate = if (total > 0) @as(f64, @floatFromInt(successful)) / @as(f64, @floatFromInt(total)) * 100.0 else 0.0; 86 const qps = if (elapsed_s > 0) @as(f64, @floatFromInt(successful)) / elapsed_s else 0.0; 87 const avg_latency_ms = if (successful > 0) @as(f64, @floatFromInt(total_latency)) / @as(f64, @floatFromInt(successful)) / 1_000_000.0 else 0.0; 88 const min_latency_ms = @as(f64, @floatFromInt(min_latency)) / 1_000_000.0; 89 const max_latency_ms = @as(f64, @floatFromInt(max_latency)) / 1_000_000.0; 90 91 std.debug.print("\n========================================\n", .{}); 92 std.debug.print("性能测试配置\n", .{}); 93 std.debug.print("========================================\n", .{}); 94 std.debug.print("并发客户端数: {}\n", .{config.num_clients}); 95 std.debug.print("每客户端消息数: {}\n", .{config.messages_per_client}); 96 std.debug.print("消息大小: {} 字节\n", .{config.message_size}); 97 std.debug.print("总请求数: {}\n", .{total}); 98 99 std.debug.print("\n========================================\n", .{}); 100 std.debug.print("性能测试结果\n", .{}); 101 std.debug.print("========================================\n", .{}); 102 std.debug.print("总耗时: {d:.3} 秒\n", .{elapsed_s}); 103 std.debug.print("成功请求: {}\n", .{successful}); 104 std.debug.print("失败请求: {}\n", .{failed}); 105 std.debug.print("成功率: {d:.2}%\n", .{success_rate}); 106 std.debug.print("\n", .{}); 107 std.debug.print("吞吐量: {d:.2} 请求/秒\n", .{qps}); 108 std.debug.print("平均延迟: {d:.2} 毫秒\n", .{avg_latency_ms}); 109 std.debug.print("最小延迟: {d:.2} 毫秒\n", .{min_latency_ms}); 110 std.debug.print("最大延迟: {d:.2} 毫秒\n", .{max_latency_ms}); 111 std.debug.print("========================================\n", .{}); 112 } 113}; 114 115pub fn main() !void { 116 // 使用 GPA 分配器 117 var gpa = std.heap.GeneralPurposeAllocator(.{}){}; 118 defer _ = gpa.deinit(); 119 const allocator = gpa.allocator(); 120 121 // 配置参数 122 const config = Config{ 123 .num_clients = 10, 124 .messages_per_client = 100, 125 .message_size = 64, 126 }; 127 128 // 初始化统计数据 129 var stats = Stats.init(); 130 const total_requests = config.num_clients * config.messages_per_client; 131 stats.total_requests.store(total_requests, .monotonic); 132 133 std.debug.print("========================================\n", .{}); 134 std.debug.print("性能测试配置\n", .{}); 135 std.debug.print("========================================\n", .{}); 136 std.debug.print("并发客户端数: {}\n", .{config.num_clients}); 137 std.debug.print("每客户端消息数: {}\n", .{config.messages_per_client}); 138 std.debug.print("消息大小: {} 字节\n", .{config.message_size}); 139 std.debug.print("总请求数: {}\n", .{total_requests}); 140 std.debug.print("\n========================================\n", .{}); 141 std.debug.print("测试进行中...\n", .{}); 142 std.debug.print("========================================\n", .{}); 143 144 // 初始化异步 I/O 上下文 145 var threaded = Io.Threaded.init(allocator); 146 defer threaded.deinit(); 147 const io = threaded.io(); 148 149 // 记录开始时间 150 const start_instant = try std.time.Instant.now(); 151 152 // 启动多个并发客户端任务 153 var futures = try allocator.alloc(Io.Future(void), config.num_clients); 154 defer allocator.free(futures); 155 156 for (0..config.num_clients) |client_id| { 157 futures[client_id] = io.concurrent(runClient, .{ io, config, &stats, client_id }) catch |err| { 158 std.debug.print("启动客户端 {} 失败: {}\n", .{ client_id, err }); 159 stats.recordFailure(); 160 continue; 161 }; 162 } 163 164 // 等待所有任务完成 165 for (futures) |*future| { 166 future.await(io); 167 } 168 169 // 记录结束时间 170 const end_instant = try std.time.Instant.now(); 171 const elapsed_ns = end_instant.since(start_instant); 172 173 // 打印统计结果 174 stats.printResults(config, elapsed_ns); 175} 176 177fn runClient(io: Io, config: Config, stats: *Stats, client_id: usize) void { 178 // 连接到服务器 179 const address = IpAddress.parseIp4("127.0.0.1", 8080) catch |err| { 180 std.debug.print("客户端 {}: 解析地址失败: {}\n", .{ client_id, err }); 181 for (0..config.messages_per_client) |_| { 182 stats.recordFailure(); 183 } 184 return; 185 }; 186 187 var stream = address.connect(io, .{ .mode = .stream }) catch |err| { 188 std.debug.print("客户端 {}: 连接失败: {}\n", .{ client_id, err }); 189 for (0..config.messages_per_client) |_| { 190 stats.recordFailure(); 191 } 192 return; 193 }; 194 defer stream.close(io); 195 196 // 获取 reader 和 writer 197 var reader = stream.reader(io, &.{}); 198 var writer = stream.writer(io, &.{}); 199 200 // 准备消息缓冲区 201 var send_buffer: [256]u8 = undefined; 202 var recv_buffer: [256]u8 = undefined; 203 204 // 循环发送消息 205 for (0..config.messages_per_client) |msg_num| { 206 // 构造消息 207 const message = std.fmt.bufPrint(&send_buffer, "Message {}-{}\n", .{ client_id, msg_num }) catch { 208 std.debug.print("客户端 {}: 构造消息失败\n", .{client_id}); 209 stats.recordFailure(); 210 continue; 211 }; 212 213 // 记录发送时间 214 const send_instant = std.time.Instant.now() catch { 215 std.debug.print("客户端 {}: 获取时间失败\n", .{client_id}); 216 stats.recordFailure(); 217 continue; 218 }; 219 220 // 发送消息 221 std.debug.print("🔵 客户端 {}: 准备发送消息 {} (长度: {}): {s}", .{ client_id, msg_num, message.len, message }); 222 writer.interface.writeAll(message) catch |err| { 223 std.debug.print("客户端 {}: 发送消息 {} 失败: {}\n", .{ client_id, msg_num, err }); 224 stats.recordFailure(); 225 continue; 226 }; 227 std.debug.print("🟢 客户端 {}: 消息 {} 已写入缓冲区\n", .{ client_id, msg_num }); 228 229 // 接收回显 230 std.debug.print("🟡 客户端 {}: 开始读取回显 {}\n", .{ client_id, msg_num }); 231 const bytes_read = reader.interface.readSliceShort(&recv_buffer) catch |err| { 232 std.debug.print("🔴 客户端 {}: 接收消息 {} 失败: {}\n", .{ client_id, msg_num, err }); 233 stats.recordFailure(); 234 continue; 235 }; 236 std.debug.print("🟢 客户端 {}: 消息 {} 读取到 {} 字节\n", .{ client_id, msg_num, bytes_read }); 237 238 // 记录接收时间 239 const recv_instant = std.time.Instant.now() catch { 240 std.debug.print("客户端 {}: 获取时间失败\n", .{client_id}); 241 stats.recordFailure(); 242 continue; 243 }; 244 const latency_ns = recv_instant.since(send_instant); 245 246 // 验证回显内容 247 const received = recv_buffer[0..bytes_read]; 248 if (!std.mem.eql(u8, message, received)) { 249 std.debug.print("客户端 {}: 消息 {} 验证失败\n", .{ client_id, msg_num }); 250 std.debug.print(" 期望: {s}", .{message}); 251 std.debug.print(" 收到: {s}", .{received}); 252 stats.recordFailure(); 253 continue; 254 } 255 256 // 记录成功 257 stats.recordSuccess(latency_ns); 258 } 259 260 std.debug.print("客户端 {} 完成\n", .{client_id}); 261}