Skip to content

Commit 31e477b

Browse files
committed
Fallback speedscope exports to temporal records
Keep the normal transform flow for existing formats, but route speedscope exports through temporal allocation records when a capture lacks exact per-allocation timestamps. The reporter uses snapshot times to order surviving intervals for leak reports and the intervals contributing to the peak snapshot for high-water-mark reports. Signed-off-by: Pablo Galindo Salgado <pablogsal@gmail.com>
1 parent dc7ade1 commit 31e477b

6 files changed

Lines changed: 422 additions & 11 deletions

File tree

news/899.feature.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
Add ``allocation_timestamps`` support to ``Tracker`` and a ``speedscope``
output format for ``memray transform``. Speedscope exports now fall back to
temporal allocation records to preserve chronological ordering when a capture
does not include per-allocation timestamps.

src/memray/commands/transform.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
import argparse
22
import importlib.util
3+
import os
34
import shutil
45
import sys
56

67
from rich import print as pprint
78

9+
from memray import FileReader
810
from memray._errors import MemrayCommandError
11+
from memray._memray import FileFormat
912

1013
from ..reporters.transform import TransformReporter
1114
from .common import HighWatermarkCommand
15+
from .common import warn_if_file_is_not_aggregated_and_is_too_big
16+
from .common import warn_if_not_enough_symbols
1217

1318

1419
class TransformCommand(HighWatermarkCommand):
@@ -72,3 +77,103 @@ def post_run_gprof2dot(self) -> None:
7277
print()
7378
print("To generate a graph from the transform file, run for example:")
7479
print(f"{command} -f json {self.output_file} | dot -Tpng -o output.png")
80+
81+
def write_report(
    self,
    result_path,
    output_file,
    show_memory_leaks,
    temporary_allocation_threshold,
    merge_threads=None,
    inverted=None,
    temporal=False,
    max_memory_records=None,
    no_web=False,
) -> None:
    """Render the selected transform report for *result_path* into *output_file*.

    Formats other than speedscope keep the normal transform flow by
    delegating to the base command.  Speedscope exports read the capture
    directly so that, when the file lacks exact per-allocation timestamps,
    they can fall back to temporal allocation records and preserve
    chronological ordering.
    """
    # Every format except speedscope is handled by the base command.
    if self.reporter_name != "speedscope":
        return super().write_report(
            result_path=result_path,
            output_file=output_file,
            show_memory_leaks=show_memory_leaks,
            temporary_allocation_threshold=temporary_allocation_threshold,
            merge_threads=merge_threads,
            inverted=inverted,
            temporal=temporal,
            max_memory_records=max_memory_records,
            no_web=no_web,
        )

    try:
        reader_kwargs = {}
        if max_memory_records is not None:
            reader_kwargs["max_memory_records"] = max_memory_records
        reader = FileReader(
            os.fspath(result_path), report_progress=True, **reader_kwargs
        )
        # Resolve the tri-state defaults the caller may leave as None.
        if merge_threads is None:
            merge_threads = True
        if inverted is None:
            inverted = False

        if reader.metadata.has_native_traces:
            warn_if_not_enough_symbols()

        if not temporal and temporary_allocation_threshold < 0:
            warn_if_file_is_not_aggregated_and_is_too_big(reader, result_path)

        reporter_kwargs = {
            "memory_records": tuple(reader.get_memory_snapshots()),
            "native_traces": reader.metadata.has_native_traces,
        }

        # Use the temporal fallback only for full captures that lack exact
        # per-allocation timestamps, and never for temporary-allocation
        # reports (which have their own record source).
        use_temporal_fallback = (
            reader.metadata.file_format == FileFormat.ALL_ALLOCATIONS
            and not reader.metadata.has_allocation_timestamps
            and temporary_allocation_threshold < 0
        )

        if use_temporal_fallback and show_memory_leaks:
            reporter = self.reporter_factory(
                reader.get_temporal_allocation_records(
                    merge_threads=merge_threads
                ),
                **reporter_kwargs,
            )
        elif use_temporal_fallback:
            (
                records,
                high_water_mark_by_snapshot,
            ) = reader.get_temporal_high_water_mark_allocation_records(
                merge_threads=merge_threads
            )
            reporter = self.reporter_factory(
                records,
                high_water_mark_by_snapshot=high_water_mark_by_snapshot,
                **reporter_kwargs,
            )
        else:
            if show_memory_leaks:
                records = reader.get_leaked_allocation_records(
                    merge_threads=merge_threads
                )
            elif temporary_allocation_threshold >= 0:
                records = reader.get_temporary_allocation_records(
                    threshold=temporary_allocation_threshold,
                    merge_threads=merge_threads,
                )
            else:
                records = reader.get_high_watermark_allocation_records(
                    merge_threads=merge_threads
                )
            reporter = self.reporter_factory(records, **reporter_kwargs)
    except OSError as e:
        raise MemrayCommandError(
            f"Failed to parse allocation records in {result_path}\nReason: {e}",
            exit_code=1,
        )

    with open(os.fspath(output_file.expanduser()), "w") as f:
        reporter.render(
            outfile=f,
            metadata=reader.metadata,
            show_memory_leaks=show_memory_leaks,
            merge_threads=merge_threads,
            inverted=inverted,
            no_web=no_web,
        )

src/memray/reporters/transform.py

Lines changed: 136 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
from typing import Optional
88
from typing import TextIO
99
from typing import Tuple
10+
from typing import Union
1011

1112
from memray import AllocationRecord
1213
from memray import AllocatorType
1314
from memray import MemorySnapshot
1415
from memray import Metadata
1516
from memray import __version__
17+
from memray._memray import TemporalAllocationRecord
1618
from memray.reporters.common import format_thread_name
1719

1820
Location = Tuple[str, str]
@@ -34,13 +36,15 @@ def __init__(
3436
format: str,
3537
native_traces: bool,
3638
memory_records: Iterable[MemorySnapshot],
39+
high_water_mark_by_snapshot: Optional[List[int]] = None,
3740
**kwargs: Any,
3841
) -> None:
3942
super().__init__()
4043
self.allocations = allocations
4144
self.format = format
4245
self.native_traces = native_traces
43-
self.memory_records = memory_records
46+
self.memory_records = tuple(memory_records)
47+
self.high_water_mark_by_snapshot = high_water_mark_by_snapshot
4448

4549
def render_as_gprof2dot(
4650
self,
@@ -74,7 +78,9 @@ def render_as_gprof2dot(
7478
}
7579
json.dump(result, outfile)
7680

77-
def _stack_trace_for_record(self, record: AllocationRecord) -> Tuple[Tuple[str, str, int], ...]:
81+
def _stack_trace_for_record(
82+
self, record: Union[AllocationRecord, TemporalAllocationRecord]
83+
) -> Tuple[Tuple[str, str, int], ...]:
7884
return (
7985
tuple(record.hybrid_stack_trace())
8086
if self.native_traces
@@ -83,7 +89,7 @@ def _stack_trace_for_record(self, record: AllocationRecord) -> Tuple[Tuple[str,
8389

8490
def _speedscope_sample_for_record(
8591
self,
86-
record: AllocationRecord,
92+
record: Union[AllocationRecord, TemporalAllocationRecord],
8793
*,
8894
location_to_index: Dict[FrameLocation, int],
8995
frames: List[Dict[str, Any]],
@@ -140,7 +146,9 @@ def _aggregate_snapshot_speedscope_samples(
140146
frames: List[Dict[str, Any]] = []
141147
sample_weights: Dict[FrameSample, List[int]] = {}
142148
sample_order: Dict[FrameSample, int] = {}
143-
has_exact_timestamps = metadata is not None and metadata.has_allocation_timestamps
149+
has_exact_timestamps = (
150+
metadata is not None and metadata.has_allocation_timestamps
151+
)
144152

145153
for sequence, record in enumerate(allocations):
146154
sample = self._speedscope_sample_for_record(
@@ -164,6 +172,111 @@ def _aggregate_snapshot_speedscope_samples(
164172
)
165173
return frames, ordered_samples
166174

175+
def _snapshot_order_key(self, snapshot_index: int) -> int:
176+
if 0 <= snapshot_index < len(self.memory_records):
177+
# Convert ms-since-epoch to µs for comparison with timestamp_us
178+
return self.memory_records[snapshot_index].time * 1000
179+
if self.memory_records:
180+
return self.memory_records[-1].time * 1000
181+
return snapshot_index
182+
183+
def _peak_snapshot_index(self) -> int:
184+
high_water_mark_by_snapshot = self.high_water_mark_by_snapshot or [0]
185+
return max(
186+
range(len(high_water_mark_by_snapshot)),
187+
key=high_water_mark_by_snapshot.__getitem__,
188+
)
189+
190+
def _contribution_for_temporal_record(
191+
self,
192+
record: TemporalAllocationRecord,
193+
*,
194+
show_memory_leaks: bool,
195+
peak_snapshot: Optional[int] = None,
196+
) -> Tuple[int, int, Optional[int]]:
197+
size = 0
198+
n_allocations = 0
199+
first_snapshot = None
200+
201+
if show_memory_leaks:
202+
for interval in record.intervals:
203+
if interval.deallocated_before_snapshot is not None:
204+
continue
205+
size += interval.n_bytes
206+
n_allocations += interval.n_allocations
207+
snapshot = interval.allocated_before_snapshot
208+
if first_snapshot is None or snapshot < first_snapshot:
209+
first_snapshot = snapshot
210+
return size, n_allocations, first_snapshot
211+
212+
if peak_snapshot is None:
213+
peak_snapshot = self._peak_snapshot_index()
214+
for interval in record.intervals:
215+
if interval.allocated_before_snapshot > peak_snapshot:
216+
continue
217+
if (
218+
interval.deallocated_before_snapshot is not None
219+
and peak_snapshot >= interval.deallocated_before_snapshot
220+
):
221+
continue
222+
size += interval.n_bytes
223+
n_allocations += interval.n_allocations
224+
snapshot = interval.allocated_before_snapshot
225+
if first_snapshot is None or snapshot < first_snapshot:
226+
first_snapshot = snapshot
227+
return size, n_allocations, first_snapshot
228+
229+
def _aggregate_temporal_speedscope_samples(
    self,
    allocations: Iterable[TemporalAllocationRecord],
    *,
    show_memory_leaks: bool,
) -> Tuple[List[Dict[str, Any]], List[Tuple[FrameSample, List[int]]]]:
    """Build speedscope frames and ordered samples from temporal records.

    Leak reports aggregate the intervals that were never deallocated; peak
    reports aggregate the intervals live at the peak snapshot.  Samples are
    ordered by the snapshot time of each record's first contributing
    interval so the export stays chronological even without exact
    per-allocation timestamps.
    """
    frame_index_by_location: Dict[FrameLocation, int] = {}
    frame_table: List[Dict[str, Any]] = []
    weights_by_sample: Dict[FrameSample, List[int]] = {}
    order_by_sample: Dict[FrameSample, int] = {}

    # Resolve the peak once up front; leak reports never need it.
    peak = None if show_memory_leaks else self._peak_snapshot_index()

    for position, record in enumerate(allocations):
        sample = self._speedscope_sample_for_record(
            record,
            location_to_index=frame_index_by_location,
            frames=frame_table,
        )
        size, count, first_snapshot = self._contribution_for_temporal_record(
            record,
            show_memory_leaks=show_memory_leaks,
            peak_snapshot=peak,
        )
        # Records contributing nothing to this report are dropped.
        if size <= 0 and count <= 0:
            continue

        # Fall back to enumeration order when no snapshot is known.
        if first_snapshot is None:
            order_key = position
        else:
            order_key = self._snapshot_order_key(first_snapshot)
        self._add_speedscope_sample(
            sample=sample,
            size=size,
            n_allocations=count,
            order_key=order_key,
            sample_weights=weights_by_sample,
            sample_order=order_by_sample,
        )

    ordered = sorted(
        weights_by_sample.items(),
        key=lambda entry: (order_by_sample[entry[0]], entry[0]),
    )
    return frame_table, ordered
279+
167280
def _create_speedscope_profile(
168281
self,
169282
*,
@@ -198,10 +311,19 @@ def render_as_speedscope(
198311
**kwargs: Any,
199312
) -> None:
200313
metadata = kwargs.get("metadata")
201-
frames, sample_weights = self._aggregate_snapshot_speedscope_samples(
202-
self.allocations,
203-
metadata=metadata,
204-
)
314+
show_memory_leaks = kwargs.get("show_memory_leaks", False)
315+
allocations = list(self.allocations)
316+
317+
if allocations and self._is_temporal_record(allocations[0]):
318+
frames, sample_weights = self._aggregate_temporal_speedscope_samples(
319+
allocations,
320+
show_memory_leaks=show_memory_leaks,
321+
)
322+
else:
323+
frames, sample_weights = self._aggregate_snapshot_speedscope_samples(
324+
allocations,
325+
metadata=metadata,
326+
)
205327

206328
result = {
207329
"$schema": "https://www.speedscope.app/file-format-schema.json",
@@ -272,3 +394,9 @@ def render_as_csv(
272394
"|".join(f"{func};{mod};{line}" for func, mod, line in stack_trace),
273395
]
274396
)
397+
398+
@staticmethod
399+
def _is_temporal_record(
400+
record: Union[AllocationRecord, TemporalAllocationRecord]
401+
) -> bool:
402+
return hasattr(record, "intervals")

0 commit comments

Comments
 (0)