Skip to content

Commit de0f4e8

Browse files
committed
feat: handle no addresses for dapi client too
1 parent 1da8ea2 commit de0f4e8

7 files changed

Lines changed: 407 additions & 204 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/rs-dapi-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ offline-testing = []
2525
backon = { version = "1.3", default-features = false, features = [
2626
"tokio-sleep",
2727
] }
28+
tokio = { version = "1.40", features = ["time"] }
2829

2930
[target.'cfg(target_arch = "wasm32")'.dependencies]
3031
gloo-timers = { version = "0.3.0", features = ["futures"] }

packages/rs-dapi-client/src/dapi_client.rs

Lines changed: 151 additions & 113 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,10 @@
11
//! [DapiClient] definition.
22
3-
use backon::{ConstantBuilder, Retryable};
43
use dapi_grpc::mock::Mockable;
54
use dapi_grpc::tonic::async_trait;
65
#[cfg(not(target_arch = "wasm32"))]
76
use dapi_grpc::tonic::transport::Certificate;
87
use std::fmt::{Debug, Display};
9-
use std::sync::atomic::AtomicUsize;
10-
use std::sync::Arc;
118
use std::time::Duration;
129
use tracing::Instrument;
1310

@@ -34,6 +31,13 @@ pub enum DapiClientError {
3431
/// There are no valid DAPI addresses to use.
3532
#[error("no available addresses to use")]
3633
NoAvailableAddresses,
34+
/// All available addresses have been exhausted (banned due to errors).
35+
/// Contains the last meaningful error that caused addresses to be banned.
36+
#[error("no available addresses to retry, last error: {0}")]
37+
NoAvailableAddressesToRetry(
38+
#[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))]
39+
Box<TransportError>,
40+
),
3741
/// [AddressListError] errors
3842
#[error("address list error: {0}")]
3943
AddressList(AddressListError),
@@ -49,6 +53,7 @@ impl CanRetry for DapiClientError {
4953
use DapiClientError::*;
5054
match self {
5155
NoAvailableAddresses => false,
56+
NoAvailableAddressesToRetry(_) => false,
5257
Transport(transport_error) => transport_error.can_retry(),
5358
AddressList(_) => false,
5459
#[cfg(feature = "mocks")]
@@ -57,7 +62,11 @@ impl CanRetry for DapiClientError {
5762
}
5863

5964
fn is_no_available_addresses(&self) -> bool {
60-
matches!(self, DapiClientError::NoAvailableAddresses)
65+
matches!(
66+
self,
67+
DapiClientError::NoAvailableAddresses
68+
| DapiClientError::NoAvailableAddressesToRetry(_)
69+
)
6170
}
6271
}
6372

@@ -193,6 +202,7 @@ pub fn update_address_ban_status<R, E>(
193202
#[async_trait]
194203
impl DapiRequestExecutor for DapiClient {
195204
/// Execute the [DapiRequest](crate::DapiRequest).
205+
#[tracing::instrument(name = "request_routine", skip_all)]
196206
async fn execute<R>(
197207
&self,
198208
request: R,
@@ -212,136 +222,164 @@ impl DapiRequestExecutor for DapiClient {
212222
#[cfg(not(target_arch = "wasm32"))]
213223
let applied_settings = applied_settings.with_ca_certificate(self.ca_certificate.clone());
214224

215-
// Setup retry policy:
216-
let retry_settings = ConstantBuilder::default()
217-
.with_max_times(applied_settings.retries)
218-
.with_delay(Duration::from_millis(10));
219-
220-
// Save dump dir for later use, as self is moved into routine
225+
// Save dump dir for later use
221226
#[cfg(feature = "dump")]
222227
let dump_dir = self.dump_dir.clone();
223228
#[cfg(feature = "dump")]
224229
let dump_request = request.clone();
225230

226-
let retries_counter_arc = Arc::new(AtomicUsize::new(0));
227-
let retries_counter_arc_ref = &retries_counter_arc;
228-
229-
// We need reference so that the closure is FnMut
230-
let applied_settings_ref = &applied_settings;
231-
232-
// Setup DAPI request execution routine future. It's a closure that will be called
233-
// more than once to build new future on each retry.
234-
let routine = move || {
235-
let retries_counter = Arc::clone(retries_counter_arc_ref);
236-
237-
// Try to get an address to initialize transport on:
238-
let address_result = self
239-
.address_list
240-
.get_live_address()
241-
.ok_or(DapiClientError::NoAvailableAddresses);
242-
243-
let _span = tracing::trace_span!(
244-
"execute request",
245-
address = ?address_result,
246-
settings = ?applied_settings_ref,
247-
method = request.method_name(),
248-
)
249-
.entered();
250-
251-
tracing::trace!(
252-
?request,
253-
"calling {} with {} request",
254-
request.method_name(),
255-
request.request_name(),
256-
);
257-
258-
let transport_request = request.clone();
259-
let response_name = request.response_name();
260-
261-
// Create a future using `async` block that will be returned from the closure on
262-
// each retry. Could be just a request future, but need to unpack client first.
263-
async move {
264-
// It stays wrapped in `Result` since we want to return
265-
// `impl Future<Output = Result<...>`, not a `Result` itself.
266-
let address = address_result.map_err(|inner| ExecutionError {
267-
inner,
268-
retries: retries_counter.load(std::sync::atomic::Ordering::Relaxed),
269-
address: None,
270-
})?;
271-
272-
let pool = self.pool.clone();
273-
274-
let mut transport_client = R::Client::with_uri_and_settings(
275-
address.uri().clone(),
276-
applied_settings_ref,
277-
&pool,
278-
)
279-
.map_err(|error| ExecutionError {
280-
inner: DapiClientError::Transport(error),
281-
retries: retries_counter.load(std::sync::atomic::Ordering::Relaxed),
282-
address: Some(address.clone()),
283-
})?;
231+
let max_retries = applied_settings.retries;
232+
let retry_delay = Duration::from_millis(10);
284233

285-
let result = transport_request
286-
.execute_transport(&mut transport_client, applied_settings_ref)
287-
.await
288-
.map_err(DapiClientError::Transport);
234+
let mut retries: usize = 0;
235+
// Track the last transport error for when all addresses get exhausted
236+
let mut last_transport_error: Option<TransportError> = None;
237+
238+
let result: ExecutionResult<R::Response, DapiClientError> = async {
239+
loop {
240+
// Try to get an address to initialize transport on:
241+
let Some(address) = self.address_list.get_live_address() else {
242+
// No available addresses - wrap with last meaningful error if we have one
243+
let error = if let Some(transport_error) = last_transport_error.take() {
244+
tracing::debug!(
245+
"no addresses available, returning last transport error"
246+
);
247+
DapiClientError::NoAvailableAddressesToRetry(Box::new(
248+
transport_error,
249+
))
250+
} else {
251+
DapiClientError::NoAvailableAddresses
252+
};
253+
254+
return Err(ExecutionError {
255+
inner: error,
256+
retries,
257+
address: None,
258+
});
259+
};
289260

290-
let retries = retries_counter.load(std::sync::atomic::Ordering::Relaxed);
261+
tracing::trace!(
262+
?request,
263+
"calling {} with {} request",
264+
request.method_name(),
265+
request.request_name(),
266+
);
267+
268+
let transport_request = request.clone();
269+
let response_name = request.response_name();
270+
271+
// Try to create transport client
272+
let transport_client_result = R::Client::with_uri_and_settings(
273+
address.uri().clone(),
274+
&applied_settings,
275+
&self.pool,
276+
);
277+
278+
let mut transport_client = match transport_client_result {
279+
Ok(client) => client,
280+
Err(transport_error) => {
281+
let can_retry_error = transport_error.can_retry();
291282

292-
let execution_result = result
293-
.map(|inner| {
294-
tracing::trace!(response = ?inner, "received {} response", response_name);
283+
// Clone error before moving it
284+
let cloned_error = transport_error.clone();
295285

296-
ExecutionResponse {
297-
inner,
286+
let execution_error = ExecutionError {
287+
inner: DapiClientError::Transport(transport_error),
298288
retries,
299-
address: address.clone(),
289+
address: Some(address.clone()),
290+
};
291+
292+
// Update ban status
293+
let error_result: ExecutionResult<R::Response, DapiClientError> =
294+
Err(execution_error);
295+
update_address_ban_status::<R::Response, DapiClientError>(
296+
&self.address_list,
297+
&error_result,
298+
&applied_settings,
299+
);
300+
301+
// Unwrap the error back
302+
let execution_error = error_result.unwrap_err();
303+
304+
if can_retry_error && retries < max_retries {
305+
// Store last transport error
306+
last_transport_error = Some(cloned_error);
307+
308+
retries += 1;
309+
tracing::warn!(
310+
error = ?execution_error,
311+
"retrying error with sleeping {} secs",
312+
retry_delay.as_secs_f32()
313+
);
314+
transport::sleep(retry_delay).await;
315+
continue;
300316
}
301-
})
302-
.map_err(|inner| {
303-
tracing::debug!(error = ?inner, "received error: {inner}");
304317

305-
ExecutionError {
306-
inner,
318+
return Err(execution_error);
319+
}
320+
};
321+
322+
// Execute the transport request
323+
let result = transport_request
324+
.execute_transport(&mut transport_client, &applied_settings)
325+
.instrument(tracing::trace_span!(
326+
"execute_request",
327+
?address,
328+
settings = ?applied_settings,
329+
method = request.method_name(),
330+
))
331+
.await;
332+
333+
let execution_result = match result {
334+
Ok(response) => {
335+
tracing::trace!(response = ?response, "received {} response", response_name);
336+
Ok(ExecutionResponse {
337+
inner: response,
338+
retries,
339+
address: address.clone(),
340+
})
341+
}
342+
Err(transport_error) => {
343+
tracing::debug!(error = ?transport_error, "received error: {transport_error}");
344+
Err(ExecutionError {
345+
inner: DapiClientError::Transport(transport_error),
307346
retries,
308347
address: Some(address.clone()),
309-
}
310-
});
348+
})
349+
}
350+
};
311351

312352
update_address_ban_status::<R::Response, DapiClientError>(
313353
&self.address_list,
314354
&execution_result,
315-
applied_settings_ref,
355+
&applied_settings,
316356
);
317357

318-
execution_result
358+
match execution_result {
359+
Ok(response) => return Ok(response),
360+
Err(error) => {
361+
if error.can_retry() && retries < max_retries {
362+
// Store last transport error
363+
if let DapiClientError::Transport(ref te) = error.inner {
364+
last_transport_error = Some(te.clone());
365+
}
366+
367+
retries += 1;
368+
tracing::warn!(
369+
?error,
370+
"retrying error with sleeping {} secs",
371+
retry_delay.as_secs_f32()
372+
);
373+
transport::sleep(retry_delay).await;
374+
continue;
375+
}
376+
377+
return Err(error);
378+
}
379+
}
319380
}
320-
};
321-
322-
let sleeper = transport::BackonSleeper::default();
323-
324-
// Start the routine with retry policy applied:
325-
// We allow let_and_return because `result` is used later if dump feature is enabled
326-
let result: Result<
327-
ExecutionResponse<<R as TransportRequest>::Response>,
328-
ExecutionError<DapiClientError>,
329-
> = routine
330-
.retry(retry_settings)
331-
.sleep(sleeper)
332-
.notify(|error, duration| {
333-
let retries_counter = Arc::clone(&retries_counter_arc);
334-
retries_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
335-
336-
tracing::warn!(
337-
?error,
338-
"retrying error with sleeping {} secs",
339-
duration.as_secs_f32()
340-
);
341-
})
342-
.when(|e| e.can_retry())
343-
.instrument(tracing::info_span!("request routine"))
344-
.await;
381+
}
382+
.await;
345383

346384
if let Err(error) = &result {
347385
if !error.can_retry() {

0 commit comments

Comments
 (0)