Skip to content

Commit 528765e

Browse files
cjen1-msft and achamayou committed
Retries for fetching snapshots (microsoft#7317)
Co-authored-by: Amaury Chamayou <amaury@xargs.fr>
1 parent 0a376d4 commit 528765e

7 files changed

Lines changed: 126 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1212
### Added
1313

1414
- Improved logging of snapshot digests (#7300)
15+
- Node will now retry when fetching snapshots. This is controlled with `command.join.fetch_snapshot_max_attempts` and `command.join.fetch_snapshot_retry_interval`. (#7317)
1516

1617
[6.0.14]: https://github.com/microsoft/CCF/releases/tag/ccf-6.0.14
1718

doc/host_config_schema/cchost_config.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,17 @@
406406
"type": "boolean",
407407
"default": true,
408408
"description": "Whether to ask the target for a newer snapshot before joining. The node will ask the target what their latest snapshot is, and if that is later than what the node has locally, will fetch it via RPC before launching. Should generally only be turned off for specific test cases"
409+
},
410+
"fetch_snapshot_max_attempts": {
411+
"type": "integer",
412+
"default": 3,
413+
"description": "Maximum number of attempts to fetch a recent snapshot from the target node",
414+
"minimum": 1
415+
},
416+
"fetch_snapshot_retry_interval": {
417+
"type": "string",
418+
"default": "1000ms",
419+
"description": "Interval (time string) between retries to fetch a recent snapshot from the target node"
409420
}
410421
},
411422
"required": ["target_rpc_address"],

src/host/configuration.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ namespace host
150150
ccf::ds::TimeString retry_timeout = {"1000ms"};
151151
bool follow_redirect = true;
152152
bool fetch_recent_snapshot = true;
153+
size_t fetch_snapshot_max_attempts = 3;
154+
ccf::ds::TimeString fetch_snapshot_retry_interval = {"1000ms"};
153155

154156
bool operator==(const Join&) const = default;
155157
};
@@ -212,7 +214,9 @@ namespace host
212214
CCHostConfig::Command::Join,
213215
retry_timeout,
214216
follow_redirect,
215-
fetch_recent_snapshot);
217+
fetch_recent_snapshot,
218+
fetch_snapshot_max_attempts,
219+
fetch_snapshot_retry_interval);
216220

217221
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover);
218222
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover);

src/host/main.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,9 @@ int main(int argc, char** argv) // NOLINT(bugprone-exception-escape)
878878
auto latest_peer_snapshot = snapshots::fetch_from_peer(
879879
config.command.join.target_rpc_address,
880880
config.command.service_certificate_file,
881-
latest_local_idx);
881+
latest_local_idx,
882+
config.command.join.fetch_snapshot_max_attempts,
883+
config.command.join.fetch_snapshot_retry_interval.count_ms());
882884

883885
if (latest_peer_snapshot.has_value())
884886
{

src/snapshots/fetch.h

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace snapshots
4040
std::vector<uint8_t> snapshot_data;
4141
};
4242

43-
static std::optional<SnapshotResponse> fetch_from_peer(
43+
static std::optional<SnapshotResponse> try_fetch_from_peer(
4444
const std::string& peer_address,
4545
const std::string& path_to_peer_cert,
4646
size_t latest_local_snapshot)
@@ -289,4 +289,36 @@ namespace snapshots
289289
return std::nullopt;
290290
}
291291
}
292+
293+
static std::optional<SnapshotResponse> fetch_from_peer(
294+
const std::string& peer_address,
295+
const std::string& path_to_peer_cert,
296+
size_t latest_local_snapshot,
297+
size_t max_attempts,
298+
size_t retry_delay_ms)
299+
{
300+
for (size_t attempt = 0; attempt < max_attempts; ++attempt)
301+
{
302+
LOG_INFO_FMT(
303+
"Fetching snapshot from {} (attempt {}/{})",
304+
peer_address,
305+
attempt + 1,
306+
max_attempts);
307+
308+
if (attempt > 0)
309+
{
310+
std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms));
311+
}
312+
313+
auto response = try_fetch_from_peer(
314+
peer_address, path_to_peer_cert, latest_local_snapshot);
315+
if (response.has_value())
316+
{
317+
return response;
318+
}
319+
}
320+
LOG_INFO_FMT(
321+
"Exceeded maximum snapshot fetch retries ({}), giving up", max_attempts);
322+
return std::nullopt;
323+
}
292324
}

tests/e2e_operations.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import sys
3232
import pathlib
3333
import infra.concurrency
34+
import ccf.read_ledger
35+
import re
3436

3537
from loguru import logger as LOG
3638

@@ -1553,6 +1555,67 @@ def test_error_message_on_failure_to_read_aci_sec_context(args):
15531555
), f"Did not find expected log messages: {expected_log_messages}"
15541556

15551557

1558+
def test_error_message_on_failure_to_fetch_snapshot(const_args):
    """
    Check that a joining node logs each snapshot fetch attempt, and a final
    "giving up" message, when its join target is unreachable.

    The primary is stopped before the new node is started with the primary as
    its join target, so the join is expected to fail. The new node's log is
    then searched for one message per fetch attempt (the patterns assume a
    total of 3 attempts) and for the final message reporting that the retries
    were exhausted.
    """
    args = copy.deepcopy(const_args)
    args.nodes = infra.e2e_args.min_nodes(args, 0)
    with infra.network.network(
        args.nodes,
        args.binary_dir,
        args.debug_nodes,
        pdb=args.pdb,
    ) as network:
        network.start_and_open(args)

        primary, _ = network.find_primary()

        new_node = network.create_node("local://localhost")

        # Shut down primary to cause snapshot fetch to fail
        primary.remote.stop()

        failed = False
        try:
            LOG.info("Starting join")
            network.join_node(
                new_node,
                args.package,
                args,
                target_node=primary,
                timeout=10,
                from_snapshot=False,
                wait_for_node_in_store=False,
            )
            new_node.wait_for_node_to_join(timeout=5)
        except Exception as e:
            LOG.info(f"Joining node could not join as expected {e}")
            failed = True

        # This assertion fires when the join did NOT fail, so the message
        # describes that unexpected success
        assert failed, "Joining node unexpectedly managed to join a stopped node"

        expected_log_messages = [
            re.compile(r"Fetching snapshot from .* \(attempt 1/3\)"),
            re.compile(r"Fetching snapshot from .* \(attempt 2/3\)"),
            re.compile(r"Fetching snapshot from .* \(attempt 3/3\)"),
            re.compile(
                r"Exceeded maximum snapshot fetch retries \([0-9]+\), giving up"
            ),
        ]

        out_path, _ = new_node.get_logs()
        with open(out_path, "r", encoding="utf-8") as log_file:
            for line in log_file:
                # Collect the matches first, so that the list of outstanding
                # patterns is never mutated while it is being iterated
                found = [
                    expected
                    for expected in expected_log_messages
                    if re.search(expected, line)
                ]
                for expected in found:
                    expected_log_messages.remove(expected)
                    LOG.info(f"Found expected log message: {line}")
                # Stop reading the log as soon as everything has been seen
                if len(expected_log_messages) == 0:
                    break

        assert (
            len(expected_log_messages) == 0
        ), f"Did not find expected log messages: {expected_log_messages}"
1617+
1618+
15561619
def run(args):
15571620
run_max_uncommitted_tx_count(args)
15581621
run_file_operations(args)

tests/infra/network.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,11 @@ def _setup_node(
354354
**kwargs,
355355
):
356356
# Contact primary if no target node is set
357-
primary, _ = self.find_primary(
358-
timeout=args.ledger_recovery_timeout if recovery else 10
359-
)
360-
target_node = target_node or primary
357+
if target_node is None:
358+
primary, _ = self.find_primary(
359+
timeout=args.ledger_recovery_timeout if recovery else 10
360+
)
361+
target_node = primary
361362
LOG.info(f"Joining from target node {target_node.local_node_id}")
362363

363364
committed_ledger_dirs = read_only_ledger_dirs or []
@@ -369,6 +370,9 @@ def _setup_node(
369370
if from_snapshot:
370371
# Only retrieve snapshot from primary if the snapshot directory is not specified
371372
if snapshots_dir is None:
373+
primary, _ = self.find_primary(
374+
timeout=args.ledger_recovery_timeout if recovery else 10
375+
)
372376
read_only_snapshots_dir = self.get_committed_snapshots(primary)
373377
if os.listdir(snapshots_dir) or os.listdir(read_only_snapshots_dir):
374378
LOG.info(
@@ -1040,10 +1044,11 @@ def join_node(
10401044
target_node=None,
10411045
timeout=JOIN_TIMEOUT,
10421046
stop_on_error=False,
1047+
wait_for_node_in_store=True,
10431048
**kwargs,
10441049
):
10451050
self.setup_join_node(node, lib_name, args, target_node, **kwargs)
1046-
self.run_join_node(node, timeout, stop_on_error)
1051+
self.run_join_node(node, timeout, stop_on_error, wait_for_node_in_store)
10471052

10481053
def trust_node(
10491054
self,

0 commit comments

Comments
 (0)