Skip to content

Commit 4741be5

Browse files
Enhance tests for SynchronousMeasurementConsumer
Added tests for SynchronousMeasurementConsumer's behavior after forking, including registration of fork handlers and reinitialization of storage.
1 parent 039942b commit 4741be5

1 file changed

Lines changed: 328 additions & 1 deletion

File tree

opentelemetry-sdk/tests/metrics/test_measurement_consumer.py

Lines changed: 328 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
# pylint: disable=invalid-name,no-self-use
15+
# pylint: disable=invalid-name,no-self-use,protected-access
1616

1717
from time import sleep
1818
from unittest import TestCase
@@ -27,6 +27,17 @@
2727
)
2828

2929

30+
def _sdk_config(exemplar_filter=None, resource=None, metric_readers=None, views=None):
31+
"""Create SdkConfiguration for tests. exemplar_filter is set as attribute after init."""
32+
config = SdkConfiguration(
33+
resource=resource or Mock(),
34+
metric_readers=metric_readers or [Mock()],
35+
views=views or Mock(),
36+
exemplar_filter=exemplar_filter or Mock(should_sample=Mock(return_value=False))
37+
)
38+
return config
39+
40+
3041
@patch(
3142
"opentelemetry.sdk.metrics._internal."
3243
"measurement_consumer.MetricReaderStorage"
@@ -192,3 +203,319 @@ def sleep_1(*args, **kwargs):
192203
callback_options_time_call,
193204
10000,
194205
)
206+
207+
208+
@patch(
209+
"opentelemetry.sdk.metrics._internal."
210+
"measurement_consumer.MetricReaderStorage"
211+
)
212+
class TestSynchronousMeasurementConsumerForkHandler(TestCase):
213+
"""Exhaustive tests for fork handler, needs_storage_reinit, and lazy _reinit_storages."""
214+
215+
def test_register_at_fork_called_when_available(
216+
self, MockMetricReaderStorage
217+
):
218+
"""Consumer should register fork handler when os.register_at_fork exists."""
219+
register_mock = Mock()
220+
with patch(
221+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
222+
) as mock_os:
223+
mock_os.register_at_fork = register_mock
224+
225+
SynchronousMeasurementConsumer(_sdk_config())
226+
register_mock.assert_called_once()
227+
call_kwargs = register_mock.call_args[1]
228+
self.assertIn("after_in_child", call_kwargs)
229+
self.assertTrue(callable(call_kwargs["after_in_child"]))
230+
231+
def test_at_fork_reinit_sets_needs_storage_reinit_and_clears_async_instruments(
232+
self, MockMetricReaderStorage
233+
):
234+
"""_at_fork_reinit should set _needs_storage_reinit=True and clear _async_instruments."""
235+
reader_mock = Mock()
236+
storage_mock = Mock()
237+
storage_mock._lock = Mock()
238+
storage_mock._instrument_view_instrument_matches = {}
239+
MockMetricReaderStorage.return_value = storage_mock
240+
241+
register_mock = Mock()
242+
with patch(
243+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
244+
) as mock_os:
245+
mock_os.register_at_fork = register_mock
246+
247+
consumer = SynchronousMeasurementConsumer(
248+
_sdk_config(metric_readers=[reader_mock])
249+
)
250+
async_instrument = MagicMock()
251+
consumer.register_asynchronous_instrument(async_instrument)
252+
self.assertEqual(len(consumer._async_instruments), 1)
253+
254+
# Simulate fork: call the after_in_child callback
255+
after_in_child = register_mock.call_args[1]["after_in_child"]
256+
after_in_child()
257+
258+
self.assertTrue(consumer._needs_storage_reinit)
259+
self.assertEqual(len(consumer._async_instruments), 0)
260+
261+
def test_consume_measurement_triggers_lazy_reinit_on_first_use_after_fork(
262+
self, MockMetricReaderStorage
263+
):
264+
"""First consume_measurement after fork should call _reinit_storages."""
265+
reader_mocks = [Mock()]
266+
storage_mocks = [Mock()]
267+
storage_mocks[0]._lock = Mock()
268+
storage_mocks[0]._instrument_view_instrument_matches = {}
269+
storage_mocks[0].consume_measurement = Mock()
270+
MockMetricReaderStorage.side_effect = storage_mocks
271+
272+
register_mock = Mock()
273+
with patch(
274+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
275+
) as mock_os:
276+
mock_os.register_at_fork = register_mock
277+
278+
consumer = SynchronousMeasurementConsumer(
279+
_sdk_config(metric_readers=reader_mocks)
280+
)
281+
after_in_child = register_mock.call_args[1]["after_in_child"]
282+
after_in_child()
283+
284+
with patch.object(
285+
consumer, "_reinit_storages", wraps=consumer._reinit_storages
286+
) as reinit_spy:
287+
consumer.consume_measurement(Mock())
288+
reinit_spy.assert_called_once()
289+
290+
def test_consume_measurement_does_not_reinit_on_second_call(
291+
self, MockMetricReaderStorage
292+
):
293+
"""Second consume_measurement after fork should NOT call _reinit_storages again."""
294+
reader_mocks = [Mock()]
295+
storage_mock = Mock()
296+
storage_mock._lock = Mock()
297+
storage_mock._instrument_view_instrument_matches = {}
298+
storage_mock.consume_measurement = Mock()
299+
MockMetricReaderStorage.return_value = storage_mock
300+
301+
register_mock = Mock()
302+
with patch(
303+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
304+
) as mock_os:
305+
mock_os.register_at_fork = register_mock
306+
307+
consumer = SynchronousMeasurementConsumer(
308+
_sdk_config(metric_readers=reader_mocks)
309+
)
310+
after_in_child = register_mock.call_args[1]["after_in_child"]
311+
after_in_child()
312+
313+
with patch.object(
314+
consumer, "_reinit_storages", wraps=consumer._reinit_storages
315+
) as reinit_spy:
316+
consumer.consume_measurement(Mock())
317+
consumer.consume_measurement(Mock())
318+
reinit_spy.assert_called_once()
319+
320+
def test_collect_triggers_lazy_reinit_on_first_use_after_fork(
321+
self, MockMetricReaderStorage
322+
):
323+
"""First collect after fork should call _reinit_storages."""
324+
reader_mock = Mock()
325+
storage_mock = Mock()
326+
storage_mock._lock = Mock()
327+
storage_mock._instrument_view_instrument_matches = {}
328+
storage_mock.collect.return_value = []
329+
MockMetricReaderStorage.return_value = storage_mock
330+
331+
register_mock = Mock()
332+
with patch(
333+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
334+
) as mock_os:
335+
mock_os.register_at_fork = register_mock
336+
337+
consumer = SynchronousMeasurementConsumer(
338+
_sdk_config(metric_readers=[reader_mock])
339+
)
340+
after_in_child = register_mock.call_args[1]["after_in_child"]
341+
after_in_child()
342+
343+
with patch.object(
344+
consumer, "_reinit_storages", wraps=consumer._reinit_storages
345+
) as reinit_spy:
346+
consumer.collect(reader_mock)
347+
reinit_spy.assert_called_once()
348+
349+
def test_collect_does_not_reinit_on_second_call(
350+
self, MockMetricReaderStorage
351+
):
352+
"""Second collect after fork should NOT call _reinit_storages again."""
353+
reader_mock = Mock()
354+
storage_mock = Mock()
355+
storage_mock._lock = Mock()
356+
storage_mock._instrument_view_instrument_matches = {}
357+
storage_mock.collect.return_value = []
358+
MockMetricReaderStorage.return_value = storage_mock
359+
360+
register_mock = Mock()
361+
with patch(
362+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
363+
) as mock_os:
364+
mock_os.register_at_fork = register_mock
365+
366+
consumer = SynchronousMeasurementConsumer(
367+
_sdk_config(metric_readers=[reader_mock])
368+
)
369+
after_in_child = register_mock.call_args[1]["after_in_child"]
370+
after_in_child()
371+
372+
with patch.object(
373+
consumer, "_reinit_storages", wraps=consumer._reinit_storages
374+
) as reinit_spy:
375+
consumer.collect(reader_mock)
376+
consumer.collect(reader_mock)
377+
reinit_spy.assert_called_once()
378+
379+
def test_consume_then_collect_after_fork_reinits_once(
380+
self, MockMetricReaderStorage
381+
):
382+
"""After fork, consume_measurement triggers reinit; collect uses same reinit (no second call)."""
383+
reader_mock = Mock()
384+
storage_mock = Mock()
385+
storage_mock._lock = Mock()
386+
storage_mock._instrument_view_instrument_matches = {}
387+
storage_mock.consume_measurement = Mock()
388+
storage_mock.collect.return_value = []
389+
MockMetricReaderStorage.return_value = storage_mock
390+
391+
register_mock = Mock()
392+
with patch(
393+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
394+
) as mock_os:
395+
mock_os.register_at_fork = register_mock
396+
397+
consumer = SynchronousMeasurementConsumer(
398+
_sdk_config(metric_readers=[reader_mock])
399+
)
400+
after_in_child = register_mock.call_args[1]["after_in_child"]
401+
after_in_child()
402+
403+
with patch.object(
404+
consumer, "_reinit_storages", wraps=consumer._reinit_storages
405+
) as reinit_spy:
406+
consumer.consume_measurement(Mock())
407+
consumer.collect(reader_mock)
408+
reinit_spy.assert_called_once()
409+
410+
def test_collect_then_consume_after_fork_reinits_once(
411+
self, MockMetricReaderStorage
412+
):
413+
"""After fork, collect triggers reinit; consume_measurement uses same reinit (no second call)."""
414+
reader_mock = Mock()
415+
storage_mock = Mock()
416+
storage_mock._lock = Mock()
417+
storage_mock._instrument_view_instrument_matches = {}
418+
storage_mock.consume_measurement = Mock()
419+
storage_mock.collect.return_value = []
420+
MockMetricReaderStorage.return_value = storage_mock
421+
422+
register_mock = Mock()
423+
with patch(
424+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
425+
) as mock_os:
426+
mock_os.register_at_fork = register_mock
427+
428+
consumer = SynchronousMeasurementConsumer(
429+
_sdk_config(metric_readers=[reader_mock])
430+
)
431+
after_in_child = register_mock.call_args[1]["after_in_child"]
432+
after_in_child()
433+
434+
with patch.object(
435+
consumer, "_reinit_storages", wraps=consumer._reinit_storages
436+
) as reinit_spy:
437+
consumer.collect(reader_mock)
438+
consumer.consume_measurement(Mock())
439+
reinit_spy.assert_called_once()
440+
441+
def test_no_reinit_on_consume_measurement_without_fork(
442+
self, MockMetricReaderStorage
443+
):
444+
"""consume_measurement without prior fork should NOT call _reinit_storages."""
445+
reader_mocks = [Mock()]
446+
storage_mock = Mock()
447+
storage_mock._lock = Mock()
448+
storage_mock._instrument_view_instrument_matches = {}
449+
storage_mock.consume_measurement = Mock()
450+
MockMetricReaderStorage.return_value = storage_mock
451+
452+
with patch(
453+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
454+
) as mock_os:
455+
mock_os.register_at_fork = Mock()
456+
457+
consumer = SynchronousMeasurementConsumer(
458+
_sdk_config(metric_readers=reader_mocks)
459+
)
460+
# Do NOT simulate fork
461+
with patch.object(
462+
consumer, "_reinit_storages", wraps=consumer._reinit_storages
463+
) as reinit_spy:
464+
consumer.consume_measurement(Mock())
465+
reinit_spy.assert_not_called()
466+
467+
def test_no_reinit_on_collect_without_fork(
468+
self, MockMetricReaderStorage
469+
):
470+
"""collect without prior fork should NOT call _reinit_storages."""
471+
reader_mock = Mock()
472+
storage_mock = Mock()
473+
storage_mock._lock = Mock()
474+
storage_mock._instrument_view_instrument_matches = {}
475+
storage_mock.collect.return_value = []
476+
MockMetricReaderStorage.return_value = storage_mock
477+
478+
with patch(
479+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
480+
) as mock_os:
481+
mock_os.register_at_fork = Mock()
482+
483+
consumer = SynchronousMeasurementConsumer(
484+
_sdk_config(metric_readers=[reader_mock])
485+
)
486+
with patch.object(
487+
consumer, "_reinit_storages", wraps=consumer._reinit_storages
488+
) as reinit_spy:
489+
consumer.collect(reader_mock)
490+
reinit_spy.assert_not_called()
491+
492+
def test_collect_after_fork_does_not_invoke_cleared_async_instruments(
493+
self, MockMetricReaderStorage
494+
):
495+
"""After fork, collect should not invoke async instruments (they were cleared)."""
496+
reader_mock = Mock()
497+
storage_mock = Mock()
498+
storage_mock._lock = Mock()
499+
storage_mock._instrument_view_instrument_matches = {}
500+
storage_mock.collect.return_value = []
501+
MockMetricReaderStorage.return_value = storage_mock
502+
503+
register_mock = Mock()
504+
with patch(
505+
"opentelemetry.sdk.metrics._internal.measurement_consumer.os"
506+
) as mock_os:
507+
mock_os.register_at_fork = register_mock
508+
509+
consumer = SynchronousMeasurementConsumer(
510+
_sdk_config(metric_readers=[reader_mock])
511+
)
512+
async_instrument = MagicMock()
513+
async_instrument.callback.return_value = []
514+
consumer.register_asynchronous_instrument(async_instrument)
515+
516+
after_in_child = register_mock.call_args[1]["after_in_child"]
517+
after_in_child()
518+
519+
consumer.collect(reader_mock)
520+
521+
async_instrument.callback.assert_not_called()

0 commit comments

Comments
 (0)