(* Upstream: https://github.com/janestreet/memtrace (main branch; original
   file: 1032 lines, 33 kB). *)
open Stdlib_shim
(* This is the implementation of the encoder/decoder for the memtrace
   format. This format is quite involved, and to understand it it's
   best to read the CTF specification and comments in memtrace.tsl
   first. *)

(* Increment this when the format changes in an incompatible way *)
(* Version 2: added context field to trace_info event
   Version 3: added domain field to packet header *)
let memtrace_version = 3

(* If this is true, then all backtraces are immediately decoded and
   verified after encoding. This is slow, but helpful for debugging. *)
let cache_enable_debug = false

open Buf

(* Raised by the reader whenever the input does not look like a valid
   trace. The payload is a short human-readable reason. *)
exception Parse_error of string

let () =
  (Printexc.register_printer [@ocaml.alert "-unsafe_multidomain"]) (function
    | Parse_error s -> Some ("malformed trace: " ^ s)
    | _ -> None)

(* [bad_format s] raises [Parse_error s]; [bad_formatf] is the
   format-string variant; [check_fmt s b] raises [Parse_error s] unless [b]
   holds. *)
let[@inline never] bad_format s = raise (Parse_error s)
let[@inline never] bad_formatf f = Fmt.kstr bad_format f
let check_fmt s b = if not b then bad_format s

(* Utility types *)

(* Time since the epoch *)
module Timestamp = struct
  (* Stored as an integer count of microseconds: [to_float] divides by
     1_000_000 to obtain seconds, and [of_float] multiplies seconds (as
     returned by [Unix.gettimeofday]) by 1_000_000. *)
  type t = int64

  let of_int64 t = t
  let to_int64 t = t
  let to_float t = Int64.to_float t /. 1_000_000.
  let of_float f = f *. 1_000_000. |> Int64.of_float
  let now () = of_float (Unix.gettimeofday ())
end

(* Time since the start of the trace *)
module Timedelta = struct
  type t = int64

  let to_int64 t = t
  let offset = Int64.add
end

module Int_tbl = Hashtbl.Make_seeded_portable (struct
  type t = int

  let hash _seed (id : t) =
    let h = id * 189696287 in
    h lxor (h lsr 23)

  (* Required for OCaml >= 5.0.0, but causes errors for older compilers
     because it is an unused value declaration. *)
  let seeded_hash = hash
  let equal (a : t) (b : t) = a = b
end)

module Domain_id = struct
  type t = int

  module Tbl = Int_tbl

  module Expert = struct
    let of_int x = x
  end

  let main_domain = Expert.of_int 0
end

(** CTF packet headers *)

(* Small enough that Unix.write still does single writes.
   (i.e. below 64k) *)
let max_packet_size = 1 lsl 15

(* Decoded contents of a packet header, as produced by [read_ctf_header]. *)
type packet_header_info = {
  content_size : int; (* bytes, excluding header *)
  time_begin : Timestamp.t;
  time_end : Timestamp.t;
  alloc_id_begin : Int64.t;
  alloc_id_end : Int64.t;
  pid : Int64.t;
  version : int;
  domain : int;
  cache_verifier : Backtrace_codec.Reader.cache_verifier;
}

(* When writing a packet, some fields can be filled in only once the
   packet is complete. *)
type ctf_header_offsets = {
  off_packet_size : Write.position_32;
  off_timestamp_begin : Write.position_64;
  off_timestamp_end : Write.position_64;
  off_flush_duration : Write.position_32;
  off_alloc_begin : Write.position_64;
  off_alloc_end : Write.position_64;
}

(* Writes a packet header at the current position of [b]. Fields that are
   only known once the packet is complete are skipped over and their
   positions returned, to be patched later by [finish_ctf_header]. When
   [cache] is [None] a dummy cache verifier is written instead of a real
   one. *)
let put_ctf_header b ~pid ~domain ~cache =
  let open Write in
  put_32 b 0xc1fc1fc1l;
  let off_packet_size = skip_32 b in
  let off_timestamp_begin = skip_64 b in
  let off_timestamp_end = skip_64 b in
  let off_flush_duration = skip_32 b in
  put_16 b memtrace_version;
  put_64 b pid;
  put_16 b domain;
  (match cache with
  | Some c -> Backtrace_codec.Writer.put_cache_verifier c b
  | None -> Backtrace_codec.Writer.put_dummy_verifier b);
  let off_alloc_begin = skip_64 b in
  let off_alloc_end = skip_64 b in
  {
    off_packet_size;
    off_timestamp_begin;
    off_timestamp_end;
    off_flush_duration;
    off_alloc_begin;
    off_alloc_end;
  }

(* Patches the header fields reserved by [put_ctf_header]. The packet size
   is taken from the current write position of [b] and stored multiplied by
   8 ([read_ctf_header] divides by 8 again to recover bytes). The flush
   duration is always written as zero. *)
let finish_ctf_header hdr b ~timestamp_begin ~timestamp_end ~alloc_id_begin
    ~alloc_id_end =
  let open Write in
  let size = b.pos in
  update_32 b hdr.off_packet_size (Int32.mul (Int32.of_int size) 8l);
  update_64 b hdr.off_timestamp_begin timestamp_begin;
  update_64 b hdr.off_timestamp_end timestamp_end;
  update_32 b hdr.off_flush_duration 0l;
  update_64 b hdr.off_alloc_begin (Int64.of_int alloc_id_begin);
  update_64 b hdr.off_alloc_end (Int64.of_int alloc_id_end)

(* Reads and validates a packet header from [b], leaving [b] positioned at
   the first byte of the packet contents.

   Raises [Parse_error] if the magic number is wrong, the version is newer
   than [memtrace_version], the declared packet size is negative or smaller
   than the header itself, or the timestamp / allocation ID ranges are not
   monotone. *)
let read_ctf_header b =
  let open Read in
  let start = b.pos in
  let magic = get_32 b in
  let packet_size = get_32 b in
  let time_begin = get_64 b in
  let time_end = get_64 b in
  let _flush_duration = get_32 b in
  let version = get_16 b in
  let pid = get_64 b in
  (* The domain field only exists from format version 3 onwards. *)
  let domain = if version >= 3 then get_16 b else 0 in
  let cache_verifier = Backtrace_codec.Reader.get_cache_verifier b in
  let alloc_id_begin = get_64 b in
  let alloc_id_end = get_64 b in
  check_fmt "Not a CTF packet" (magic = 0xc1fc1fc1l);
  if version > memtrace_version then
    bad_formatf "trace format v%03d, but expected v%03d" version
      memtrace_version;
  check_fmt "Bad packet size" (packet_size >= 0l);
  check_fmt "Monotone packet timestamps" (time_begin <= time_end);
  check_fmt "Monotone alloc IDs" (alloc_id_begin <= alloc_id_end);
  let header_size = b.pos - start in
  let content_size = Int32.(to_int (div packet_size 8l)) - header_size in
  (* A packet cannot be smaller than its own header. Without this check a
     malformed size would produce a negative content size that callers then
     pass to [split] / [refill_to]. *)
  check_fmt "Bad packet size" (content_size >= 0);
  {
    content_size;
    time_begin;
    time_end;
    alloc_id_begin;
    alloc_id_end;
    pid;
    domain;
    version;
    cache_verifier;
  }

(** Event headers *)

type evcode =
  | Ev_trace_info
  | Ev_location
  | Ev_alloc
  | Ev_promote
  | Ev_collect
  | Ev_short_alloc of int

(* Numeric code for an event kind. Short allocations of length [n]
   (1 <= n <= 16) are encoded as [100 + n]. *)
let event_code = function
  | Ev_trace_info -> 0
  | Ev_location -> 1
  | Ev_alloc -> 2
  | Ev_promote -> 3
  | Ev_collect -> 4
  | Ev_short_alloc n ->
      assert (1 <= n && n <= 16);
      100 + n

(* Inverse of [event_code]. Raises [Parse_error] on an unknown code. *)
let event_of_code = function
  | 0 -> Ev_trace_info
  | 1 -> Ev_location
  | 2 -> Ev_alloc
  | 3 -> Ev_promote
  | 4 -> Ev_collect
  | n when 101 <= n && n <= 116 -> Ev_short_alloc (n - 100)
  | c -> bad_format ("Unknown event code " ^ string_of_int c)

let event_header_time_len = 25
let event_header_time_mask = 0x1ffffffl

(* NB: packet_max_time is less than (1 lsl event_header_time_len) microsecs *)
let packet_max_time = 30 * 1_000_000

(* Writes a 32-bit event header: the event code in the bits above
   [event_header_time_len], and the low [event_header_time_len] bits of
   [time] below it. *)
let put_event_header b ev time =
  let open Write in
  let code =
    Int32.(
      logor
        (shift_left (of_int (event_code ev)) event_header_time_len)
        (logand (Int64.to_int32 time) event_header_time_mask))
  in
  put_32 b code

(* Reads an event header and reconstructs the full timestamp from its low
   bits, using the packet's [time_begin] for the high bits. Returns the
   event kind and the timestamp. Raises [Parse_error] if the timestamp falls
   outside the packet's time range or the event code is unknown. *)
let[@inline] read_event_header info b =
  let open Read in
  let code = get_32 b in
  let start_low =
    Int32.logand event_header_time_mask (Int64.to_int32 info.time_begin)
  in
  let time_low = Int32.logand event_header_time_mask code in
  let time_low =
    if time_low < start_low then (* Overflow *)
      Int32.(add time_low (of_int (1 lsl event_header_time_len)))
    else time_low
  in
  let time =
    Int64.(
      add
        (logand info.time_begin (lognot (of_int32 event_header_time_mask)))
        (of_int32 time_low))
  in
  check_fmt "time in packet bounds"
    (info.time_begin <= time && time <= info.time_end);
  let ev =
    event_of_code
      Int32.(to_int (shift_right_logical code event_header_time_len))
  in
  (ev, time)

module Location = Location_codec.Location

module Obj_id = struct
  type t = int

  module Tbl = Int_tbl

  module Expert = struct
    let of_int t = t
  end

  (* Hands out object IDs in chunks taken from a shared atomic counter, so
     that several allocators built from the same [global_ids] never hand out
     the same ID. *)
  module Allocator = struct
    type nonrec t = {
      global_ids : t Atomic.t;
      mutable start_id : t; (* first object ID this packet *)
      mutable next_id : t; (* next object ID in this packet *)
      mutable last_id : t; (* object ID at which we need to reallocate *)
    }

    let has_next t = t.next_id < t.last_id

    (* Returns the next ID without consuming it. Raises [Failure] if the
       current chunk is exhausted. *)
    let read_next_exn t =
      if t.next_id = t.last_id then
        failwith "Obj_id.Allocator.next_exn: exhausted";
      t.next_id

    (* Returns the next ID and advances past it. Raises [Failure] if the
       current chunk is exhausted. *)
    let take_next_exn t =
      let id = read_next_exn t in
      t.next_id <- id + 1;
      id

    let ids_per_chunk = Atomic.make 10_000

    (* Starts a new packet: reserves a fresh chunk from [global_ids] if the
       current one is used up, then records where this packet's IDs
       begin. *)
    let new_packet t =
      if not (has_next t) then (
        let ids_per_chunk = Atomic.get ids_per_chunk in
        t.next_id <- Atomic.fetch_and_add t.global_ids ids_per_chunk;
        t.last_id <- t.next_id + ids_per_chunk);
      t.start_id <- t.next_id

    let of_global_ids global_ids =
      let t = { global_ids; start_id = 0; next_id = 0; last_id = 0 } in
      new_packet t;
      t

    let create () = of_global_ids (Atomic.make 0)

    (* Returns a thunk that builds a new allocator sharing this allocator's
       global counter. *)
    let for_new_domain { global_ids; _ } : unit -> t =
     fun () -> of_global_ids global_ids
  end
end

(** Trace info *)

module Info = struct
  type t = {
    sample_rate : float;
    word_size : int;
    executable_name : string;
    host_name : string;
    ocaml_runtime_params : string;
    pid : Int64.t;
    initial_domain : Domain_id.t;
    start_time : Timestamp.t;
    context : string option;
  }
end

(* Writes the trace-info event. A missing context is encoded as the empty
   string. The initial domain is not written here; [read_trace_info] takes
   it from the packet header. *)
let put_trace_info b (info : Info.t) =
  let open Write in
  put_event_header b Ev_trace_info info.start_time;
  put_float b info.sample_rate;
  put_8 b info.word_size;
  put_string b info.executable_name;
  put_string b info.host_name;
  put_string b info.ocaml_runtime_params;
  put_64 b info.pid;
  let context = match info.context with None -> "" | Some s -> s in
  put_string b context

(* Reads the body of a trace-info event (the event header must already have
   been consumed). The context field is only read for format versions >= 2,
   and an empty context is returned as [None]. *)
let read_trace_info b ~packet_info =
  let open Read in
  let start_time = packet_info.time_begin in
  let sample_rate = get_float b in
  let word_size = get_8 b in
  let executable_name = get_string b in
  let host_name = get_string b in
  let ocaml_runtime_params = get_string b in
  let pid = get_64 b in
  let context =
    if packet_info.version >= 2 then
      match get_string b with "" -> None | s -> Some s
    else None
  in
  {
    Info.start_time;
    sample_rate;
    word_size;
    executable_name;
    host_name;
    ocaml_runtime_params;
    pid;
    initial_domain = packet_info.domain;
    context;
  }

(** Trace writer *)

type writer = {
  dest : Buf.Shared_writer_fd.t;
  pid : int64;
  getpid : unit -> int64;
  domain : Domain_id.t;
  loc_writer : Location_codec.Writer.t;
  cache : Backtrace_codec.Writer.t;
  debug_reader_cache : Backtrace_codec.Reader.t option;
  (* Locations that missed cache in this packet *)
  mutable new_locs : (int * Location.t list) array;
  mutable new_locs_len : int;
  new_locs_buf : Bytes.t;
  (* Last allocation callstack *)
  mutable last_callstack : int array;
  (* Number of slots that were dropped from the last callstack *)
  mutable last_dropped_slots : int;
  obj_ids : Obj_id.Allocator.t;
  mutable packet_time_start : Timestamp.t;
  mutable packet_time_end : Timestamp.t;
  mutable packet_header : ctf_header_offsets;
  mutable packet : Write.t;
}

(* Builds a writer for one domain, with a freshly allocated packet buffer
   whose header has already been started. Nothing is written to [dest]
   here. *)
let writer_for_domain ~dest ~pid ~getpid ~domain ~obj_ids ~start_time : writer =
  let packet = Write.of_bytes (Bytes.make max_packet_size '\042') in
  let packet_header = put_ctf_header packet ~pid ~domain ~cache:None in
  let cache = Backtrace_codec.Writer.create () in
  let debug_reader_cache =
    if cache_enable_debug then Some (Backtrace_codec.Reader.create ()) else None
  in
  let s =
    {
      dest;
      pid;
      getpid;
      domain;
      loc_writer = Location_codec.Writer.create ();
      new_locs = [||];
      new_locs_len = 0;
      new_locs_buf = Bytes.make max_packet_size '\042';
      cache;
      debug_reader_cache;
      last_callstack = [||];
      last_dropped_slots = 0;
      obj_ids;
      packet_time_start = start_time;
      packet_time_end = start_time;
      packet_header;
      packet;
    }
  in
  s

(* Creates the writer for the initial domain and immediately writes the
   trace-info packet to [dest]. If [getpid] is not supplied, the PID from
   [info] is used for every later PID check. *)
let init_writer dest ?getpid (info : Info.t) =
  let dest = Buf.Shared_writer_fd.make dest in
  let open Write in
  let getpid =
    match getpid with Some getpid -> getpid | None -> fun () -> info.pid
  in
  let pid = getpid () in
  let domain = info.initial_domain in
  let packet = Write.of_bytes (Bytes.make max_packet_size '\042') in
  let obj_ids = Obj_id.Allocator.create () in
  (* Write the trace info packet *)
  (let hdr = put_ctf_header packet ~pid ~domain ~cache:None in
   put_trace_info packet info;
   finish_ctf_header hdr packet ~timestamp_begin:info.start_time
     ~timestamp_end:info.start_time ~alloc_id_begin:0 ~alloc_id_end:0;
   write_fd dest packet);
  writer_for_domain ~dest ~pid ~getpid ~domain ~obj_ids
    ~start_time:info.start_time

module Location_code = struct
  type t = int

  module Tbl = Int_tbl

  module Expert = struct
    let of_int t = t
  end
end

module Allocation_source = struct
  type t = Minor | Major | External
end

module Event = struct
  type t =
    | Alloc of {
        obj_id : Obj_id.t;
        length : int;
        domain : Domain_id.t;
        nsamples : int;
        source : Allocation_source.t;
        backtrace_buffer : Location_code.t array;
        backtrace_length : int;
        common_prefix : int;
      }
    | Promote of Obj_id.t * Domain_id.t
    | Collect of Obj_id.t * Domain_id.t

  (* Renders an event as a single line of text. [decode_loc] maps a location
     code to its locations; codes it maps to the empty list are printed as
     [$<code>]. *)
  let to_string decode_loc = function
    | Alloc
        {
          obj_id;
          length;
          domain;
          nsamples;
          source;
          backtrace_buffer;
          backtrace_length;
          common_prefix;
        } ->
        let backtrace =
          List.init backtrace_length (fun i ->
              let s = backtrace_buffer.(i) in
              match decode_loc s with
              | [] -> Fmt.str "$%d" (s :> int)
              | ls -> String.concat " " (List.map Location.to_string ls))
          |> String.concat " "
        in
        let alloc_src =
          match source with
          | Minor -> "alloc"
          | Major -> "alloc_major"
          | External -> "alloc_ext"
        in
        Fmt.str "%010d %s %d len=%d dom=%d % 4d: %s"
          (obj_id :> int)
          alloc_src nsamples length
          (domain :> int)
          common_prefix backtrace
    | Promote (id, _dom) -> Fmt.str "%010d promote" (id :> int)
    | Collect (id, _dom) -> Fmt.str "%010d collect" (id :> int)

  let domain = function
    | Alloc { domain; _ } | Promote (_, domain) | Collect (_, domain) -> domain
end

(* Appends [loc] to the writer's list of locations that still have to be
   emitted, doubling the backing array (starting at 32 slots) when it is
   full. *)
let log_new_loc s loc =
  let alen = Array.length s.new_locs in
  assert (s.new_locs_len <= alen);
  if s.new_locs_len = alen then (
    let new_len = if alen = 0 then 32 else alen * 2 in
    (* The new array is pre-filled with [loc], so slot [alen] already holds
       the appended element after the blit. *)
    let locs = Array.make new_len loc in
    Array.blit s.new_locs 0 locs 0 alen;
    s.new_locs <- locs;
    s.new_locs_len <- alen + 1)
  else (
    s.new_locs.(s.new_locs_len) <- loc;
    s.new_locs_len <- s.new_locs_len + 1)

exception Pid_changed
(** Flushing *)

(* Writes out all pending location packets followed by the current event
   packet, then starts a new event packet whose time range begins at [now].
   Raises [Pid_changed] (writing nothing) if [s.getpid ()] no longer matches
   the PID the writer was created with. *)
let flush_at s ~now =
  (* If the PID has changed, then the process forked and we're in the subprocess.
     Don't write anything to the file, and raise an exception to quit tracing *)
  if s.pid <> s.getpid () then raise Pid_changed;
  let open Write in
  (* First, flush newly-seen locations.
     These must be emitted before any events that might refer to them *)
  let i = ref 0 in
  while !i < s.new_locs_len do
    let b = Write.of_bytes s.new_locs_buf in
    let hdr = put_ctf_header b ~pid:s.pid ~domain:s.domain ~cache:None in
    while
      !i < s.new_locs_len && remaining b > Location_codec.Writer.max_length
    do
      put_event_header b Ev_location s.packet_time_start;
      Location_codec.Writer.put_location s.loc_writer b s.new_locs.(!i);
      incr i
    done;
    finish_ctf_header hdr b ~timestamp_begin:s.packet_time_start
      ~timestamp_end:s.packet_time_start ~alloc_id_begin:s.obj_ids.start_id
      ~alloc_id_end:s.obj_ids.start_id;
    write_fd s.dest b
  done;
  (* Next, flush the actual events *)
  finish_ctf_header s.packet_header s.packet
    ~timestamp_begin:s.packet_time_start ~timestamp_end:s.packet_time_end
    ~alloc_id_begin:s.obj_ids.start_id ~alloc_id_end:s.obj_ids.next_id;
  write_fd s.dest s.packet;
  (* Finally, reset the buffer *)
  s.packet_time_start <- now;
  s.packet_time_end <- now;
  s.new_locs_len <- 0;
  s.packet <- Write.of_bytes s.packet.buf;
  Obj_id.Allocator.new_packet s.obj_ids;
  s.packet_header <-
    put_ctf_header s.packet ~pid:s.pid ~domain:s.domain ~cache:(Some s.cache)

let max_ev_size =
  100
  (* upper bound on fixed-size portion of events
     (i.e. not backtraces or locations) *)
  + max Location_codec.Writer.max_length Backtrace_codec.Writer.max_length

(* Starts a new event of kind [ev] at time [now]. The current packet is
   flushed first if it is nearly full, has accumulated more than 128 new
   locations, spans more than [packet_max_time], or has run out of object
   IDs. *)
let begin_event s ev ~(now : Timestamp.t) =
  let open Write in
  if
    remaining s.packet < max_ev_size
    || s.new_locs_len > 128
    || Int64.(sub now s.packet_time_start > of_int packet_max_time)
    || not (Obj_id.Allocator.has_next s.obj_ids)
  then flush_at s ~now;
  s.packet_time_end <- now;
  put_event_header s.packet ev now

let flush s = flush_at s ~now:s.packet_time_end

(* Returns length of the longest suffix of curr which is also a suffix of prev *)
let common_suffix_len (prev : int array) prev_start (curr : int array)
    curr_start =
  assert (prev_start >= 0);
  assert (curr_start >= 0);
  let i = ref (Array.length curr - 1) and j = ref (Array.length prev - 1) in
  while !i >= curr_start && !j >= prev_start do
    if Array.unsafe_get curr !i = Array.unsafe_get prev !j then (
      decr i;
      decr j)
    else j := -1 (* mismatch: force the loop to terminate *)
  done;
  (* !i is now the highest index of curr that doesn't match prev *)
  Array.length curr - (!i + 1)

(* Position of the not-yet-written "number of encoded backtrace entries"
   field: one byte for short allocations, two bytes otherwise. *)
type alloc_length_format =
  | Len_short of Write.position_8
  | Len_long of Write.position_16

(* Debug check, active only when the writer has a [debug_reader_cache]:
   decodes the backtrace that was just written and fails if it does not
   match the reversed input callstack. *)
let verify_backtrace s ~callstack_as_ints ~bt_elem_off ~nencoded ~common_len =
  match s.debug_reader_cache with
  | None -> ()
  | Some c ->
      let open Read in
      (* Decode the backtrace and check that it matches *)
      let b = s.packet in
      let b' = Read.of_bytes_sub b.buf ~pos:bt_elem_off ~pos_end:b.pos in
      let decoded, decoded_len =
        Backtrace_codec.Reader.get_backtrace c b' ~nencoded
          ~common_pfx_len:common_len
      in
      assert (remaining b' = 0);
      let rev_callstack =
        callstack_as_ints |> Array.to_list |> List.rev |> Array.of_list
      in
      if Array.sub decoded 0 decoded_len <> rev_callstack then (
        rev_callstack |> Array.map Int64.of_int |> Array.iter (Fmt.pr " %08Lx");
        Fmt.pr " !@.";
        Array.sub decoded 0 decoded_len |> Array.iter (Fmt.pr " %08x");
        Fmt.pr " !@.";
        failwith "bad coded backtrace")

(* Writes the fixed part of an allocation event and returns the position of
   the entry-count field, to be filled in once the backtrace is encoded.
   Short allocations omit length, sample count and source, which the reader
   recovers from the event code. *)
let encode_alloc_header b ~is_short ~length ~nsamples ~source ~common_len =
  let open Write in
  let src_code =
    match source with
    | Allocation_source.Minor -> 0
    | Major -> 1
    | External -> 2
  in
  if is_short then (
    put_vint b common_len;
    Len_short (skip_8 b))
  else (
    put_vint b length;
    put_vint b nsamples;
    put_8 b src_code;
    put_vint b common_len;
    Len_long (skip_16 b))

(* Writes an allocation event and returns the object ID assigned to it. Only
   the part of the callstack that differs from the previous allocation's
   callstack is encoded. *)
let put_alloc s now ~length ~nsamples ~source ~callstack ~callstack_as_ints
    ~decode_callstack_entry ~drop_slots =
  let open Write in
  let common_len =
    common_suffix_len s.last_callstack s.last_dropped_slots callstack_as_ints
      drop_slots
  in
  let new_len = Array.length callstack_as_ints - common_len in
  s.last_callstack <- callstack_as_ints;
  s.last_dropped_slots <- drop_slots;
  let is_short =
    1 <= length && length <= 16
    && source = Allocation_source.Minor
    && nsamples = 1 && new_len < 256
  in
  begin_event s (if is_short then Ev_short_alloc length else Ev_alloc) ~now;
  let id = Obj_id.Allocator.take_next_exn s.obj_ids in
  let cache = s.cache in
  let b = s.packet in
  let bt_len_off =
    encode_alloc_header b ~is_short ~length ~nsamples ~source ~common_len
  in
  let bt_elem_off = b.pos in
  let log_new_location ~index =
    log_new_loc s
      (callstack_as_ints.(index), decode_callstack_entry callstack index)
  in
  let nencoded =
    Backtrace_codec.Writer.put_backtrace cache b ~alloc_id:id
      ~callstack:callstack_as_ints ~callstack_pos:drop_slots
      ~callstack_len:new_len ~log_new_location
  in
  (match bt_len_off with
  | Len_short p ->
      assert (nencoded <= 0xff);
      update_8 b p nencoded
  | Len_long p ->
      (* This can't overflow because there isn't room in a packet for more than
         0xffff entries. (See max_packet_size) *)
      assert (nencoded <= 0xffff);
      update_16 b p nencoded);
  verify_backtrace s ~callstack_as_ints ~bt_elem_off ~nencoded ~common_len;
  id

(* Reads the body of an allocation event. [evcode] must be [Ev_alloc] or
   [Ev_short_alloc _]. When [parse_backtraces] is false the backtrace is
   skipped and returned as an empty buffer of length 0. Raises [Parse_error]
   on an unknown allocation source. *)
let read_alloc ~parse_backtraces ~domain evcode cache alloc_id b =
  let open Read in
  let is_short, length, nsamples, source =
    match evcode with
    | Ev_short_alloc n -> (true, n, 1, Allocation_source.Minor)
    | Ev_alloc ->
        let length = get_vint b in
        let nsamples = get_vint b in
        let source : Allocation_source.t =
          match get_8 b with
          | 0 -> Minor
          | 1 -> Major
          | 2 -> External
          | _ -> bad_format "source"
        in
        (false, length, nsamples, source)
    | _ -> assert false
  in
  let common_pfx_len = get_vint b in
  let nencoded = if is_short then get_8 b else get_16 b in
  let backtrace_buffer, backtrace_length =
    if parse_backtraces then
      Backtrace_codec.Reader.get_backtrace cache b ~nencoded ~common_pfx_len
    else (
      Backtrace_codec.Reader.skip_backtrace cache b ~nencoded ~common_pfx_len;
      ([||], 0))
  in
  Event.Alloc
    {
      obj_id = alloc_id;
      length;
      domain;
      nsamples;
      source;
      backtrace_buffer;
      backtrace_length;
      common_prefix = common_pfx_len;
    }

(* The other events are much simpler *)

(* Promote and collect events store the object ID as a delta from the most
   recently allocated ID of the writer ([next_id - 1]). *)
let put_promote s now id =
  let open Write in
  begin_event s Ev_promote ~now;
  let b = s.packet in
  put_vint b (s.obj_ids.next_id - 1 - id)

let read_promote ~domain alloc_id b =
  let open Read in
  let id_delta = get_vint b in
  check_fmt "promote id sync" (id_delta >= 0);
  let id = alloc_id - 1 - id_delta in
  Event.Promote (id, domain)

let put_collect s now id =
  let open Write in
  begin_event s Ev_collect ~now;
  let b = s.packet in
  put_vint b (s.obj_ids.next_id - 1 - id)

let read_collect ~domain alloc_id b =
  let open Read in
  let id_delta = get_vint b in
  (* Typically, id_delta >= 0, because you are collecting an object with an earlier object
     ID. However, a tricky case in domain termination (collecting an object previously
     allocated by a now-terminated domain) means that this is not necessarily the case, so
     there's no assertion here *)
  let id = alloc_id - 1 - id_delta in
  Event.Collect (id, domain)

(** Trace reader *)

type reader = {
  fd : Unix.file_descr;
  info : Info.t;
  data_off : int;
  loc_table : Location.t list Location_code.Tbl.t;
}

(* Reads and validates the trace-info packet at the current position of
   [fd], and records the file offset at which the remaining packets start.
   Raises [Parse_error] if the first packet is not a well-formed trace-info
   packet. *)
let init_reader fd =
  let open Read in
  let buf = Bytes.make max_packet_size '\042' in
  let start_pos = Unix.lseek fd 0 SEEK_CUR in
  let b = read_fd fd buf in
  let packet_info = read_ctf_header b in
  let header_size = b.pos in
  let b, _ = split b packet_info.content_size in
  check_fmt "trace info packet size" (remaining b >= packet_info.content_size);
  let ev, evtime = read_event_header packet_info b in
  check_fmt "trace info packet code" (ev = Ev_trace_info);
  check_fmt "trace info packet time" (evtime = packet_info.time_begin);
  let trace_info = read_trace_info b ~packet_info in
  check_fmt "trace info packet done" (remaining b = 0);
  let loc_table = Location_code.Tbl.create 20 in
  let data_off = start_pos + header_size + packet_info.content_size in
  { fd; info = trace_info; data_off; loc_table }

(* Prints a formatted diagnostic line on standard error. *)
let report_hack fmt = Fmt.kstr (fun s -> Fmt.epr "%s@." s) fmt

(* Refills [stream] from [fd] if fewer than [size] bytes remain in it. *)
let refill_to size fd stream =
  let open Read in
  if remaining stream < size then refill_fd fd stream else stream

(* Returns the (location reader, backtrace cache, last timestamp) triple for
   [domain], creating and registering it on first use. *)
let domain_state per_domain start_time domain =
  try Domain_id.Tbl.find per_domain domain
  with Not_found ->
    let reader = Location_codec.Reader.create () in
    let cache = Backtrace_codec.Reader.create () in
    let last_timestamp = ref start_time in
    Domain_id.Tbl.add per_domain domain (reader, cache, last_timestamp);
    (reader, cache, last_timestamp)

(* Decodes every event of one packet. Location events update
   [s.loc_table]; allocation, promotion and collection events are passed to
   [f] together with their time relative to the start of the trace. Raises
   [Parse_error] on any inconsistency. *)
let iter_events_of_packet s per_domain ~parse_backtraces f
    (packet_header : packet_header_info) b =
  let open Read in
  let domain = packet_header.domain in
  let alloc_id = ref (Int64.to_int packet_header.alloc_id_begin) in
  let loc_reader, cache, last_timestamp =
    domain_state per_domain s.info.start_time domain
  in
  if parse_backtraces then
    if
      not
        (Backtrace_codec.Reader.check_cache_verifier cache
           packet_header.cache_verifier)
    then bad_format "cache verification";
  while remaining b > 0 do
    let ev, time = read_event_header packet_header b in
    check_fmt "monotone timestamps" (!last_timestamp <= time);
    last_timestamp := time;
    let dt = Int64.(sub time s.info.start_time) in
    match ev with
    | Ev_trace_info -> bad_format "Multiple trace-info events present"
    | Ev_location ->
        let id, loc = Location_codec.Reader.get_location loc_reader b in
        (*Printf.printf "%3d _ _ location\n" (b.pos - last_pos);*)
        if Location_code.Tbl.mem s.loc_table id then
          check_fmt "consistent location info"
            (Location_code.Tbl.find s.loc_table id = loc)
        else Location_code.Tbl.add s.loc_table id loc
    | (Ev_alloc | Ev_short_alloc _) as evcode ->
        let info =
          read_alloc ~parse_backtraces ~domain evcode cache !alloc_id b
        in
        incr alloc_id;
        (*Printf.printf "%3d " (b.pos - last_pos);*)
        f dt info
    | Ev_collect ->
        let info = read_collect ~domain !alloc_id b in
        (*Printf.printf "%3d " (b.pos - last_pos);*)
        f dt info
    | Ev_promote ->
        let info = read_promote ~domain !alloc_id b in
        (*Printf.printf "%3d " (b.pos - last_pos);*)
        f dt info
  done;
  check_fmt "alloc id sync" (packet_header.alloc_id_end = Int64.of_int !alloc_id)

(* Iterates over all events of the trace, starting from the first packet
   after the trace-info packet. Packets with a PID different from the trace's
   PID, and truncated packets, are skipped with a message on standard
   error. *)
let iter s ?(parse_backtraces = true) f =
  let open Read in
  let per_domain = Domain_id.Tbl.create 1 in
  Unix.lseek s.fd s.data_off SEEK_SET |> ignore;
  let rec iter_packets stream =
    let header_upper_bound =
      200
      (* more than big enough for a header *)
    in
    let stream = refill_to header_upper_bound s.fd stream in
    if remaining stream = 0 then ()
    else
      let packet_header = read_ctf_header stream in
      let stream = refill_to packet_header.content_size s.fd stream in
      let packet, rest = split stream packet_header.content_size in
      if packet_header.pid <> s.info.pid then
        report_hack "skipping bad packet (wrong pid: %Ld, but tracing %Ld)"
          packet_header.pid s.info.pid
      else if remaining packet <> packet_header.content_size then
        report_hack "skipping truncated packet"
      else
        iter_events_of_packet s per_domain ~parse_backtraces f packet_header
          packet;
      iter_packets rest
  in
  iter_packets (read_fd s.fd (Bytes.make max_packet_size '\000'))

module Private = struct
  let name_of_memprof_tracer = Atomic.make ""
  let set_tracer_module s = Atomic.set name_of_memprof_tracer (s ^ ".ext_alloc")
  let obj_ids_per_chunk = Obj_id.Allocator.ids_per_chunk
end

module Writer = struct
  type t = writer

  exception Pid_changed = Pid_changed

  let create = init_writer
  let domain t = t.domain

  (* Returns a function that creates a writer for another domain, sharing
     this writer's destination, PID check and global object ID counter. *)
  let for_domain_at_time ~start_time t : domain:int -> t =
    let { dest; pid; getpid; _ } = t in
    let obj_ids = Obj_id.Allocator.for_new_domain t.obj_ids in
    fun ~domain ->
      let obj_ids = obj_ids () in
      let t =
        writer_for_domain ~dest ~pid ~getpid ~domain ~obj_ids ~start_time
      in
      t

  let for_domain t = for_domain_at_time ~start_time:t.packet_time_end t

  (* Unfortunately, efficient access to the backtrace is not possible
     with the current Printexc API, even though internally it's an int
     array. For now, wave the Obj.magic wand. There's a PR to fix this:
     https://github.com/ocaml/ocaml/pull/9663 *)
  let loc_codes_of_backtrace (b : Printexc.raw_backtrace) :
      Location_code.t array =
    Obj.magic b

  (* Is this a location that we'd prefer to leave out of traces? This mechanism only
     really makes sense for inlinable functions, so that we can drop a single frame out of
     a backtrace slot. For non-inlinable functions like
     [Memprof_tracer.ext_alloc_slowpath], we instead avoid capturing the slot to begin
     with (using [put_alloc_backtrace_suffix]). *)
  let is_internal_location (loc : Location.t) =
    String.equal loc.defname (Atomic.get Private.name_of_memprof_tracer)

  (* Decodes entry [i] of a raw backtrace into its list of source locations,
     following the chain of slots returned by
     [get_raw_backtrace_next_slot]. *)
  let decode_raw_backtrace_entry callstack i : Location.t list =
    let open Printexc in
    let rec get_locations slot : Location.t list =
      let tail =
        match get_raw_backtrace_next_slot slot with
        | None -> []
        | Some slot -> get_locations slot
      in
      let slot = convert_raw_backtrace_slot slot in
      match Slot.location slot with
      | None -> tail
      | Some { filename; line_number; start_char; end_char; _ } ->
          let defname = match Slot.name slot with Some n -> n | _ -> "??" in
          { filename; line = line_number; start_char; end_char; defname }
          :: tail
    in
    let locs = get_locations (get_raw_backtrace_slot callstack i) |> List.rev in
    match List.filter (fun loc -> not (is_internal_location loc)) locs with
    | [] ->
        (* It would break things to return an empty list here, and the worst that happens if
           we return the whole list is occasionally slightly confusing output (it looks like
           Memtrace is using memory rather than user code). *)
        locs
    | at_least_one -> at_least_one

  let put_alloc_backtrace_suffix t now ~length ~nsamples ~source ~callstack
      ~drop_slots =
    let callstack_as_ints = loc_codes_of_backtrace callstack in
    put_alloc t now ~length ~nsamples ~source ~callstack ~callstack_as_ints
      ~decode_callstack_entry:decode_raw_backtrace_entry ~drop_slots

  let put_alloc_with_raw_backtrace t now ~length ~nsamples ~source ~callstack =
    let callstack_as_ints = loc_codes_of_backtrace callstack in
    put_alloc t now ~length ~nsamples ~source ~callstack ~callstack_as_ints
      ~decode_callstack_entry:decode_raw_backtrace_entry ~drop_slots:0

  (* Variant of [put_alloc] for callstacks that are already arrays of
     location codes; [decode_callstack_entry] receives the code itself
     rather than an index. *)
  let put_alloc t now ~length ~nsamples ~source ~callstack
      ~decode_callstack_entry =
    let decode_callstack_entry cs i = decode_callstack_entry cs.(i) in
    put_alloc t now ~length ~nsamples ~source ~callstack
      ~callstack_as_ints:callstack ~decode_callstack_entry ~drop_slots:0

  let put_collect = put_collect
  let put_promote = put_promote
  let flush = flush

  let close t =
    flush t;
    Buf.Shared_writer_fd.close t.dest

  (* Re-emits a decoded event through writer [w].
     Raises [Invalid_argument] if the event's domain differs from the
     writer's, if [now] is earlier than the writer's last event time, or if
     the object ID assigned to a re-emitted allocation differs from the one
     recorded in the event. *)
  let put_event w ~decode_callstack_entry now (ev : Event.t) =
    if Event.domain ev <> w.domain then
      raise (Invalid_argument "Trace.put_event: mismatched domain fields");
    if now < w.packet_time_end then
      raise (Invalid_argument "Trace.put_event: out-of-order timestamps");
    match ev with
    | Alloc
        {
          obj_id;
          length;
          domain = _;
          nsamples;
          source;
          backtrace_buffer;
          backtrace_length;
          common_prefix = _;
        } ->
        (* The event holds the backtrace in the reverse of the order that
           [put_alloc] expects, so reverse it first. *)
        let btrev =
          Array.init backtrace_length (fun i ->
              backtrace_buffer.(backtrace_length - 1 - i))
        in
        let id =
          put_alloc w now ~length ~nsamples ~source ~callstack:btrev
            ~decode_callstack_entry
        in
        if id <> obj_id then raise (Invalid_argument "Incorrect allocation ID")
    | Promote (id, _domain) -> put_promote w now id
    | Collect (id, _domain) -> put_collect w now id

  module Multiplexed_domains = struct
    type nonrec t = {
      mutable last_domain : Domain_id.t;
      (* Invariant: all writers except possibly that of [last_domain] are flushed *)
      writers : t Domain_id.Tbl.t;
      start_time : Timestamp.t;
    }

    let create dest ?getpid info =
      let w = create dest ?getpid info in
      let writers = Domain_id.Tbl.create 1 in
      let dom = domain w in
      Domain_id.Tbl.add writers dom w;
      { last_domain = dom; writers; start_time = info.start_time }

    (* Returns the writer for [domain], creating it on first use. When
       switching away from the previously used domain, that domain's writer
       is flushed first. *)
    let writer_for_domain t ~domain =
      let last_w = Domain_id.Tbl.find t.writers t.last_domain in
      if domain = t.last_domain then last_w
      else (
        flush last_w;
        t.last_domain <- domain;
        try Domain_id.Tbl.find t.writers domain
        with Not_found ->
          let w =
            (for_domain_at_time ~start_time:t.start_time last_w) ~domain
          in
          Domain_id.Tbl.add t.writers domain w;
          w)

    (* Returns, without consuming it, the object ID that the next allocation
       on [domain] will receive, flushing first if that writer has run out
       of IDs. *)
    let next_alloc_id t ~domain =
      let w = writer_for_domain t ~domain in
      if not (Obj_id.Allocator.has_next w.obj_ids) then
        flush_at w ~now:w.packet_time_end;
      Obj_id.Allocator.read_next_exn w.obj_ids

    let put_event t ~decode_callstack_entry time ev =
      let w = writer_for_domain t ~domain:(Event.domain ev) in
      put_event w ~decode_callstack_entry time ev

    let flush t = flush (Domain_id.Tbl.find t.writers t.last_domain)
  end
end

module Reader = struct
  type t = reader

  let create = init_reader
  let info s = s.info

  (* Returns the locations registered for [code].
     Raises [Invalid_argument] if the code has not been seen. *)
  let lookup_location_code { loc_table; _ } code =
    match Location_code.Tbl.find loc_table code with
    | v -> v
    | exception Not_found ->
        raise (Invalid_argument (Fmt.str "invalid location code %08x" code))

  let iter = iter

  (* Opens [filename] read-only and reads its trace-info packet. If that
     fails (for example with [Parse_error] on a malformed file), the file
     descriptor is closed before the exception is re-raised, so a failed
     open does not leak a descriptor. *)
  let open_ ~filename =
    let fd = Unix.openfile filename [ Unix.O_RDONLY ] 0 in
    match init_reader fd with
    | reader -> reader
    | exception exn ->
        let bt = Printexc.get_raw_backtrace () in
        (* Best-effort close: the original failure is the one to report. *)
        (try Unix.close fd with Unix.Unix_error _ -> ());
        Printexc.raise_with_backtrace exn bt

  let size_bytes s = (Unix.LargeFile.fstat s.fd).st_size
  let close s = Unix.close s.fd
end