diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py
index db6f24b88dc..23afc4701ef 100644
--- a/tests/e2e_operations.py
+++ b/tests/e2e_operations.py
@@ -2796,65 +2796,110 @@ def run_error_message_on_failure_to_read_aci_sec_context(args):
         ), f"Did not find expected log messages: {expected_log_messages}"
 
 
-def run_error_message_on_failure_to_fetch_snapshot(const_args):
-    args = copy.deepcopy(const_args)
-    args.nodes = infra.e2e_args.min_nodes(args, 0)
-    with infra.network.network(
-        args.nodes,
-        args.binary_dir,
-        args.debug_nodes,
-        pdb=args.pdb,
-    ) as network:
-        network.start_and_open(args)
+def _assert_snapshot_fetch_failure_messages(node, timeout_s=0):
+    """Assert that the expected snapshot fetch failure messages appear in node's logs.
 
-        primary, _ = network.find_primary()
+    Polls the node's logs until all expected messages are found or timeout_s elapses.
+    Use the default timeout_s=0 for nodes that have already stopped (single pass).
+    Pass a positive timeout_s for nodes still running (poll until messages appear).
+    """
+    expected_patterns = [
+        re.compile(r"Fetching snapshot from .* \(attempt 1/3\)"),
+        re.compile(r"Fetching snapshot from .* \(attempt 2/3\)"),
+        re.compile(r"Fetching snapshot from .* \(attempt 3/3\)"),
+        re.compile(r"Exceeded maximum snapshot fetch retries \([0-9]+\), giving up"),
+    ]
+    end_time = time.time() + timeout_s
+    remaining = list(expected_patterns)
+    while True:
+        out_path, _ = node.get_logs()
+        with open(out_path, "r", encoding="utf-8") as f:
+            for line in f:
+                matched = [e for e in remaining if re.search(e, line)]
+                for m in matched:
+                    remaining.remove(m)
+                    LOG.info(f"Found expected log message: {line.rstrip()}")
+        if not remaining or time.time() >= end_time:
+            break
+        time.sleep(0.5)
+    assert not remaining, f"Did not find expected log messages: {remaining}"
+
+
+def test_join_time_snapshot_fetch_failure(network, args):
+    # Test that the join-time FetchSnapshot task produces the expected retry
+    # and "giving up" log messages when the snapshot endpoint is unreachable.
+    #
+    # Strategy:
+    # 1. Add an intermediate node joined from a snapshot so its startup_seqno
+    #    > 0. That node will act as the join target.
+    # 2. Join a fresh node (no snapshot, startup_seqno = 0) targeting the
+    #    intermediate node via primary_rpc_interface. Because the
+    #    intermediate node's startup_seqno > 0, it returns StartupSeqnoIsOld
+    #    for the fresh joiner, triggering the FetchSnapshot task.
+    # 3. primary_rpc_interface lacks the SnapshotRead operator feature, so
+    #    GET /node/snapshot returns HTTP 404, exhausting the 3-attempt retry
+    #    loop and logging "Exceeded maximum snapshot fetch retries".
+    primary, _ = network.find_primary()
 
-        new_node = network.create_node()
+    # Ensure at least one committed snapshot exists so that a joining node
+    # can be given one (startup_seqno > 0).
+    network.txs.issue(network, number_txs=args.snapshot_tx_interval * 2)
+    network.get_committed_snapshots(primary)
+
+    # Add an intermediate node that starts from that snapshot.
+    intermediate_node = network.create_node()
+    network.join_node(intermediate_node, args.package, args, target_node=primary)
+    network.trust_node(intermediate_node, args)
+    intermediate_node.wait_for_node_to_join(timeout=10)
+
+    # Now join a fresh node (no snapshot) targeting the intermediate node via
+    # primary_rpc_interface. StartupSeqnoIsOld is expected, so use a short
+    # timeout and catch the exception.
+    # Timeout accounts for: 3 fetch attempts × 1s retry_interval = ~3s, plus
+    # join retry overhead and test environment scheduling headroom.
+    failing_node = network.create_node()
+    try:
+        network.join_node(
+            failing_node,
+            args.package,
+            args,
+            target_node=intermediate_node,
+            from_snapshot=False,
+            join_target_interface_name=infra.interfaces.PRIMARY_RPC_INTERFACE,
+            timeout=15,
+        )
+    except infra.network.StartupSeqnoIsOld:
+        pass  # expected: FetchSnapshot exhausts retries and node cannot join
 
-        # Shut down primary to cause snapshot fetch to fail
-        primary.remote.stop()
+    _assert_snapshot_fetch_failure_messages(failing_node)
 
-        failed = False
-        try:
-            LOG.info("Starting join")
-            network.join_node(
-                new_node,
-                args.package,
-                args,
-                target_node=primary,
-                timeout=10,
-                from_snapshot=False,
-                wait_for_node_in_store=False,
-            )
-            new_node.wait_for_node_to_join(timeout=5)
-        except Exception as e:
-            LOG.info(f"Joining node could not join as expected {e}")
-            failed = True
 
-        assert failed, "Joining node could not join failed node as expected"
-
-        expected_log_messages = [
-            re.compile(r"Fetching snapshot from .* \(attempt 1/3\)"),
-            re.compile(r"Fetching snapshot from .* \(attempt 2/3\)"),
-            re.compile(r"Fetching snapshot from .* \(attempt 3/3\)"),
-            re.compile(
-                r"Exceeded maximum snapshot fetch retries \([0-9]+\), giving up"
-            ),
-        ]
+def test_error_message_on_failure_to_fetch_snapshot(network, args):
+    # Add a new backup node pointed at the primary_rpc_interface for snapshot
+    # fetching. That interface does NOT expose the SnapshotRead operator
+    # feature, so every fetch request returns HTTP 404, reliably driving the
+    # BackupSnapshotFetch retry loop to exhaustion.
+    primary, _ = network.find_primary()
+    new_node = network.create_node()
+    network.join_node(
+        new_node,
+        args.package,
+        args,
+        target_node=primary,
+        timeout=5,
+        backup_snapshot_fetch_enabled=True,
+        backup_snapshot_fetch_target_rpc_interface=infra.interfaces.PRIMARY_RPC_INTERFACE,
+    )
+    network.trust_node(new_node, args)
+    new_node.wait_for_node_to_join(timeout=5)
 
-        out_path, _ = new_node.get_logs()
-        for line in open(out_path, "r", encoding="utf-8").readlines():
-            for expected in expected_log_messages:
-                match = re.search(expected, line)
-                if match:
-                    expected_log_messages.remove(expected)
-                    LOG.info(f"Found expected log message: {line}")
-            if len(expected_log_messages) == 0:
-                break
+    # Issue enough transactions to trigger a new snapshot on the primary.
+    # The snapshot_evidence hook on new_node then schedules BackupSnapshotFetch,
+    # which exhausts its 3 attempts (all HTTP 404) and logs "giving up".
+    network.txs.issue(network, number_txs=args.snapshot_tx_interval * 2)
+    network.get_committed_snapshots(primary)
 
-        assert (
-            len(expected_log_messages) == 0
-        ), f"Did not find expected log messages: {expected_log_messages}"
+    _assert_snapshot_fetch_failure_messages(new_node, timeout_s=30)
 
 
 def test_backup_snapshot_fetch(network, args):
@@ -2997,6 +3042,8 @@ def run_backup_snapshot_download(const_args):
         network.start_and_open(args, backup_snapshot_fetch_enabled=True)
         test_backup_snapshot_fetch(network, args)
         test_backup_snapshot_fetch_max_size(network, args)
+        test_join_time_snapshot_fetch_failure(network, args)
+        test_error_message_on_failure_to_fetch_snapshot(network, args)
 
 
 def run_propose_request_vote(const_args):
diff --git a/tests/infra/network.py b/tests/infra/network.py
index 7023c12302e..7ee7a3b3e7a 100644
--- a/tests/infra/network.py
+++ b/tests/infra/network.py
@@ -347,6 +347,7 @@ def _setup_node(
         read_only_ledger_dirs=None,
         from_snapshot=True,
         snapshots_dir=None,
+        join_target_interface_name=None,
         **kwargs,
     ):
         # Contact primary if no target node is set
@@ -400,7 +401,8 @@ def _setup_node(
             label=args.label,
             common_dir=self.common_dir,
             target_rpc_address=target_node.get_public_rpc_address(
-                interface_name=infra.interfaces.FILE_SERVING_RPC_INTERFACE
+                interface_name=join_target_interface_name
+                or infra.interfaces.FILE_SERVING_RPC_INTERFACE
             ),
             snapshots_dir=snapshots_dir,
             read_only_snapshots_dir=read_only_snapshots_dir,