upstream: https://github.com/janestreet/memtrace
1open Stdlib_shim
2(* This is the implementation of the encoder/decoder for the memtrace
3 format. This format is quite involved, and to understand it it's
4 best to read the CTF specification and comments in memtrace.tsl
5 first. *)
6
(* Version number of the trace format. It is written into every packet
   header by [put_ctf_header], and [read_ctf_header] rejects packets whose
   version is greater than this value.
   Increment this when the format changes in an incompatible way.
   Version 2: added context field to trace_info event
   Version 3: added domain field to packet header *)
let memtrace_version = 3

(* If this is true, then all backtraces are immediately decoded and
   verified after encoding (see [verify_backtrace]). This is slow, but
   helpful for debugging. *)
let cache_enable_debug = false
15
16open Buf
17
(* Raised when a trace being read does not follow the expected format.
   The payload is a short description of what was wrong. *)
exception Parse_error of string

(* Teach [Printexc] to render [Parse_error] as a readable message; every
   other exception is left to the remaining printers. *)
let () =
  let describe_parse_error = function
    | Parse_error reason -> Some ("malformed trace: " ^ reason)
    | _ -> None
  in
  (Printexc.register_printer [@ocaml.alert "-unsafe_multidomain"])
    describe_parse_error
24
(* [bad_format msg] aborts parsing by raising [Parse_error msg]. *)
let[@inline never] bad_format msg = raise (Parse_error msg)

(* Like [bad_format], but builds the message from a format string and its
   arguments. *)
let[@inline never] bad_formatf fmt = Fmt.kstr bad_format fmt

(* [check_fmt msg ok] does nothing when [ok] holds and raises
   [Parse_error msg] otherwise. *)
let check_fmt msg ok = if ok then () else bad_format msg
28
29(* Utility types *)
30
31(* Time since the epoch *)
module Timestamp = struct
  (* Stored as an [int64]; [to_float] and [of_float] convert from and to
     seconds using a factor of one million. *)
  type t = int64

  let of_int64 raw = raw
  let to_int64 ts = ts

  (* Timestamp expressed in (fractional) seconds. *)
  let to_float ts = Int64.to_float ts /. 1_000_000.

  (* Inverse of [to_float]; the fractional part below one unit is dropped
     by [Int64.of_float]. *)
  let of_float seconds = Int64.of_float (seconds *. 1_000_000.)

  (* Current time as reported by [Unix.gettimeofday]. *)
  let now () = of_float (Unix.gettimeofday ())
end
41
42(* Time since the start of the trace *)
module Timedelta = struct
  (* Same representation as [Timestamp.t]: a plain [int64]. *)
  type t = int64

  let to_int64 delta = delta

  (* [offset a b] is the 64-bit sum of its two arguments. *)
  let offset base delta = Int64.add base delta
end
49
(* Hash tables keyed by plain integers, using a cheap multiplicative hash
   that ignores the seed. *)
module Int_tbl = Hashtbl.Make_seeded_portable (struct
  type t = int

  let equal (x : t) (y : t) = x = y

  let hash _seed (key : t) =
    let mixed = key * 189696287 in
    mixed lxor (mixed lsr 23)

  (* Required for OCaml >= 5.0.0, but causes errors for older compilers
     because it is an unused value declaration. *)
  let seeded_hash = hash
end)
62
(* Identifier of a domain, represented as an integer. *)
module Domain_id = struct
  type t = int

  (* Tables keyed by domain identifier. *)
  module Tbl = Int_tbl

  module Expert = struct
    (* Reinterprets a raw integer as a domain identifier. *)
    let of_int raw = raw
  end

  (* The identifier 0. *)
  let main_domain = Expert.of_int 0
end
74
75(** CTF packet headers *)
76
(* Size in bytes of the buffers in which packets are built and read.
   Small enough that Unix.write still does single writes.
   (i.e. below 64k) *)
let max_packet_size = 1 lsl 15
80
(* Decoded contents of a CTF packet header, as returned by
   [read_ctf_header]. *)
type packet_header_info = {
  content_size : int; (* bytes, excluding header *)
  time_begin : Timestamp.t; (* begin timestamp stored in the header *)
  time_end : Timestamp.t; (* end timestamp; checked to be >= time_begin *)
  alloc_id_begin : Int64.t; (* first allocation ID of the packet *)
  alloc_id_end : Int64.t; (* checked to be >= alloc_id_begin *)
  pid : Int64.t; (* PID recorded by the writer *)
  version : int; (* format version found in the header *)
  domain : int; (* domain field; 0 when version < 3 *)
  cache_verifier : Backtrace_codec.Reader.cache_verifier;
      (* compared against the reader's backtrace cache when backtraces are
         parsed *)
}
92
(* When writing a packet, some fields can be filled in only once the
   packet is complete. [put_ctf_header] reserves space for them and returns
   their positions in this record; [finish_ctf_header] later writes the
   final values at those positions. *)
type ctf_header_offsets = {
  off_packet_size : Write.position_32; (* total packet size, in bits *)
  off_timestamp_begin : Write.position_64;
  off_timestamp_end : Write.position_64;
  off_flush_duration : Write.position_32; (* always filled with 0 *)
  off_alloc_begin : Write.position_64;
  off_alloc_end : Write.position_64;
}
103
(* Writes a packet header into [b] and returns the positions of the fields
   that are only known once the packet is complete (see
   [finish_ctf_header]). When [cache] is [None] a dummy cache verifier is
   emitted instead of a real one. *)
let put_ctf_header b ~pid ~domain ~cache =
  let open Write in
  (* Magic number identifying a packet. *)
  put_32 b 0xc1fc1fc1l;
  let packet_size_pos = skip_32 b in
  let time_begin_pos = skip_64 b in
  let time_end_pos = skip_64 b in
  let flush_duration_pos = skip_32 b in
  put_16 b memtrace_version;
  put_64 b pid;
  put_16 b domain;
  begin
    match cache with
    | None -> Backtrace_codec.Writer.put_dummy_verifier b
    | Some c -> Backtrace_codec.Writer.put_cache_verifier c b
  end;
  let alloc_begin_pos = skip_64 b in
  let alloc_end_pos = skip_64 b in
  {
    off_packet_size = packet_size_pos;
    off_timestamp_begin = time_begin_pos;
    off_timestamp_end = time_end_pos;
    off_flush_duration = flush_duration_pos;
    off_alloc_begin = alloc_begin_pos;
    off_alloc_end = alloc_end_pos;
  }
127
(* Fills in the header fields reserved by [put_ctf_header]. The packet size
   is the current write position of [b], stored in bits; the flush duration
   is always written as 0. *)
let finish_ctf_header hdr b ~timestamp_begin ~timestamp_end ~alloc_id_begin
    ~alloc_id_end =
  let open Write in
  let size_in_bits = Int32.mul (Int32.of_int b.pos) 8l in
  let {
    off_packet_size;
    off_timestamp_begin;
    off_timestamp_end;
    off_flush_duration;
    off_alloc_begin;
    off_alloc_end;
  } =
    hdr
  in
  update_32 b off_packet_size size_in_bits;
  update_64 b off_timestamp_begin timestamp_begin;
  update_64 b off_timestamp_end timestamp_end;
  update_32 b off_flush_duration 0l;
  update_64 b off_alloc_begin (Int64.of_int alloc_id_begin);
  update_64 b off_alloc_end (Int64.of_int alloc_id_end)
138
(* Decodes the packet header found at the current position of [b] and
   returns its contents.

   Raises [Parse_error] when the magic number is wrong, when the format
   version is newer than [memtrace_version], when the timestamps or
   allocation IDs are not monotone, or when the announced packet size is
   negative or smaller than the header itself. *)
let read_ctf_header b =
  let open Read in
  let start = b.pos in
  let magic = get_32 b in
  let packet_size = get_32 b in
  let time_begin = get_64 b in
  let time_end = get_64 b in
  let _flush_duration = get_32 b in
  let version = get_16 b in
  let pid = get_64 b in
  (* The domain field only exists from format version 3 onwards. *)
  let domain = if version >= 3 then get_16 b else 0 in
  let cache_verifier = Backtrace_codec.Reader.get_cache_verifier b in
  let alloc_id_begin = get_64 b in
  let alloc_id_end = get_64 b in
  check_fmt "Not a CTF packet" (magic = 0xc1fc1fc1l);
  if version > memtrace_version then
    bad_formatf "trace format v%03d, but expected v%03d" version
      memtrace_version;
  check_fmt "Bad packet size" (packet_size >= 0l);
  check_fmt "Monotone packet timestamps" (time_begin <= time_end);
  check_fmt "Monotone alloc IDs" (alloc_id_begin <= alloc_id_end);
  let header_size = b.pos - start in
  (* The size in the header is in bits and includes the header. *)
  let content_size = Int32.to_int (Int32.div packet_size 8l) - header_size in
  (* A packet cannot be smaller than its own header; without this check a
     negative content size would be handed to the callers, which use it to
     split and refill buffers. *)
  check_fmt "Bad packet size" (content_size >= 0);
  {
    content_size;
    time_begin;
    time_end;
    alloc_id_begin;
    alloc_id_end;
    pid;
    domain;
    version;
    cache_verifier;
  }
172
173(** Event headers *)
174
(* Kind of an event, as stored in the high bits of an event header. The
   mapping to and from integers is given by [event_code] and
   [event_of_code]. *)
type evcode =
  | Ev_trace_info
  | Ev_location
  | Ev_alloc
  | Ev_promote
  | Ev_collect
  | Ev_short_alloc of int
      (* the argument must be between 1 and 16 (asserted by [event_code]);
         the writer uses the allocation length here *)
182
(* Integer code under which an event kind is stored in event headers.
   Short allocations use codes 101 to 116, one per permitted argument. *)
let event_code ev =
  match ev with
  | Ev_trace_info -> 0
  | Ev_location -> 1
  | Ev_alloc -> 2
  | Ev_promote -> 3
  | Ev_collect -> 4
  | Ev_short_alloc words ->
    assert (words >= 1 && words <= 16);
    words + 100
192
(* Inverse of [event_code]. Raises [Parse_error] on a code that does not
   correspond to any event kind. *)
let event_of_code code =
  match code with
  | 0 -> Ev_trace_info
  | 1 -> Ev_location
  | 2 -> Ev_alloc
  | 3 -> Ev_promote
  | 4 -> Ev_collect
  | other ->
    if 101 <= other && other <= 116 then Ev_short_alloc (other - 100)
    else bad_format ("Unknown event code " ^ string_of_int other)
201
(* Number of low-order bits of the 32-bit event header that hold the low
   bits of the event timestamp; the event code is stored above them. *)
let event_header_time_len = 25

(* Mask selecting the low [event_header_time_len] bits. *)
let event_header_time_mask = 0x1ffffffl

(* Longest time span a packet may cover before [begin_event] forces a
   flush.
   NB: packet_max_time is less than (1 lsl event_header_time_len) microsecs *)
let packet_max_time = 30 * 1_000_000
207
(* Writes the 32-bit header of an event: the event code in the high bits
   and the low [event_header_time_len] bits of [time] below it. *)
let put_event_header b ev time =
  let open Write in
  let time_bits = Int32.logand (Int64.to_int32 time) event_header_time_mask in
  let code_bits =
    Int32.shift_left (Int32.of_int (event_code ev)) event_header_time_len
  in
  put_32 b (Int32.logor code_bits time_bits)
217
(* Reads an event header from [b] and returns the event kind together with
   its full timestamp. Only the low bits of the timestamp are stored in the
   header; the high bits are taken from the begin time of the packet
   described by [info]. Raises [Parse_error] if the reconstructed time lies
   outside the packet bounds or if the event code is unknown. *)
let[@inline] read_event_header info b =
  let open Read in
  let word = get_32 b in
  let mask = event_header_time_mask in
  let begin_low = Int32.logand mask (Int64.to_int32 info.time_begin) in
  let stored_low = Int32.logand mask word in
  let event_low =
    if stored_low >= begin_low then stored_low
    else
      (* Overflow: the low bits wrapped around since the packet began. *)
      Int32.add stored_low (Int32.of_int (1 lsl event_header_time_len))
  in
  let begin_high =
    Int64.logand info.time_begin (Int64.lognot (Int64.of_int32 mask))
  in
  let time = Int64.add begin_high (Int64.of_int32 event_low) in
  check_fmt "time in packet bounds"
    (info.time_begin <= time && time <= info.time_end);
  let raw_code = Int32.shift_right_logical word event_header_time_len in
  let ev = event_of_code (Int32.to_int raw_code) in
  (ev, time)
243
(* Source locations, re-exported from [Location_codec]. *)
module Location = Location_codec.Location
245
(* Identifiers of traced objects, represented as integers. *)
module Obj_id = struct
  type t = int

  (* Tables keyed by object identifier. *)
  module Tbl = Int_tbl

  module Expert = struct
    (* Reinterprets a raw integer as an object identifier. *)
    let of_int raw = raw
  end

  (* Hands out object identifiers in chunks reserved from a shared atomic
     counter. *)
  module Allocator = struct
    type nonrec t = {
      global_ids : t Atomic.t;
      mutable start_id : t; (* first object ID this packet *)
      mutable next_id : t; (* next object ID in this packet *)
      mutable last_id : t; (* object ID at which we need to reallocate *)
    }

    (* True while the current chunk still has an unused identifier. *)
    let has_next alloc = alloc.next_id < alloc.last_id

    (* Returns the next identifier without consuming it; fails when the
       chunk is used up. *)
    let read_next_exn alloc =
      if alloc.next_id = alloc.last_id then
        failwith "Obj_id.Allocator.next_exn: exhausted"
      else alloc.next_id

    (* Returns the next identifier and advances past it. *)
    let take_next_exn alloc =
      let id = read_next_exn alloc in
      alloc.next_id <- succ id;
      id

    (* Number of identifiers reserved from the shared counter at a time. *)
    let ids_per_chunk = Atomic.make 10_000

    (* Starts a new packet: reserves a fresh chunk if the current one is
       exhausted, then records where the packet's identifiers begin. *)
    let new_packet alloc =
      if not (has_next alloc) then begin
        let chunk = Atomic.get ids_per_chunk in
        let first = Atomic.fetch_and_add alloc.global_ids chunk in
        alloc.next_id <- first;
        alloc.last_id <- first + chunk
      end;
      alloc.start_id <- alloc.next_id

    (* Builds an allocator drawing from the given shared counter. *)
    let of_global_ids global_ids =
      let alloc = { global_ids; start_id = 0; next_id = 0; last_id = 0 } in
      new_packet alloc;
      alloc

    (* Allocator with its own counter starting at 0. *)
    let create () = of_global_ids (Atomic.make 0)

    (* Returns a function creating allocators that share the counter of
       the given allocator. *)
    let for_new_domain parent : unit -> t =
      let shared = parent.global_ids in
      fun () -> of_global_ids shared
  end
end
295
296(** Trace info *)
297
(* Global information about a trace, stored in the trace-info event of the
   first packet (see [put_trace_info] and [read_trace_info]). *)
module Info = struct
  type t = {
    sample_rate : float;
    word_size : int; (* written as a single byte *)
    executable_name : string;
    host_name : string;
    ocaml_runtime_params : string;
    pid : Int64.t;
    initial_domain : Domain_id.t;
        (* not part of the event itself: on reading it is taken from the
           header of the trace-info packet *)
    start_time : Timestamp.t;
        (* on reading, the begin time of the trace-info packet *)
    context : string option;
        (* [None] is stored as the empty string, and read back as [None];
           absent from traces older than format version 2 *)
  }
end
311
(* Writes the trace-info event describing [info] into [b]. A missing
   context is written as the empty string. *)
let put_trace_info b (info : Info.t) =
  let open Write in
  let {
    Info.sample_rate;
    word_size;
    executable_name;
    host_name;
    ocaml_runtime_params;
    pid;
    initial_domain = _;
    start_time;
    context;
  } =
    info
  in
  put_event_header b Ev_trace_info start_time;
  put_float b sample_rate;
  put_8 b word_size;
  put_string b executable_name;
  put_string b host_name;
  put_string b ocaml_runtime_params;
  put_64 b pid;
  put_string b (match context with Some text -> text | None -> "")
323
(* Reads the body of a trace-info event from [b] (the event header must
   already have been consumed). The start time and initial domain come
   from [packet_info]; the context field is only read for format version 2
   or later, and an empty context is reported as [None]. *)
let read_trace_info b ~packet_info =
  let open Read in
  let start_time = packet_info.time_begin in
  let sample_rate = get_float b in
  let word_size = get_8 b in
  let executable_name = get_string b in
  let host_name = get_string b in
  let ocaml_runtime_params = get_string b in
  let pid = get_64 b in
  let context =
    if packet_info.version < 2 then None
    else
      let text = get_string b in
      if text = "" then None else Some text
  in
  let initial_domain = packet_info.domain in
  {
    Info.sample_rate;
    word_size;
    executable_name;
    host_name;
    ocaml_runtime_params;
    pid;
    initial_domain;
    start_time;
    context;
  }
349
350(** Trace writer *)
351
(* State of a trace writer for one domain. *)
type writer = {
  dest : Buf.Shared_writer_fd.t; (* where finished packets are written *)
  pid : int64; (* PID recorded in every packet header *)
  getpid : unit -> int64;
      (* current PID; compared with [pid] on every flush to detect forks *)
  domain : Domain_id.t; (* domain recorded in every packet header *)
  loc_writer : Location_codec.Writer.t;
  cache : Backtrace_codec.Writer.t;
  debug_reader_cache : Backtrace_codec.Reader.t option;
      (* [Some _] only when [cache_enable_debug] is set *)
  (* Locations that missed cache in this packet *)
  mutable new_locs : (int * Location.t list) array;
  mutable new_locs_len : int; (* number of valid entries in [new_locs] *)
  new_locs_buf : Bytes.t; (* buffer in which location packets are built *)
  (* Last allocation callstack *)
  mutable last_callstack : int array;
  (* Number of slots that were dropped from the last callstack *)
  mutable last_dropped_slots : int;
  obj_ids : Obj_id.Allocator.t;
  mutable packet_time_start : Timestamp.t;
  mutable packet_time_end : Timestamp.t; (* time of the latest event *)
  mutable packet_header : ctf_header_offsets;
      (* positions of the header fields of [packet] still to be filled in *)
  mutable packet : Write.t; (* event packet currently being built *)
}
374
(* Creates the writer state for one domain. A packet header is written
   immediately into a fresh buffer so that events can be appended right
   away; the first packet carries a dummy cache verifier. *)
let writer_for_domain ~dest ~pid ~getpid ~domain ~obj_ids ~start_time : writer =
  let fresh_buffer () = Bytes.make max_packet_size '\042' in
  let packet = Write.of_bytes (fresh_buffer ()) in
  let packet_header = put_ctf_header packet ~pid ~domain ~cache:None in
  let cache = Backtrace_codec.Writer.create () in
  let debug_reader_cache =
    match cache_enable_debug with
    | true -> Some (Backtrace_codec.Reader.create ())
    | false -> None
  in
  let loc_writer = Location_codec.Writer.create () in
  let new_locs_buf = fresh_buffer () in
  {
    dest;
    pid;
    getpid;
    domain;
    loc_writer;
    cache;
    debug_reader_cache;
    new_locs = [||];
    new_locs_len = 0;
    new_locs_buf;
    last_callstack = [||];
    last_dropped_slots = 0;
    obj_ids;
    packet_time_start = start_time;
    packet_time_end = start_time;
    packet_header;
    packet;
  }
404
(* Starts a new trace on [dest]: writes the packet holding the trace-info
   event, then returns a writer for the initial domain of [info]. When
   [getpid] is not supplied, the PID is always taken to be [info.pid]. *)
let init_writer dest ?getpid (info : Info.t) =
  let dest = Buf.Shared_writer_fd.make dest in
  let open Write in
  let getpid =
    match getpid with None -> (fun () -> info.pid) | Some f -> f
  in
  let pid = getpid () in
  let domain = info.initial_domain in
  let info_packet = Write.of_bytes (Bytes.make max_packet_size '\042') in
  let obj_ids = Obj_id.Allocator.create () in
  (* Write the trace info packet *)
  let hdr = put_ctf_header info_packet ~pid ~domain ~cache:None in
  put_trace_info info_packet info;
  let start_time = info.start_time in
  finish_ctf_header hdr info_packet ~timestamp_begin:start_time
    ~timestamp_end:start_time ~alloc_id_begin:0 ~alloc_id_end:0;
  write_fd dest info_packet;
  writer_for_domain ~dest ~pid ~getpid ~domain ~obj_ids ~start_time
423
(* Codes identifying backtrace slots, represented as integers. *)
module Location_code = struct
  type t = int

  (* Tables keyed by location code. *)
  module Tbl = Int_tbl

  module Expert = struct
    (* Reinterprets a raw integer as a location code. *)
    let of_int raw = raw
  end
end
433
(* Where an allocation came from. In long allocation events the three
   cases are encoded as 0, 1 and 2 respectively; short allocation events
   are always [Minor]. *)
module Allocation_source = struct
  type t = Minor | Major | External
end
437
(* Events stored in a trace, other than the trace-info and location
   events. *)
module Event = struct
  type t =
    | Alloc of {
        obj_id : Obj_id.t;
        length : int;
        domain : Domain_id.t;
        nsamples : int;
        source : Allocation_source.t;
        backtrace_buffer : Location_code.t array;
        backtrace_length : int;
        common_prefix : int;
      }
    | Promote of Obj_id.t * Domain_id.t
    | Collect of Obj_id.t * Domain_id.t

  (* Renders an event as text. [decode_loc] maps a location code to its
     locations; a code with no locations is printed as the code itself,
     prefixed with a dollar sign. Only the first [backtrace_length] entries
     of the backtrace buffer are used. *)
  let to_string decode_loc ev =
    match ev with
    | Promote (id, _dom) -> Fmt.str "%010d promote" (id :> int)
    | Collect (id, _dom) -> Fmt.str "%010d collect" (id :> int)
    | Alloc
        {
          obj_id;
          length;
          domain;
          nsamples;
          source;
          backtrace_buffer;
          backtrace_length;
          common_prefix;
        } ->
      let show_slot i =
        let code = backtrace_buffer.(i) in
        match decode_loc code with
        | [] -> Fmt.str "$%d" (code :> int)
        | locs -> String.concat " " (List.map Location.to_string locs)
      in
      let backtrace =
        String.concat " " (List.init backtrace_length show_slot)
      in
      let alloc_src =
        match source with
        | Minor -> "alloc"
        | Major -> "alloc_major"
        | External -> "alloc_ext"
      in
      Fmt.str "%010d %s %d len=%d dom=%d % 4d: %s"
        (obj_id :> int)
        alloc_src nsamples length
        (domain :> int)
        common_prefix backtrace

  (* Domain recorded in the event. *)
  let domain ev =
    match ev with
    | Alloc { domain; _ } -> domain
    | Promote (_, domain) -> domain
    | Collect (_, domain) -> domain
end
490
(* Appends [loc] to the list of locations to be emitted with the next
   flush. The backing array starts at 32 entries and doubles whenever it
   is full. *)
let log_new_loc s loc =
  let capacity = Array.length s.new_locs in
  let used = s.new_locs_len in
  assert (used <= capacity);
  if used < capacity then s.new_locs.(used) <- loc
  else begin
    (* Full: grow. Filling the new array with [loc] also puts it into the
       first free slot. *)
    let grown_len = if capacity = 0 then 32 else capacity * 2 in
    let grown = Array.make grown_len loc in
    Array.blit s.new_locs 0 grown 0 capacity;
    s.new_locs <- grown
  end;
  s.new_locs_len <- used + 1
503
(** Flushing *)

(* Raised by [flush_at] when the writer's [getpid] no longer returns the
   PID recorded at creation, i.e. the process forked and we are in the
   subprocess. Nothing is written in that case. *)
exception Pid_changed
506
(* Writes everything buffered in [s] to its destination and starts a new
   packet beginning at time [now]. Newly seen locations are written first,
   in as many location packets as needed, followed by the event packet.
   Raises [Pid_changed] without writing anything if the PID has changed. *)
let flush_at s ~now =
  (* If the PID has changed, then the process forked and we're in the subprocess.
     Don't write anything to the file, and raise an exception to quit tracing *)
  if s.pid <> s.getpid () then raise Pid_changed;
  let open Write in
  (* Puts locations, starting at index [i], into [b] for as long as one
     more is guaranteed to fit; returns the index of the first location
     left out. *)
  let rec fill_location_packet b i =
    if i < s.new_locs_len && remaining b > Location_codec.Writer.max_length
    then begin
      put_event_header b Ev_location s.packet_time_start;
      Location_codec.Writer.put_location s.loc_writer b s.new_locs.(i);
      fill_location_packet b (i + 1)
    end
    else i
  in
  (* First, flush newly-seen locations.
     These must be emitted before any events that might refer to them *)
  let rec flush_locations i =
    if i < s.new_locs_len then begin
      let b = Write.of_bytes s.new_locs_buf in
      let hdr = put_ctf_header b ~pid:s.pid ~domain:s.domain ~cache:None in
      let next = fill_location_packet b i in
      finish_ctf_header hdr b ~timestamp_begin:s.packet_time_start
        ~timestamp_end:s.packet_time_start
        ~alloc_id_begin:s.obj_ids.start_id ~alloc_id_end:s.obj_ids.start_id;
      write_fd s.dest b;
      flush_locations next
    end
  in
  flush_locations 0;
  (* Next, flush the actual events *)
  finish_ctf_header s.packet_header s.packet
    ~timestamp_begin:s.packet_time_start ~timestamp_end:s.packet_time_end
    ~alloc_id_begin:s.obj_ids.start_id ~alloc_id_end:s.obj_ids.next_id;
  write_fd s.dest s.packet;
  (* Finally, reset the buffer *)
  s.packet_time_start <- now;
  s.packet_time_end <- now;
  s.new_locs_len <- 0;
  s.packet <- Write.of_bytes s.packet.buf;
  Obj_id.Allocator.new_packet s.obj_ids;
  s.packet_header <-
    put_ctf_header s.packet ~pid:s.pid ~domain:s.domain ~cache:(Some s.cache)
543
(* Space that [begin_event] requires to be free in the current packet
   before starting an event; if less remains, the packet is flushed
   first. *)
let max_ev_size =
  100
  (* upper bound on fixed-size portion of events
     (i.e. not backtraces or locations) *)
  + max Location_codec.Writer.max_length Backtrace_codec.Writer.max_length
549
(* Starts an event of kind [ev] at time [now] by writing its header into
   the current packet. The packet is flushed first when it is nearly full,
   when more than 128 new locations are pending, when it would span more
   than [packet_max_time], or when no object IDs are left. *)
let begin_event s ev ~(now : Timestamp.t) =
  let open Write in
  let must_flush =
    remaining s.packet < max_ev_size
    || s.new_locs_len > 128
    || Int64.sub now s.packet_time_start > Int64.of_int packet_max_time
    || not (Obj_id.Allocator.has_next s.obj_ids)
  in
  if must_flush then flush_at s ~now;
  s.packet_time_end <- now;
  put_event_header s.packet ev now
560
(* Flushes [s], starting the next packet at the time of the latest
   event. *)
let flush s =
  let now = s.packet_time_end in
  flush_at s ~now
562
(* Returns the length of the longest suffix of [curr] which is also a
   suffix of [prev], where only the elements at index [curr_start] and
   above in [curr], and at index [prev_start] and above in [prev], are
   taken into account. Both start indices must be non-negative. *)
let common_suffix_len (prev : int array) prev_start (curr : int array)
    curr_start =
  assert (prev_start >= 0);
  assert (curr_start >= 0);
  (* Walks both arrays backwards in lockstep and returns the highest index
     of [curr] that is not part of the common suffix. *)
  let rec scan i j =
    if
      i >= curr_start
      && j >= prev_start
      && Array.unsafe_get curr i = Array.unsafe_get prev j
    then scan (i - 1) (j - 1)
    else i
  in
  let first_unmatched = scan (Array.length curr - 1) (Array.length prev - 1) in
  Array.length curr - (first_unmatched + 1)
577
(* Position of the placeholder reserved by [encode_alloc_header] for the
   number of encoded backtrace entries: one byte in short allocation
   events, two bytes in long ones. *)
type alloc_length_format =
  | Len_short of Write.position_8
  | Len_long of Write.position_16
581
(* Debugging aid: when the writer has a debug reader cache, decodes the
   backtrace that was just encoded into the current packet (starting at
   [bt_elem_off]) and fails if it is not the reverse of
   [callstack_as_ints]. Does nothing otherwise. *)
let verify_backtrace s ~callstack_as_ints ~bt_elem_off ~nencoded ~common_len =
  match s.debug_reader_cache with
  | Some c ->
    let open Read in
    (* Decode the backtrace and check that it matches *)
    let b = s.packet in
    let encoded = Read.of_bytes_sub b.buf ~pos:bt_elem_off ~pos_end:b.pos in
    let decoded, decoded_len =
      Backtrace_codec.Reader.get_backtrace c encoded ~nencoded
        ~common_pfx_len:common_len
    in
    assert (remaining encoded = 0);
    let depth = Array.length callstack_as_ints in
    let rev_callstack =
      Array.init depth (fun i -> callstack_as_ints.(depth - 1 - i))
    in
    let actual = Array.sub decoded 0 decoded_len in
    if actual <> rev_callstack then begin
      Array.iter (fun code -> Fmt.pr " %08Lx" (Int64.of_int code)) rev_callstack;
      Fmt.pr " !@.";
      Array.iter (Fmt.pr " %08x") actual;
      Fmt.pr " !@.";
      failwith "bad coded backtrace"
    end
  | None -> ()
604
(* Writes the fixed part of an allocation event that precedes the
   backtrace, and reserves room for the number of encoded backtrace
   entries, whose position is returned. Short events store only the common
   length; long events also store the length, sample count and source. *)
let encode_alloc_header b ~is_short ~length ~nsamples ~source ~common_len =
  let open Write in
  if not is_short then begin
    let src_code =
      match source with
      | Allocation_source.Minor -> 0
      | Major -> 1
      | External -> 2
    in
    put_vint b length;
    put_vint b nsamples;
    put_8 b src_code;
    put_vint b common_len;
    Len_long (skip_16 b)
  end
  else begin
    put_vint b common_len;
    Len_short (skip_8 b)
  end
622
(* Writes an allocation event and returns the object ID assigned to it.
   Only the part of the callstack not shared (as a suffix) with the
   previous allocation's callstack is encoded. The compact short form is
   used for minor allocations of length 1 to 16 with a single sample and
   fewer than 256 new callstack entries. *)
let put_alloc s now ~length ~nsamples ~source ~callstack ~callstack_as_ints
    ~decode_callstack_entry ~drop_slots =
  let open Write in
  let common_len =
    common_suffix_len s.last_callstack s.last_dropped_slots callstack_as_ints
      drop_slots
  in
  let new_len = Array.length callstack_as_ints - common_len in
  (* Remember this callstack for the next allocation. *)
  s.last_callstack <- callstack_as_ints;
  s.last_dropped_slots <- drop_slots;
  let is_short =
    1 <= length && length <= 16
    && source = Allocation_source.Minor
    && nsamples = 1 && new_len < 256
  in
  let ev = if is_short then Ev_short_alloc length else Ev_alloc in
  begin_event s ev ~now;
  let id = Obj_id.Allocator.take_next_exn s.obj_ids in
  (* [begin_event] may have flushed, so the packet is only read now. *)
  let b = s.packet in
  let count_slot =
    encode_alloc_header b ~is_short ~length ~nsamples ~source ~common_len
  in
  let bt_elem_off = b.pos in
  let log_new_location ~index =
    log_new_loc s
      (callstack_as_ints.(index), decode_callstack_entry callstack index)
  in
  let nencoded =
    Backtrace_codec.Writer.put_backtrace s.cache b ~alloc_id:id
      ~callstack:callstack_as_ints ~callstack_pos:drop_slots
      ~callstack_len:new_len ~log_new_location
  in
  begin
    match count_slot with
    | Len_long p ->
      (* This can't overflow because there isn't room in a packet for more than
         0xffff entries. (See max_packet_size) *)
      assert (nencoded <= 0xffff);
      update_16 b p nencoded
    | Len_short p ->
      assert (nencoded <= 0xff);
      update_8 b p nencoded
  end;
  verify_backtrace s ~callstack_as_ints ~bt_elem_off ~nencoded ~common_len;
  id
666
(* Reads the body of an allocation event of kind [evcode] from [b] and
   returns it as an [Event.Alloc] with object ID [alloc_id]. When
   [parse_backtraces] is false the backtrace is skipped and reported as
   empty. [evcode] must be [Ev_alloc] or [Ev_short_alloc _]. *)
let read_alloc ~parse_backtraces ~domain evcode cache alloc_id b =
  let open Read in
  let read_source () : Allocation_source.t =
    match get_8 b with
    | 0 -> Minor
    | 1 -> Major
    | 2 -> External
    | _ -> bad_format "source"
  in
  let is_short, length, nsamples, source =
    match evcode with
    | Ev_alloc ->
      let length = get_vint b in
      let nsamples = get_vint b in
      let source = read_source () in
      (false, length, nsamples, source)
    | Ev_short_alloc len -> (true, len, 1, Allocation_source.Minor)
    | _ -> assert false
  in
  let common_pfx_len = get_vint b in
  (* The entry count takes one byte in short events, two otherwise. *)
  let nencoded = if is_short then get_8 b else get_16 b in
  let backtrace_buffer, backtrace_length =
    if not parse_backtraces then begin
      Backtrace_codec.Reader.skip_backtrace cache b ~nencoded ~common_pfx_len;
      ([||], 0)
    end
    else Backtrace_codec.Reader.get_backtrace cache b ~nencoded ~common_pfx_len
  in
  Event.Alloc
    {
      obj_id = alloc_id;
      length;
      domain;
      nsamples;
      source;
      backtrace_buffer;
      backtrace_length;
      common_prefix = common_pfx_len;
    }
705
706(* The other events are much simpler *)
707
(* Writes a promotion event for object [id]. The ID is stored as its
   distance below the most recently allocated ID. *)
let put_promote s now id =
  let open Write in
  begin_event s Ev_promote ~now;
  (* Both the packet and the allocator may have changed during
     [begin_event], so they are only read afterwards. *)
  let latest_id = s.obj_ids.next_id - 1 in
  put_vint s.packet (latest_id - id)
713
(* Reads the body of a promotion event; [alloc_id] is the ID the next
   allocation in the packet would receive. Raises [Parse_error] on a
   negative delta. *)
let read_promote ~domain alloc_id b =
  let open Read in
  let delta = get_vint b in
  check_fmt "promote id sync" (delta >= 0);
  Event.Promote (alloc_id - 1 - delta, domain)
720
(* Writes a collection event for object [id]. The ID is stored as its
   distance below the most recently allocated ID. *)
let put_collect s now id =
  let open Write in
  begin_event s Ev_collect ~now;
  (* Both the packet and the allocator may have changed during
     [begin_event], so they are only read afterwards. *)
  let latest_id = s.obj_ids.next_id - 1 in
  put_vint s.packet (latest_id - id)
726
(* Reads the body of a collection event; [alloc_id] is the ID the next
   allocation in the packet would receive. *)
let read_collect ~domain alloc_id b =
  let open Read in
  let delta = get_vint b in
  (* Typically, delta >= 0, because you are collecting an object with an earlier object
     ID. However, a tricky case in domain termination (collecting an object previously
     allocated by a now-terminated domain) means that this is not necessarily the case, so
     there's no assertion here *)
  Event.Collect (alloc_id - 1 - delta, domain)
736
737(** Trace reader *)
738
(* State of a trace reader. *)
type reader = {
  fd : Unix.file_descr; (* file descriptor the trace is read from *)
  info : Info.t; (* contents of the trace-info event *)
  data_off : int;
      (* file offset just past the trace-info packet, where [iter] starts *)
  loc_table : Location.t list Location_code.Tbl.t;
      (* locations seen so far, filled in while iterating *)
}
745
(* Creates a reader from a file descriptor positioned at the start of a
   trace. Reads and validates the first packet, which must contain exactly
   one trace-info event; raises [Parse_error] otherwise. *)
let init_reader fd =
  let open Read in
  let scratch = Bytes.make max_packet_size '\042' in
  (* Remember where the trace starts so that [data_off] is absolute. *)
  let start_pos = Unix.lseek fd 0 SEEK_CUR in
  let b = read_fd fd scratch in
  let packet_info = read_ctf_header b in
  let header_size = b.pos in
  let content_size = packet_info.content_size in
  let b, _ = split b content_size in
  check_fmt "trace info packet size" (remaining b >= content_size);
  let ev, evtime = read_event_header packet_info b in
  check_fmt "trace info packet code" (ev = Ev_trace_info);
  check_fmt "trace info packet time" (evtime = packet_info.time_begin);
  let info = read_trace_info b ~packet_info in
  check_fmt "trace info packet done" (remaining b = 0);
  let loc_table = Location_code.Tbl.create 20 in
  let data_off = start_pos + header_size + content_size in
  { fd; info; data_off; loc_table }
763
(* Formats a message and prints it on standard error, followed by a
   newline and a flush. *)
let report_hack fmt =
  let emit msg = Fmt.epr "%s@." msg in
  Fmt.kstr emit fmt
765
(* Returns [stream] unchanged if at least [size] bytes remain in it;
   otherwise returns the stream refilled from [fd]. *)
let refill_to size fd stream =
  let open Read in
  if remaining stream >= size then stream else refill_fd fd stream
769
(* Returns the decoding state kept for [domain] (location reader,
   backtrace cache and last seen timestamp), creating and registering it
   on first use with the last timestamp set to [start_time]. *)
let domain_state per_domain start_time domain =
  match Domain_id.Tbl.find per_domain domain with
  | state -> state
  | exception Not_found ->
    let reader = Location_codec.Reader.create () in
    let cache = Backtrace_codec.Reader.create () in
    let last_timestamp = ref start_time in
    let state = (reader, cache, last_timestamp) in
    Domain_id.Tbl.add per_domain domain state;
    state
778
(* Decodes every event of the packet whose contents are in [b] and whose
   header is [packet_header]. Location events update the location table of
   [s]; every other event is passed to [f] together with its time relative
   to the start of the trace. Raises [Parse_error] on inconsistent
   input. *)
let iter_events_of_packet s per_domain ~parse_backtraces f
    (packet_header : packet_header_info) b =
  let open Read in
  let domain = packet_header.domain in
  let next_alloc_id = ref (Int64.to_int packet_header.alloc_id_begin) in
  let loc_reader, cache, last_timestamp =
    domain_state per_domain s.info.start_time domain
  in
  if
    parse_backtraces
    && not
         (Backtrace_codec.Reader.check_cache_verifier cache
            packet_header.cache_verifier)
  then bad_format "cache verification";
  let handle_location () =
    let id, loc = Location_codec.Reader.get_location loc_reader b in
    if Location_code.Tbl.mem s.loc_table id then
      check_fmt "consistent location info"
        (Location_code.Tbl.find s.loc_table id = loc)
    else Location_code.Tbl.add s.loc_table id loc
  in
  let handle_event () =
    let ev, time = read_event_header packet_header b in
    check_fmt "monotone timestamps" (!last_timestamp <= time);
    last_timestamp := time;
    let dt = Int64.sub time s.info.start_time in
    match ev with
    | Ev_trace_info -> bad_format "Multiple trace-info events present"
    | Ev_location -> handle_location ()
    | Ev_alloc | Ev_short_alloc _ ->
      let event =
        read_alloc ~parse_backtraces ~domain ev cache !next_alloc_id b
      in
      (* Allocation IDs are implicit: one per allocation event. *)
      incr next_alloc_id;
      f dt event
    | Ev_collect -> f dt (read_collect ~domain !next_alloc_id b)
    | Ev_promote -> f dt (read_promote ~domain !next_alloc_id b)
  in
  while remaining b > 0 do
    handle_event ()
  done;
  check_fmt "alloc id sync"
    (packet_header.alloc_id_end = Int64.of_int !next_alloc_id)
824
(* Iterates over all events of the trace, calling [f] on each one with its
   time relative to the trace start. Reading restarts from the first
   packet after the trace-info packet. Packets with a PID different from
   the one of the trace, and truncated packets, are reported on standard
   error and skipped. *)
let iter s ?(parse_backtraces = true) f =
  let open Read in
  (* more than big enough for a header *)
  let header_upper_bound = 200 in
  let per_domain = Domain_id.Tbl.create 1 in
  ignore (Unix.lseek s.fd s.data_off SEEK_SET);
  let rec iter_packets stream =
    let stream = refill_to header_upper_bound s.fd stream in
    if remaining stream > 0 then begin
      let packet_header = read_ctf_header stream in
      let size = packet_header.content_size in
      let stream = refill_to size s.fd stream in
      let packet, rest = split stream size in
      if packet_header.pid <> s.info.pid then
        report_hack "skipping bad packet (wrong pid: %Ld, but tracing %Ld)"
          packet_header.pid s.info.pid
      else if remaining packet <> size then
        report_hack "skipping truncated packet"
      else
        iter_events_of_packet s per_domain ~parse_backtraces f packet_header
          packet;
      iter_packets rest
    end
  in
  iter_packets (read_fd s.fd (Bytes.make max_packet_size '\000'))
851
module Private = struct
  (* Definition name compared against location names by
     [Writer.is_internal_location]; empty until [set_tracer_module] is
     called. *)
  let name_of_memprof_tracer = Atomic.make ""

  (* Records the ext_alloc function of the given module as the name to
     match. *)
  let set_tracer_module module_name =
    Atomic.set name_of_memprof_tracer (module_name ^ ".ext_alloc")

  (* Exposes the chunk size used by object ID allocators. *)
  let obj_ids_per_chunk = Obj_id.Allocator.ids_per_chunk
end
857
(* Public interface for writing traces. Definition order matters in this
   module: [put_alloc_backtrace_suffix] and [put_alloc_with_raw_backtrace]
   call the top-level [put_alloc], which is then shadowed by the
   array-based [put_alloc] defined after them. *)
module Writer = struct
  type t = writer

  exception Pid_changed = Pid_changed

  (* Starts a new trace; see [init_writer]. *)
  let create = init_writer

  (* Domain this writer records in its packets. *)
  let domain t = t.domain

  (* Returns a function that creates a writer for another domain. The new
     writer shares the destination, PID, [getpid] function and global
     object ID counter of [t], and its first packet starts at
     [start_time]. *)
  let for_domain_at_time ~start_time t : domain:int -> t =
    let { dest; pid; getpid; _ } = t in
    let obj_ids = Obj_id.Allocator.for_new_domain t.obj_ids in
    fun ~domain ->
      let obj_ids = obj_ids () in
      let t =
        writer_for_domain ~dest ~pid ~getpid ~domain ~obj_ids ~start_time
      in
      t

  (* Same as [for_domain_at_time], starting at the time of the latest event
     written to [t]. *)
  let for_domain t = for_domain_at_time ~start_time:t.packet_time_end t

  (* Unfortunately, efficient access to the backtrace is not possible
     with the current Printexc API, even though internally it's an int
     array. For now, wave the Obj.magic wand. There's a PR to fix this:
     https://github.com/ocaml/ocaml/pull/9663 *)
  let loc_codes_of_backtrace (b : Printexc.raw_backtrace) :
      Location_code.t array =
    Obj.magic b

  (* Is this a location that we'd prefer to leave out of traces? This mechanism only
     really makes sense for inlinable functions, so that we can drop a single frame out of
     a backtrace slot. For non-inlinable functions like
     [Memprof_tracer.ext_alloc_slowpath], we instead avoid capturing the slot to begin
     with (using [put_alloc_backtrace_suffix]). *)
  let is_internal_location (loc : Location.t) =
    String.equal loc.defname (Atomic.get Private.name_of_memprof_tracer)

  (* Decodes slot [i] of a raw backtrace into its list of locations. Slots
     without location information contribute nothing; a missing definition
     name is replaced by a pair of question marks. Internal locations (see
     [is_internal_location]) are removed unless that would leave the list
     empty. *)
  let decode_raw_backtrace_entry callstack i : Location.t list =
    let open Printexc in
    let rec get_locations slot : Location.t list =
      let tail =
        match get_raw_backtrace_next_slot slot with
        | None -> []
        | Some slot -> get_locations slot
      in
      let slot = convert_raw_backtrace_slot slot in
      match Slot.location slot with
      | None -> tail
      | Some { filename; line_number; start_char; end_char; _ } ->
        let defname = match Slot.name slot with Some n -> n | _ -> "??" in
        { filename; line = line_number; start_char; end_char; defname }
        :: tail
    in
    let locs = get_locations (get_raw_backtrace_slot callstack i) |> List.rev in
    match List.filter (fun loc -> not (is_internal_location loc)) locs with
    | [] ->
      (* It would break things to return an empty list here, and the worst that happens if
         we return the whole list is occasionally slightly confusing output (it looks like
         Memtrace is using memory rather than user code). *)
      locs
    | at_least_one -> at_least_one

  (* Writes an allocation event for a raw backtrace, passing [drop_slots]
     on to the top-level [put_alloc] as the position from which the
     callstack is considered. Returns the object ID of the allocation. *)
  let put_alloc_backtrace_suffix t now ~length ~nsamples ~source ~callstack
      ~drop_slots =
    let callstack_as_ints = loc_codes_of_backtrace callstack in
    put_alloc t now ~length ~nsamples ~source ~callstack ~callstack_as_ints
      ~decode_callstack_entry:decode_raw_backtrace_entry ~drop_slots

  (* Same as [put_alloc_backtrace_suffix] with no slots dropped. *)
  let put_alloc_with_raw_backtrace t now ~length ~nsamples ~source ~callstack =
    let callstack_as_ints = loc_codes_of_backtrace callstack in
    put_alloc t now ~length ~nsamples ~source ~callstack ~callstack_as_ints
      ~decode_callstack_entry:decode_raw_backtrace_entry ~drop_slots:0

  (* Writes an allocation event whose callstack is already an array of
     location codes; [decode_callstack_entry] maps one code to its
     locations. From here on this definition shadows the top-level
     [put_alloc]. *)
  let put_alloc t now ~length ~nsamples ~source ~callstack
      ~decode_callstack_entry =
    let decode_callstack_entry cs i = decode_callstack_entry cs.(i) in
    put_alloc t now ~length ~nsamples ~source ~callstack
      ~callstack_as_ints:callstack ~decode_callstack_entry ~drop_slots:0

  let put_collect = put_collect
  let put_promote = put_promote
  let flush = flush

  (* Flushes the writer, then closes the shared destination. If the flush
     raises, the destination is not closed here. *)
  let close t =
    flush t;
    Buf.Shared_writer_fd.close t.dest

  (* Writes an already decoded event. Raises [Invalid_argument] if the
     event's domain differs from the writer's, if [now] is earlier than the
     latest event written, or, for allocations, if the ID assigned by the
     writer differs from the one in the event (in that last case the event
     has already been written when the exception is raised). *)
  let put_event w ~decode_callstack_entry now (ev : Event.t) =
    if Event.domain ev <> w.domain then
      raise (Invalid_argument "Trace.put_event: mismatched domain fields");
    if now < w.packet_time_end then
      raise (Invalid_argument "Trace.put_event: out-of-order timestamps");
    match ev with
    | Alloc
        {
          obj_id;
          length;
          domain = _;
          nsamples;
          source;
          backtrace_buffer;
          backtrace_length;
          common_prefix = _;
        } ->
      (* Events store backtraces in the reverse of the order expected by
         [put_alloc] (see [verify_backtrace]), so reverse the used part of
         the buffer. *)
      let btrev =
        Array.init backtrace_length (fun i ->
            backtrace_buffer.(backtrace_length - 1 - i))
      in
      let id =
        put_alloc w now ~length ~nsamples ~source ~callstack:btrev
          ~decode_callstack_entry
      in
      if id <> obj_id then raise (Invalid_argument "Incorrect allocation ID")
    | Promote (id, _domain) -> put_promote w now id
    | Collect (id, _domain) -> put_collect w now id

  (* Writes events of several domains through one interface, keeping one
     writer per domain and switching between them as needed. *)
  module Multiplexed_domains = struct
    type nonrec t = {
      mutable last_domain : Domain_id.t;
      (* Invariant: all writers except possibly that of [last_domain] are flushed *)
      writers : t Domain_id.Tbl.t;
      start_time : Timestamp.t;
    }

    (* Starts a new trace and registers the writer of its initial
       domain. *)
    let create dest ?getpid info =
      let w = create dest ?getpid info in
      let writers = Domain_id.Tbl.create 1 in
      let dom = domain w in
      Domain_id.Tbl.add writers dom w;
      { last_domain = dom; writers; start_time = info.start_time }

    (* Returns the writer for [domain], creating it on first use. When
       switching away from another domain, that domain's writer is flushed
       first, which maintains the invariant stated on [writers]. *)
    let writer_for_domain t ~domain =
      let last_w = Domain_id.Tbl.find t.writers t.last_domain in
      if domain = t.last_domain then last_w
      else (
        flush last_w;
        t.last_domain <- domain;
        try Domain_id.Tbl.find t.writers domain
        with Not_found ->
          let w =
            (for_domain_at_time ~start_time:t.start_time last_w) ~domain
          in
          Domain_id.Tbl.add t.writers domain w;
          w)

    (* Returns, without consuming it, the object ID that the next
       allocation written for [domain] will receive. Flushes the domain's
       writer first if its allocator is exhausted, so that an ID is
       available. *)
    let next_alloc_id t ~domain =
      let w = writer_for_domain t ~domain in
      if not (Obj_id.Allocator.has_next w.obj_ids) then
        flush_at w ~now:w.packet_time_end;
      Obj_id.Allocator.read_next_exn w.obj_ids

    (* Writes [ev] with the writer of the domain recorded in the event. *)
    let put_event t ~decode_callstack_entry time ev =
      let w = writer_for_domain t ~domain:(Event.domain ev) in
      put_event w ~decode_callstack_entry time ev

    (* Flushes the writer of the last used domain; by the invariant on
       [writers], all others are already flushed. *)
    let flush t = flush (Domain_id.Tbl.find t.writers t.last_domain)
  end
end
1015
(* Public interface for reading traces. *)
module Reader = struct
  type t = reader

  (* Creates a reader from an open file descriptor; see [init_reader]. The
     descriptor is not closed if this raises. *)
  let create = init_reader

  (* Contents of the trace-info event of the trace. *)
  let info s = s.info

  (* Returns the locations recorded for [code]. Only codes whose location
     events have already been seen while iterating are known.
     Raises [Invalid_argument] for an unknown code. *)
  let lookup_location_code { loc_table; _ } code =
    match Location_code.Tbl.find loc_table code with
    | v -> v
    | exception Not_found ->
      raise (Invalid_argument (Fmt.str "invalid location code %08x" code))

  let iter = iter

  (* Opens [filename] read-only and creates a reader on it. If the file
     does not start with a valid trace, the file descriptor is closed
     again before the exception is re-raised, so that it does not leak. *)
  let open_ ~filename =
    let fd = Unix.openfile filename [ Unix.O_RDONLY ] 0 in
    match init_reader fd with
    | reader -> reader
    | exception exn ->
      let bt = Printexc.get_raw_backtrace () in
      (* Best effort: an error while closing must not hide [exn]. *)
      (try Unix.close fd with Unix.Unix_error _ -> ());
      Printexc.raise_with_backtrace exn bt

  (* Size of the underlying file, as reported by [fstat]. *)
  let size_bytes s = (Unix.LargeFile.fstat s.fd).st_size

  (* Closes the underlying file descriptor. *)
  let close s = Unix.close s.fd
end