Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 99 additions & 52 deletions tests/e2e_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2796,65 +2796,110 @@ def run_error_message_on_failure_to_read_aci_sec_context(args):
), f"Did not find expected log messages: {expected_log_messages}"


def run_error_message_on_failure_to_fetch_snapshot(const_args):
args = copy.deepcopy(const_args)
args.nodes = infra.e2e_args.min_nodes(args, 0)
with infra.network.network(
args.nodes,
args.binary_dir,
args.debug_nodes,
pdb=args.pdb,
) as network:
network.start_and_open(args)
def _assert_snapshot_fetch_failure_messages(node, timeout_s=0):
    """Assert that the expected snapshot fetch failure messages appear in node's logs.

    Polls the node's logs until all expected messages are found or timeout_s elapses.
    Use the default timeout_s=0 for nodes that have already stopped (single pass).
    Pass a positive timeout_s for nodes still running (poll until messages appear).

    Raises AssertionError listing the patterns that were never matched.
    """
    expected_patterns = [
        re.compile(r"Fetching snapshot from .* \(attempt 1/3\)"),
        re.compile(r"Fetching snapshot from .* \(attempt 2/3\)"),
        re.compile(r"Fetching snapshot from .* \(attempt 3/3\)"),
        re.compile(r"Exceeded maximum snapshot fetch retries \([0-9]+\), giving up"),
    ]
    # time.monotonic() is immune to wall-clock adjustments (NTP steps, etc.),
    # making it the correct clock for a polling deadline.
    deadline = time.monotonic() + timeout_s
    remaining = list(expected_patterns)
    while True:
        # Re-fetch and rescan the whole log each pass: the node may still be
        # appending, and a full rescan keeps the matching logic simple.
        out_path, _ = node.get_logs()
        with open(out_path, "r", encoding="utf-8") as f:
            for line in f:
                # Snapshot the matches first, since matched patterns are
                # removed from `remaining` while we iterate.
                for pattern in [p for p in remaining if p.search(line)]:
                    remaining.remove(pattern)
                    LOG.info(f"Found expected log message: {line.rstrip()}")
        # With timeout_s=0 the deadline is already in the past, so exactly
        # one pass over the log is performed.
        if not remaining or time.monotonic() >= deadline:
            break
        time.sleep(0.5)
    assert not remaining, f"Did not find expected log messages: {remaining}"


def test_join_time_snapshot_fetch_failure(network, args):
    # Test that the join-time FetchSnapshot task produces the expected retry
    # and "giving up" log messages when the snapshot endpoint is unreachable.
    #
    # Strategy:
    # 1. Add an intermediate node joined from a snapshot so its startup_seqno
    #    > 0. That node will act as the join target.
    # 2. Join a fresh node (no snapshot, startup_seqno = 0) targeting the
    #    intermediate node via primary_rpc_interface. Because the
    #    intermediate node's startup_seqno > 0, it returns StartupSeqnoIsOld
    #    for the fresh joiner, triggering the FetchSnapshot task.
    # 3. primary_rpc_interface lacks the SnapshotRead operator feature, so
    #    GET /node/snapshot returns HTTP 404, exhausting the 3-attempt retry
    #    loop and logging "Exceeded maximum snapshot fetch retries".
    primary, _ = network.find_primary()

    # Ensure at least one committed snapshot exists so that a joining node
    # can be given one (startup_seqno > 0).
    network.txs.issue(network, number_txs=args.snapshot_tx_interval * 2)
    network.get_committed_snapshots(primary)

    # Add an intermediate node that starts from that snapshot.
    intermediate_node = network.create_node()
    network.join_node(intermediate_node, args.package, args, target_node=primary)
    network.trust_node(intermediate_node, args)
    intermediate_node.wait_for_node_to_join(timeout=10)

    # Now join a fresh node (no snapshot) targeting the intermediate node via
    # primary_rpc_interface. StartupSeqnoIsOld is expected, so use a short
    # timeout and catch the exception.
    # Timeout accounts for: 3 fetch attempts × 1s retry_interval = ~3s, plus
    # join retry overhead and test environment scheduling headroom.
    failing_node = network.create_node()
    try:
        network.join_node(
            failing_node,
            args.package,
            args,
            target_node=intermediate_node,
            from_snapshot=False,
            join_target_interface_name=infra.interfaces.PRIMARY_RPC_INTERFACE,
            timeout=15,
        )
    except infra.network.StartupSeqnoIsOld:
        pass  # expected: FetchSnapshot exhausts retries and node cannot join

    # The failing node never joined, so its process has stopped: a single
    # pass over its logs (default timeout_s=0) is sufficient.
    _assert_snapshot_fetch_failure_messages(failing_node)

failed = False
try:
LOG.info("Starting join")
network.join_node(
new_node,
args.package,
args,
target_node=primary,
timeout=10,
from_snapshot=False,
wait_for_node_in_store=False,
)
new_node.wait_for_node_to_join(timeout=5)
except Exception as e:
LOG.info(f"Joining node could not join as expected {e}")
failed = True

assert failed, "Joining node could not join failed node as expected"

expected_log_messages = [
re.compile(r"Fetching snapshot from .* \(attempt 1/3\)"),
re.compile(r"Fetching snapshot from .* \(attempt 2/3\)"),
re.compile(r"Fetching snapshot from .* \(attempt 3/3\)"),
re.compile(
r"Exceeded maximum snapshot fetch retries \([0-9]+\), giving up"
),
]
def test_error_message_on_failure_to_fetch_snapshot(network, args):
    # Add a new backup node pointed at the primary_rpc_interface for snapshot
    # fetching. That interface does NOT expose the SnapshotRead operator
    # feature, so every fetch request returns HTTP 404, reliably driving the
    # BackupSnapshotFetch retry loop to exhaustion.
    primary, _ = network.find_primary()
    new_node = network.create_node()
    network.join_node(
        new_node,
        args.package,
        args,
        target_node=primary,
        timeout=5,
        backup_snapshot_fetch_enabled=True,
        backup_snapshot_fetch_target_rpc_interface=infra.interfaces.PRIMARY_RPC_INTERFACE,
    )
    network.trust_node(new_node, args)
    new_node.wait_for_node_to_join(timeout=5)

    # Issue enough transactions to trigger a new snapshot on the primary.
    # The snapshot_evidence hook on new_node then schedules BackupSnapshotFetch,
    # which exhausts its 3 attempts (all HTTP 404) and logs "giving up".
    network.txs.issue(network, number_txs=args.snapshot_tx_interval * 2)
    network.get_committed_snapshots(primary)

    # new_node is still running and the fetch/retry cycle happens
    # asynchronously, so poll its logs rather than doing a single pass.
    _assert_snapshot_fetch_failure_messages(new_node, timeout_s=30)


def test_backup_snapshot_fetch(network, args):
Expand Down Expand Up @@ -2997,6 +3042,8 @@ def run_backup_snapshot_download(const_args):
network.start_and_open(args, backup_snapshot_fetch_enabled=True)
test_backup_snapshot_fetch(network, args)
test_backup_snapshot_fetch_max_size(network, args)
test_join_time_snapshot_fetch_failure(network, args)
test_error_message_on_failure_to_fetch_snapshot(network, args)


def run_propose_request_vote(const_args):
Expand Down
4 changes: 3 additions & 1 deletion tests/infra/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ def _setup_node(
read_only_ledger_dirs=None,
from_snapshot=True,
snapshots_dir=None,
join_target_interface_name=None,
**kwargs,
):
# Contact primary if no target node is set
Expand Down Expand Up @@ -400,7 +401,8 @@ def _setup_node(
label=args.label,
common_dir=self.common_dir,
target_rpc_address=target_node.get_public_rpc_address(
interface_name=infra.interfaces.FILE_SERVING_RPC_INTERFACE
interface_name=join_target_interface_name
or infra.interfaces.FILE_SERVING_RPC_INTERFACE
),
snapshots_dir=snapshots_dir,
read_only_snapshots_dir=read_only_snapshots_dir,
Expand Down