zig async 测试用例
1const std = @import("std");
2const Io = std.Io;
3const IpAddress = std.Io.net.IpAddress;
4
/// Tunable parameters of the benchmark run.
///
/// Every field carries a default, so `Config{}` yields a usable configuration.
const Config = struct {
    /// Number of client tasks launched concurrently.
    num_clients: usize = 10,
    /// Number of messages each client sends.
    messages_per_client: usize = 100,
    /// Message size in bytes.
    message_size: usize = 64,
};
11
/// Request counters and latency aggregates for one benchmark run.
///
/// Every field is an atomic updated with `.monotonic` ordering, so the
/// `record*` methods may be called from several tasks at once. The fields are
/// updated independently of each other; a reader that runs while writers are
/// still active may therefore observe a mix of old and new values.
const Stats = struct {
    /// Number of requests planned for the run; stored by the caller.
    total_requests: std.atomic.Value(usize),
    /// Number of requests recorded through `recordSuccess`.
    successful_requests: std.atomic.Value(usize),
    /// Number of requests recorded through `recordFailure`.
    failed_requests: std.atomic.Value(usize),
    /// Sum of the latencies of all successful requests, in nanoseconds.
    total_latency_ns: std.atomic.Value(u64),
    /// Smallest latency seen so far; `maxInt(u64)` until the first success.
    min_latency_ns: std.atomic.Value(u64),
    /// Largest latency seen so far; 0 until the first success.
    max_latency_ns: std.atomic.Value(u64),

    /// Returns a `Stats` with all counters at zero and the minimum latency
    /// primed with `maxInt(u64)` so that the first sample always replaces it.
    fn init() Stats {
        return .{
            .total_requests = .init(0),
            .successful_requests = .init(0),
            .failed_requests = .init(0),
            .total_latency_ns = .init(0),
            .min_latency_ns = .init(std.math.maxInt(u64)),
            .max_latency_ns = .init(0),
        };
    }

    /// Folds one latency sample (nanoseconds) into the sum, minimum and
    /// maximum aggregates.
    fn updateLatency(self: *Stats, latency_ns: u64) void {
        _ = self.total_latency_ns.fetchAdd(latency_ns, .monotonic);
        // Atomic min/max read-modify-write replaces the manual
        // compare-and-swap retry loops.
        _ = self.min_latency_ns.fetchMin(latency_ns, .monotonic);
        _ = self.max_latency_ns.fetchMax(latency_ns, .monotonic);
    }

    /// Counts one successful request and records its latency in nanoseconds.
    fn recordSuccess(self: *Stats, latency_ns: u64) void {
        _ = self.successful_requests.fetchAdd(1, .monotonic);
        self.updateLatency(latency_ns);
    }

    /// Counts one failed request.
    fn recordFailure(self: *Stats) void {
        _ = self.failed_requests.fetchAdd(1, .monotonic);
    }

    /// Converts a nanosecond count to fractional milliseconds.
    fn nsToMs(ns: u64) f64 {
        return @as(f64, @floatFromInt(ns)) / 1_000_000.0;
    }

    /// Prints the configuration and the aggregated results with
    /// `std.debug.print`.
    ///
    /// `elapsed_ns` is the wall-clock duration of the whole run in
    /// nanoseconds. Rates and averages fall back to 0 when their denominator
    /// is zero. The minimum latency is also reported as 0 when no request
    /// succeeded, instead of leaking the `maxInt(u64)` sentinel.
    fn printResults(self: *Stats, config: Config, elapsed_ns: u64) void {
        const total = self.total_requests.load(.monotonic);
        const successful = self.successful_requests.load(.monotonic);
        const failed = self.failed_requests.load(.monotonic);
        const total_latency = self.total_latency_ns.load(.monotonic);
        const min_latency = self.min_latency_ns.load(.monotonic);
        const max_latency = self.max_latency_ns.load(.monotonic);

        const successful_f: f64 = @floatFromInt(successful);
        const elapsed_s = @as(f64, @floatFromInt(elapsed_ns)) / 1_000_000_000.0;

        const success_rate: f64 = if (total > 0)
            successful_f / @as(f64, @floatFromInt(total)) * 100.0
        else
            0.0;
        const qps: f64 = if (elapsed_s > 0) successful_f / elapsed_s else 0.0;
        const avg_latency_ms: f64 = if (successful > 0)
            @as(f64, @floatFromInt(total_latency)) / successful_f / 1_000_000.0
        else
            0.0;
        // Without a single success the stored minimum is still the sentinel.
        const min_latency_ms: f64 = if (successful > 0) nsToMs(min_latency) else 0.0;
        const max_latency_ms = nsToMs(max_latency);

        std.debug.print("\n========================================\n", .{});
        std.debug.print("性能测试配置\n", .{});
        std.debug.print("========================================\n", .{});
        std.debug.print("并发客户端数: {}\n", .{config.num_clients});
        std.debug.print("每客户端消息数: {}\n", .{config.messages_per_client});
        std.debug.print("消息大小: {} 字节\n", .{config.message_size});
        std.debug.print("总请求数: {}\n", .{total});

        std.debug.print("\n========================================\n", .{});
        std.debug.print("性能测试结果\n", .{});
        std.debug.print("========================================\n", .{});
        std.debug.print("总耗时: {d:.3} 秒\n", .{elapsed_s});
        std.debug.print("成功请求: {}\n", .{successful});
        std.debug.print("失败请求: {}\n", .{failed});
        std.debug.print("成功率: {d:.2}%\n", .{success_rate});
        std.debug.print("\n", .{});
        std.debug.print("吞吐量: {d:.2} 请求/秒\n", .{qps});
        std.debug.print("平均延迟: {d:.2} 毫秒\n", .{avg_latency_ms});
        std.debug.print("最小延迟: {d:.2} 毫秒\n", .{min_latency_ms});
        std.debug.print("最大延迟: {d:.2} 毫秒\n", .{max_latency_ms});
        std.debug.print("========================================\n", .{});
    }
};
114
/// Benchmark entry point.
///
/// Prints the configuration, launches `config.num_clients` concurrent
/// `runClient` tasks on a threaded `Io` implementation, waits for all of them
/// and prints the aggregated statistics.
///
/// Errors: propagates allocation failure for the futures array and failure to
/// read the monotonic clock.
pub fn main() !void {
    // Debug allocator (the former GeneralPurposeAllocator); its `deinit`
    // result is deliberately discarded.
    var gpa: std.heap.DebugAllocator(.{}) = .init;
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Benchmark parameters.
    const config: Config = .{
        .num_clients = 10,
        .messages_per_client = 100,
        .message_size = 64,
    };

    // Statistics shared by all client tasks.
    var stats = Stats.init();
    const total_requests = config.num_clients * config.messages_per_client;
    stats.total_requests.store(total_requests, .monotonic);

    std.debug.print("========================================\n", .{});
    std.debug.print("性能测试配置\n", .{});
    std.debug.print("========================================\n", .{});
    std.debug.print("并发客户端数: {}\n", .{config.num_clients});
    std.debug.print("每客户端消息数: {}\n", .{config.messages_per_client});
    std.debug.print("消息大小: {} 字节\n", .{config.message_size});
    std.debug.print("总请求数: {}\n", .{total_requests});
    std.debug.print("\n========================================\n", .{});
    std.debug.print("测试进行中...\n", .{});
    std.debug.print("========================================\n", .{});

    // Asynchronous I/O context.
    var threaded = Io.Threaded.init(allocator);
    defer threaded.deinit();
    const io = threaded.io();

    const start_instant = try std.time.Instant.now();

    // The slice itself is never reassigned, only its elements are written.
    const futures = try allocator.alloc(Io.Future(void), config.num_clients);
    defer allocator.free(futures);

    // Only futures[0..started] are initialized. A client that fails to start
    // must not leave an undefined slot that is awaited later.
    var started: usize = 0;
    for (0..config.num_clients) |client_id| {
        futures[started] = io.concurrent(runClient, .{ io, config, &stats, client_id }) catch |err| {
            std.debug.print("启动客户端 {} 失败: {}\n", .{ client_id, err });
            // None of this client's messages will be sent; count them all as
            // failed so that successes + failures add up to total_requests.
            for (0..config.messages_per_client) |_| {
                stats.recordFailure();
            }
            continue;
        };
        started += 1;
    }

    // Wait for every task that was actually started.
    for (futures[0..started]) |*future| {
        future.await(io);
    }

    const end_instant = try std.time.Instant.now();
    const elapsed_ns = end_instant.since(start_instant);

    stats.printResults(config, elapsed_ns);
}
176
/// Runs one echo client: connects to 127.0.0.1:8080, then sends
/// `config.messages_per_client` newline-terminated messages one at a time,
/// waiting for each echo and recording a success (with its round-trip
/// latency) or a failure in `stats`.
///
/// If the address cannot be parsed or the connection cannot be established,
/// all of this client's messages are recorded as failures and the function
/// returns. Errors are reported with `std.debug.print`; nothing is returned.
///
/// NOTE(review): `config.message_size` is not used to build the payload; the
/// message is always "Message <client>-<n>\n". Confirm whether padding to
/// `message_size` was intended.
fn runClient(io: Io, config: Config, stats: *Stats, client_id: usize) void {
    const address = IpAddress.parseIp4("127.0.0.1", 8080) catch |err| {
        std.debug.print("客户端 {}: 解析地址失败: {}\n", .{ client_id, err });
        for (0..config.messages_per_client) |_| {
            stats.recordFailure();
        }
        return;
    };

    var stream = address.connect(io, .{ .mode = .stream }) catch |err| {
        std.debug.print("客户端 {}: 连接失败: {}\n", .{ client_id, err });
        for (0..config.messages_per_client) |_| {
            stats.recordFailure();
        }
        return;
    };
    defer stream.close(io);

    // Both ends are created with an empty buffer argument.
    var reader = stream.reader(io, &.{});
    var writer = stream.writer(io, &.{});

    // Both buffers have the same size, so any message that fits into
    // `send_buffer` also fits into `recv_buffer`.
    var send_buffer: [256]u8 = undefined;
    var recv_buffer: [256]u8 = undefined;

    for (0..config.messages_per_client) |msg_num| {
        // Build the payload.
        const message = std.fmt.bufPrint(&send_buffer, "Message {}-{}\n", .{ client_id, msg_num }) catch {
            std.debug.print("客户端 {}: 构造消息失败\n", .{client_id});
            stats.recordFailure();
            continue;
        };

        // Timestamp taken just before the send.
        const send_instant = std.time.Instant.now() catch {
            std.debug.print("客户端 {}: 获取时间失败\n", .{client_id});
            stats.recordFailure();
            continue;
        };

        // Send the message.
        std.debug.print("🔵 客户端 {}: 准备发送消息 {} (长度: {}): {s}", .{ client_id, msg_num, message.len, message });
        writer.interface.writeAll(message) catch |err| {
            std.debug.print("客户端 {}: 发送消息 {} 失败: {}\n", .{ client_id, msg_num, err });
            stats.recordFailure();
            continue;
        };
        std.debug.print("🟢 客户端 {}: 消息 {} 已写入缓冲区\n", .{ client_id, msg_num });

        // Receive the echo. Exactly `message.len` bytes are expected back, so
        // read exactly that many. Asking for the whole 256-byte buffer would
        // keep reading until it is full or the stream ends, and a short echo
        // would stall the client.
        std.debug.print("🟡 客户端 {}: 开始读取回显 {}\n", .{ client_id, msg_num });
        const received = recv_buffer[0..message.len];
        reader.interface.readSliceAll(received) catch |err| {
            std.debug.print("🔴 客户端 {}: 接收消息 {} 失败: {}\n", .{ client_id, msg_num, err });
            stats.recordFailure();
            continue;
        };
        std.debug.print("🟢 客户端 {}: 消息 {} 读取到 {} 字节\n", .{ client_id, msg_num, received.len });

        // Timestamp taken after the echo arrived; the difference is the
        // round-trip latency.
        const recv_instant = std.time.Instant.now() catch {
            std.debug.print("客户端 {}: 获取时间失败\n", .{client_id});
            stats.recordFailure();
            continue;
        };
        const latency_ns = recv_instant.since(send_instant);

        // The echo must match the request byte for byte.
        if (!std.mem.eql(u8, message, received)) {
            std.debug.print("客户端 {}: 消息 {} 验证失败\n", .{ client_id, msg_num });
            std.debug.print(" 期望: {s}", .{message});
            std.debug.print(" 收到: {s}", .{received});
            stats.recordFailure();
            continue;
        }

        stats.recordSuccess(latency_ns);
    }

    std.debug.print("客户端 {} 完成\n", .{client_id});
}