Skip to content

Commit

Permalink
Change to streaming out the heap snapshot data (JuliaLang#52854)
Browse files Browse the repository at this point in the history
This PR is to continue the work on the following PR:

Prevent OOMs during heap snapshot: Change to streaming out the snapshot
data (JuliaLang#51518 )

Here are the commit history:

```
* Streaming the heap snapshot!

This should prevent the engine from OOMing while recording the snapshot!

Now we just need to sample the files, either online, before downloading, or offline after downloading :)

If we're gonna do it offline, we'll want to gzip the files before downloading them.

* Allow custom filename; use original API

* Support legacy heap snapshot interface. Add reassembly function.

* Add tests

* Apply suggestions from code review

* Update src/gc-heap-snapshot.cpp

* Change to always save the parts in the same directory

This way you can always recover from an OOM

* Fix bug in reassembler: from_node and to_node were in the wrong order

* Fix correctness mistake: The edges have to be reordered according to the node order. That's the whole reason this is tricky.

But i'm not sure now whether the SoAs approach is actually an optimization.... It seems like we should probably prefer to inline the Edges right into the vector, rather than having to do another random lookup into the edges table?

* Debugging messed up edge array idxs

* Disable log message

* Write the .nodes and .edges as binary data

* Remove unnecessary logging

* fix merge issues

* attempt to add back the orphan node checking logic
```

---------

Co-authored-by: Nathan Daly <[email protected]>
Co-authored-by: Nathan Daly <[email protected]>
  • Loading branch information
3 people authored and RAI CI (GitHub Action Automation) committed May 28, 2024
1 parent 06c6392 commit 5db6700
Show file tree
Hide file tree
Showing 7 changed files with 574 additions and 204 deletions.
401 changes: 207 additions & 194 deletions src/gc-heap-snapshot.cpp

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/gc-heap-snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ static inline void gc_heap_snapshot_record_finlist(jl_value_t *finlist, size_t i
// ---------------------------------------------------------------------
// Functions to call from Julia to take heap snapshot
// ---------------------------------------------------------------------
JL_DLLEXPORT void jl_gc_take_heap_snapshot(ios_t *stream, char all_one);
JL_DLLEXPORT void jl_gc_take_heap_snapshot(ios_t *nodes, ios_t *edges,
ios_t *strings, ios_t *json, char all_one);


#ifdef __cplusplus
Expand Down
19 changes: 19 additions & 0 deletions stdlib/Profile/docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,24 @@ Traces and records julia objects on the heap. This only records objects known to
garbage collector. Memory allocated by external libraries not managed by the garbage
collector will not show up in the snapshot.

To avoid OOMing while recording the snapshot, we added a streaming option to stream out the heap snapshot
into four files,

```julia-repl
julia> using Profile
julia> Profile.take_heap_snapshot("snapshot"; streaming=true)
```

where "snapshot" is the filepath as the prefix for the generated files.

Once the snapshot files are generated, they could be assembled offline with the following command:

```julia-repl
julia> using Profile
julia> Profile.HeapSnapshot.assemble_snapshot("snapshot", "snapshot.heapsnapshot")
```

The resulting heap snapshot file can be uploaded to chrome devtools to be viewed.
For more information, see the [chrome devtools docs](https://developer.chrome.com/docs/devtools/memory-problems/heap-snapshots/#view_snapshots).
69 changes: 60 additions & 9 deletions stdlib/Profile/src/Profile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1220,9 +1220,8 @@ end


"""
Profile.take_heap_snapshot(io::IOStream, all_one::Bool=false)
Profile.take_heap_snapshot(filepath::String, all_one::Bool=false)
Profile.take_heap_snapshot(all_one::Bool=false; dir::String)
Profile.take_heap_snapshot(filepath::String, all_one::Bool=false, streaming=false)
Profile.take_heap_snapshot(all_one::Bool=false; dir::String, streaming=false)
Write a snapshot of the heap, in the JSON format expected by the Chrome
Devtools Heap Snapshot viewer (.heapsnapshot extension) to a file
Expand All @@ -1232,16 +1231,67 @@ full file path, or IO stream.
If `all_one` is true, then report the size of every object as one so they can be easily
counted. Otherwise, report the actual size.
If `streaming` is true, we will stream the snapshot data out into four files, using filepath
as the prefix, to avoid having to hold the entire snapshot in memory. This option should be
used for any setting where your memory is constrained. These files can then be reassembled
by calling Profile.HeapSnapshot.assemble_snapshot(), which can
be done offline.
NOTE: We strongly recommend setting streaming=true for performance reasons. Reconstructing
the snapshot from the parts requires holding the entire snapshot in memory, so if the
snapshot is large, you can run out of memory while processing it. Streaming allows you to
reconstruct the snapshot offline, after your workload is done running.
If you do attempt to collect a snapshot with streaming=false (the default, for
backwards-compatibility) and your process is killed, note that this will always save the
parts in the same directory as your provided filepath, so you can still reconstruct the
snapshot after the fact, via `assemble_snapshot()`.
"""
function take_heap_snapshot(io::IOStream, all_one::Bool=false)
Base.@_lock_ios(io, ccall(:jl_gc_take_heap_snapshot, Cvoid, (Ptr{Cvoid}, Cchar), io.handle, Cchar(all_one)))
end
function take_heap_snapshot(filepath::String, all_one::Bool=false)
open(filepath, "w") do io
take_heap_snapshot(io, all_one)
function take_heap_snapshot(filepath::AbstractString, all_one::Bool=false; streaming::Bool=false)
if streaming
_stream_heap_snapshot(filepath, all_one)
else
# Support the legacy, non-streaming mode, by first streaming the parts, then
# reassembling it after we're done.
prefix = filepath
_stream_heap_snapshot(prefix, all_one)
Profile.HeapSnapshot.assemble_snapshot(prefix, filepath)
end
return filepath
end
function take_heap_snapshot(io::IO, all_one::Bool=false)
# Support the legacy, non-streaming mode, by first streaming the parts to a tempdir,
# then reassembling it after we're done.
dir = tempdir()
prefix = joinpath(dir, "snapshot")
_stream_heap_snapshot(prefix, all_one)
Profile.HeapSnapshot.assemble_snapshot(prefix, io)
end
function _stream_heap_snapshot(prefix::AbstractString, all_one::Bool)
# Nodes and edges are binary files
open("$prefix.nodes", "w") do nodes
open("$prefix.edges", "w") do edges
open("$prefix.strings", "w") do strings
# The following file is json data
open("$prefix.metadata.json", "w") do json
Base.@_lock_ios(nodes,
Base.@_lock_ios(edges,
Base.@_lock_ios(strings,
Base.@_lock_ios(json,
ccall(:jl_gc_take_heap_snapshot,
Cvoid,
(Ptr{Cvoid},Ptr{Cvoid},Ptr{Cvoid},Ptr{Cvoid}, Cchar),
nodes.handle, edges.handle, strings.handle, json.handle,
Cchar(all_one))
)
)
)
)
end
end
end
end
end
function take_heap_snapshot(all_one::Bool=false; dir::Union{Nothing,S}=nothing) where {S <: AbstractString}
fname = "$(getpid())_$(time_ns()).heapsnapshot"
if isnothing(dir)
Expand Down Expand Up @@ -1277,6 +1327,7 @@ function take_page_profile(filepath::String)
end

include("Allocs.jl")
include("heapsnapshot_reassemble.jl")
include("precompile.jl")

end # module
231 changes: 231 additions & 0 deletions stdlib/Profile/src/heapsnapshot_reassemble.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

module HeapSnapshot

"""
assemble_snapshot(filepath::AbstractString, out_file::AbstractString)
Assemble a .heapsnapshot file from the .json files produced by `Profile.take_snapshot`.
"""

# SoA layout to reduce padding
struct Edges
type::Vector{Int8} # index into `snapshot.meta.edge_types`
name_or_index::Vector{UInt} # Either an index into `snapshot.strings`, or the index in an array, depending on edge_type
to_pos::Vector{UInt} # index into `snapshot.nodes`
end
function Edges(n::Int)
Edges(
Vector{Int8}(undef, n),
Vector{UInt}(undef, n),
Vector{UInt}(undef, n),
)
end
Base.length(n::Edges) = length(n.type)

# trace_node_id and detachedness are always 0 in the snapshots Julia produces so we don't store them
struct Nodes
type::Vector{Int8} # index into `snapshot.meta.node_types`
name_idx::Vector{UInt32} # index into `snapshot.strings`
id::Vector{UInt} # unique id, in julia it is the address of the object
self_size::Vector{Int} # size of the object itself, not including the size of its fields
edge_count::Vector{UInt} # number of outgoing edges
edges::Edges # outgoing edges
# This is the main complexity of the .heapsnapshot format, and it's the reason we need
# to read in all the data before writing it out. The edges vector contains all edges,
# but organized by which node they came from. First, it contains all the edges coming
# out of node 0, then all edges leaving node 1, etc. So we need to have visited all
# edges, and assigned them to their corresponding nodes, before we can emit the file.
edge_idxs::Vector{Vector{UInt}} # indexes into edges, keeping per-node outgoing edge ids
end
function Nodes(n::Int, e::Int)
Nodes(
Vector{Int8}(undef, n),
Vector{UInt32}(undef, n),
Vector{UInt}(undef, n),
Vector{Int}(undef, n),
Vector{UInt32}(undef, n),
Edges(e),
[Vector{UInt}() for _ in 1:n], # Take care to construct n separate empty vectors
)
end
Base.length(n::Nodes) = length(n.type)

const k_node_number_of_fields = 7

# Like Base.dec, but doesn't allocate a string and writes directly to the io object
# We know all of the numbers we're about to write fit into a UInt and are non-negative
let _dec_d100 = UInt16[(0x30 + i % 10) << 0x8 + (0x30 + i ÷ 10) for i = 0:99]
global _write_decimal_number
_write_decimal_number(io, x::Integer, buf) = _write_decimal_number(io, unsigned(x), buf)
function _write_decimal_number(io, x::Unsigned, digits_buf)
buf = digits_buf
n = ndigits(x)
i = n
@inbounds while i >= 2
d, r = divrem(x, 0x64)
d100 = _dec_d100[(r % Int)::Int + 1]
buf[i-1] = d100 % UInt8
buf[i] = (d100 >> 0x8) % UInt8
x = oftype(x, d)
i -= 2
end
if i > 0
@inbounds buf[i] = 0x30 + (rem(x, 0xa) % UInt8)::UInt8
end
write(io, @view buf[max(i, 1):n])
end
end

function assemble_snapshot(in_prefix, out_file::AbstractString = in_prefix)
open(out_file, "w") do io
assemble_snapshot(in_prefix, io)
end
end

# Manually parse and write the .json files, given that we don't have JSON import/export in
# julia's stdlibs.
function assemble_snapshot(in_prefix, io::IO)
preamble = read(string(in_prefix, ".metadata.json"), String)
pos = last(findfirst("node_count\":", preamble)) + 1
endpos = findnext(==(','), preamble, pos) - 1
node_count = parse(Int, String(@view preamble[pos:endpos]))

pos = last(findnext("edge_count\":", preamble, endpos)) + 1
endpos = findnext(==('}'), preamble, pos) - 1
edge_count = parse(Int, String(@view preamble[pos:endpos]))

nodes = Nodes(node_count, edge_count)

orphans = Set{UInt}() # nodes that have no incoming edges
# Parse nodes with empty edge counts that we need to fill later
nodes_file = open(string(in_prefix, ".nodes"), "r")
for i in 1:length(nodes)
node_type = read(nodes_file, Int8)
node_name_idx = read(nodes_file, UInt)
id = read(nodes_file, UInt)
self_size = read(nodes_file, Int)
@assert read(nodes_file, Int) == 0 # trace_node_id
@assert read(nodes_file, Int8) == 0 # detachedness

nodes.type[i] = node_type
nodes.name_idx[i] = node_name_idx
nodes.id[i] = id
nodes.self_size[i] = self_size
nodes.edge_count[i] = 0 # edge_count
# populate the orphans set with node index
push!(orphans, i-1)
end

# Parse the edges to fill in the edge counts for nodes and correct the to_node offsets
edges_file = open(string(in_prefix, ".edges"), "r")
for i in 1:length(nodes.edges)
edge_type = read(edges_file, Int8)
edge_name_or_index = read(edges_file, UInt)
from_node = read(edges_file, UInt)
to_node = read(edges_file, UInt)

nodes.edges.type[i] = edge_type
nodes.edges.name_or_index[i] = edge_name_or_index
nodes.edges.to_pos[i] = to_node * k_node_number_of_fields # 7 fields per node, the streaming format doesn't multiply the offset by 7
nodes.edge_count[from_node + 1] += UInt32(1) # C and JSON use 0-based indexing
push!(nodes.edge_idxs[from_node + 1], i) # Index into nodes.edges
# remove the node from the orphans if it has at least one incoming edge
if to_node in orphans
delete!(orphans, to_node)
end
end

_digits_buf = zeros(UInt8, ndigits(typemax(UInt)))
println(io, @view(preamble[1:end-2]), ",") # remove trailing "}\n", we don't end the snapshot here
println(io, "\"nodes\":[")
for i in 1:length(nodes)
i > 1 && println(io, ",")
_write_decimal_number(io, nodes.type[i], _digits_buf)
print(io, ",")
_write_decimal_number(io, nodes.name_idx[i], _digits_buf)
print(io, ",")
_write_decimal_number(io, nodes.id[i], _digits_buf)
print(io, ",")
_write_decimal_number(io, nodes.self_size[i], _digits_buf)
print(io, ",")
_write_decimal_number(io, nodes.edge_count[i], _digits_buf)
print(io, ",0,0")
end
print(io, "],\"edges\":[")
e = 1
for n in 1:length(nodes)
count = nodes.edge_count[n]
len_edges = length(nodes.edge_idxs[n])
@assert count == len_edges "For node $n: $count != $len_edges"
for i in nodes.edge_idxs[n]
e > 1 && print(io, ",")
println(io)
_write_decimal_number(io, nodes.edges.type[i], _digits_buf)
print(io, ",")
_write_decimal_number(io, nodes.edges.name_or_index[i], _digits_buf)
print(io, ",")
_write_decimal_number(io, nodes.edges.to_pos[i], _digits_buf)
if !(nodes.edges.to_pos[i] % k_node_number_of_fields == 0)
@warn "Bug in to_pos for edge $i from node $n: $(nodes.edges.to_pos[i])"
end
e += 1
end
end
println(io, "],")

println(io, "\"strings\":[")
open(string(in_prefix, ".strings"), "r") do strings_io
first = true
while !eof(strings_io)
str_size = read(strings_io, UInt)
str_bytes = read(strings_io, str_size)
str = String(str_bytes)
if first
print_str_escape_json(io, str)
first = false
else
print(io, ",\n")
print_str_escape_json(io, str)
end
end
end
print(io, "]}")

# remove the uber node from the orphans
if 0 in orphans
delete!(orphans, 0)
end

@assert isempty(orphans) "Orphaned nodes: $(orphans), node count: $(length(nodes)), orphan node count: $(length(orphans))"

return nothing
end

function print_str_escape_json(stream::IO, s::AbstractString)
print(stream, '"')
for c in s
if c == '"'
print(stream, "\\\"")
elseif c == '\\'
print(stream, "\\\\")
elseif c == '\b'
print(stream, "\\b")
elseif c == '\f'
print(stream, "\\f")
elseif c == '\n'
print(stream, "\\n")
elseif c == '\r'
print(stream, "\\r")
elseif c == '\t'
print(stream, "\\t")
elseif '\x00' <= c <= '\x1f'
print(stream, "\\u", lpad(string(UInt16(c), base=16), 4, '0'))
else
print(stream, c)
end
end
print(stream, '"')
end

end
Loading

0 comments on commit 5db6700

Please sign in to comment.