Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions ethexe/cli/src/params/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use anyhow::{Context, Result};
use clap::Parser;
use ethexe_common::Address;
use ethexe_network::{
NetworkConfig,
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE, NetworkConfig,
export::{Multiaddr, Protocol},
};
use gsigner::secp256k1::Signer;
use serde::Deserialize;
use std::path::PathBuf;
use std::{num::NonZeroU32, path::PathBuf};

/// Parameters for the networking service to start.
#[derive(Clone, Debug, Deserialize, Parser)]
Expand Down Expand Up @@ -61,6 +61,11 @@ pub struct NetworkParams {
#[arg(long, alias = "no-net")]
#[serde(default, rename = "no-network", alias = "no-net")]
pub no_network: bool,

/// Maximum chain length allowed in announces responses.
#[arg(long, alias = "net-max-chain-len-for-announces-response")]
#[serde(rename = "max-chain-len-for-announces-response")]
pub max_chain_len_for_announces_response: Option<NonZeroU32>,
}

impl NetworkParams {
Expand Down Expand Up @@ -137,6 +142,9 @@ impl NetworkParams {
listen_addresses,
transport_type: Default::default(),
allow_non_global_addresses: is_dev,
max_chain_len_for_announces_response: self
.max_chain_len_for_announces_response
.unwrap_or(DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE),
}))
}
}
Expand All @@ -150,6 +158,9 @@ impl MergeParams for NetworkParams {
network_listen_addr: self.network_listen_addr.or(with.network_listen_addr),
network_port: self.network_port.or(with.network_port),
no_network: self.no_network || with.no_network,
max_chain_len_for_announces_response: self
.max_chain_len_for_announces_response
.or(with.max_chain_len_for_announces_response),
}
}
}
6 changes: 5 additions & 1 deletion ethexe/network/src/db_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
mod requests;
mod responses;

use crate::{db_sync::requests::OngoingRequests, utils::AlternateCollectionFmt};
pub(crate) use crate::{
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
db_sync::{requests::RetriableRequest, responses::OngoingResponses},
export::{Multiaddr, PeerId},
peer_score,
utils::ParityScaleCodec,
};
use crate::{db_sync::requests::OngoingRequests, utils::AlternateCollectionFmt};
use async_trait::async_trait;
use ethexe_common::{
Announce,
Expand All @@ -52,6 +53,7 @@ use libp2p::{
use parity_scale_codec::{Decode, Encode};
use std::{
collections::{BTreeMap, BTreeSet},
num::NonZeroU32,
pin::Pin,
sync::atomic::{AtomicU64, Ordering},
task::{Context, Poll},
Expand Down Expand Up @@ -153,6 +155,7 @@ pub(crate) struct Config {
pub max_rounds_per_request: u32,
pub request_timeout: Duration,
pub max_simultaneous_responses: u32,
pub max_chain_len_for_announces_response: NonZeroU32,
}

impl Default for Config {
Expand All @@ -161,6 +164,7 @@ impl Default for Config {
max_rounds_per_request: 10,
request_timeout: Duration::from_secs(100),
max_simultaneous_responses: 10,
max_chain_len_for_announces_response: DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
}
}
}
Expand Down
117 changes: 89 additions & 28 deletions ethexe/network/src/db_sync/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ use std::{
use thiserror::Error;
use tokio::task::JoinSet;

/// Maximum length of the chain for announces responses to prevent abuse
const MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE: NonZeroU32 = NonZeroU32::new(1000).unwrap();

struct OngoingResponse {
response_id: ResponseId,
peer_id: PeerId,
Expand All @@ -52,6 +49,7 @@ pub(crate) struct OngoingResponses {
db: Box<dyn DbSyncDatabase>,
db_readers: JoinSet<OngoingResponse>,
max_simultaneous_responses: u32,
max_chain_len_for_announces_response: NonZeroU32,
}

impl OngoingResponses {
Expand All @@ -61,6 +59,7 @@ impl OngoingResponses {
db,
db_readers: JoinSet::new(),
max_simultaneous_responses: config.max_simultaneous_responses,
max_chain_len_for_announces_response: config.max_chain_len_for_announces_response,
}
}

Expand All @@ -70,7 +69,11 @@ impl OngoingResponses {
ResponseId(id)
}

fn response_from_db(request: InnerRequest, db: Box<dyn DbSyncDatabase>) -> InnerResponse {
fn response_from_db(
request: InnerRequest,
db: Box<dyn DbSyncDatabase>,
max_chain_len_for_announces_response: NonZeroU32,
) -> InnerResponse {
match request {
InnerRequest::Hashes(request) => InnerHashesResponse(
request
Expand All @@ -95,7 +98,11 @@ impl OngoingResponses {
.into(),
InnerRequest::ValidCodes => db.valid_codes().into(),
InnerRequest::Announces(request) => {
match Self::process_announce_request(&db, request) {
match Self::process_announce_request(
&db,
request,
max_chain_len_for_announces_response,
) {
Ok(response) => response.into(),
Err(e) => {
log::trace!("cannot complete announces request {request:?}: {e}");
Expand All @@ -109,23 +116,27 @@ impl OngoingResponses {
fn process_announce_request<DB: AnnounceStorageRO + GlobalsStorageRO + ConfigStorageRO>(
db: &DB,
request: AnnouncesRequest,
max_chain_len_for_announces_response: NonZeroU32,
) -> Result<InnerAnnouncesResponse, ProcessAnnounceError> {
let AnnouncesRequest { head, until } = request;

// Check the requested chain length first to prevent abuse
if let AnnouncesRequestUntil::ChainLen(len) = until
&& len > MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE
&& len > max_chain_len_for_announces_response
{
// TODO #4874: use peer score to punish the peer for such requests
return Err(ProcessAnnounceError::ChainLenExceedsMax { requested: len });
return Err(ProcessAnnounceError::ChainLenExceedsMax {
requested: len,
max_allowed: max_chain_len_for_announces_response,
});
}

let genesis_announce_hash = db.config().genesis_announce_hash;
let start_announce_hash = db.globals().start_announce_hash;

let mut announces = VecDeque::new();
let mut announce_hash = head;
for _ in 0..MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE.get() {
for _ in 0..max_chain_len_for_announces_response.get() {
match until {
AnnouncesRequestUntil::Tail(tail) if announce_hash == tail => {
return Ok(InnerAnnouncesResponse(announces.into()));
Expand Down Expand Up @@ -161,7 +172,9 @@ impl OngoingResponses {
}

// TODO #4874: use peer score to punish the peer for such requests
Err(ProcessAnnounceError::ReachedMaxChainLength)
Err(ProcessAnnounceError::ReachedMaxChainLength {
max_allowed: max_chain_len_for_announces_response,
})
}

pub(crate) fn handle_response(
Expand All @@ -177,8 +190,10 @@ impl OngoingResponses {
let response_id = self.next_response_id();

let db = self.db.clone_boxed();
let max_chain_len_for_announces_response = self.max_chain_len_for_announces_response;
self.db_readers.spawn_blocking(move || {
let response = Self::response_from_db(request, db);
let response =
Self::response_from_db(request, db, max_chain_len_for_announces_response);
OngoingResponse {
response_id,
peer_id,
Expand Down Expand Up @@ -212,24 +227,25 @@ impl OngoingResponses {

#[derive(Debug, Error, PartialEq, Eq)]
enum ProcessAnnounceError {
#[error(
"requested chain length {requested} exceeds maximum allowed {MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE}"
)]
ChainLenExceedsMax { requested: NonZeroU32 },
#[error("requested chain length {requested} exceeds maximum allowed {max_allowed}")]
ChainLenExceedsMax {
requested: NonZeroU32,
max_allowed: NonZeroU32,
},
#[error("announce {hash} not found in database")]
AnnounceMissing { hash: HashOf<Announce> },
#[error("reached genesis announce {genesis}")]
ReachedGenesis { genesis: HashOf<Announce> },
#[error("reached start announce {start}")]
ReachedStart { start: HashOf<Announce> },
#[error("reached maximum chain length {MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE}")]
ReachedMaxChainLength,
#[error("reached maximum chain length {max_allowed}")]
ReachedMaxChainLength { max_allowed: NonZeroU32 },
}

#[cfg(test)]
mod tests {
use super::*;
use crate::db_sync::requests::ResponseHandler;
use crate::{DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE, db_sync::requests::ResponseHandler};
use ethexe_common::{
Announce, HashOf, ProtocolTimelines,
db::{AnnounceStorageRW, DBConfig, GlobalsStorageRW, SetConfig},
Expand Down Expand Up @@ -260,16 +276,26 @@ mod tests {
let db = Database::memory();
set_db_data(&db, HashOf::zero(), HashOf::zero());

let len = MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE.checked_add(1).unwrap();
let len = DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE
.checked_add(1)
.unwrap();
let request = AnnouncesRequest {
head: HashOf::zero(),
until: AnnouncesRequestUntil::ChainLen(len),
};

let err = OngoingResponses::process_announce_request(&db, request).unwrap_err();
let err = OngoingResponses::process_announce_request(
&db,
request,
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
)
.unwrap_err();
assert_eq!(
err,
ProcessAnnounceError::ChainLenExceedsMax { requested: len }
ProcessAnnounceError::ChainLenExceedsMax {
requested: len,
max_allowed: DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
}
);
}

Expand All @@ -284,7 +310,12 @@ mod tests {
until: AnnouncesRequestUntil::Tail(HashOf::zero()),
};

let err = OngoingResponses::process_announce_request(&db, request).unwrap_err();
let err = OngoingResponses::process_announce_request(
&db,
request,
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
)
.unwrap_err();
assert_eq!(err, ProcessAnnounceError::AnnounceMissing { hash: head });
}

Expand All @@ -306,7 +337,12 @@ mod tests {
until: AnnouncesRequestUntil::Tail(HashOf::random()),
};

let err = OngoingResponses::process_announce_request(&db, request).unwrap_err();
let err = OngoingResponses::process_announce_request(
&db,
request,
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
)
.unwrap_err();
assert_eq!(err, ProcessAnnounceError::ReachedGenesis { genesis });
}

Expand All @@ -327,7 +363,12 @@ mod tests {
until: AnnouncesRequestUntil::Tail(HashOf::random()),
};

let err = OngoingResponses::process_announce_request(&db, request).unwrap_err();
let err = OngoingResponses::process_announce_request(
&db,
request,
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
)
.unwrap_err();
assert_eq!(err, ProcessAnnounceError::ReachedStart { start });
}

Expand All @@ -339,7 +380,7 @@ mod tests {
let mut head_hash = parent;
let mut chain_hashes = Vec::new();

for i in 0..MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE.get() {
for i in 0..DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE.get() {
let announce = make_announce(10_000 + i as u64, parent);
let hash = db.set_announce(announce);
chain_hashes.push(hash);
Expand All @@ -362,8 +403,18 @@ mod tests {
until: AnnouncesRequestUntil::Tail(tail),
};

let err = OngoingResponses::process_announce_request(&db, request).unwrap_err();
assert_eq!(err, ProcessAnnounceError::ReachedMaxChainLength);
let err = OngoingResponses::process_announce_request(
&db,
request,
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
)
.unwrap_err();
assert_eq!(
err,
ProcessAnnounceError::ReachedMaxChainLength {
max_allowed: DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
}
);
}

#[test]
Expand All @@ -386,7 +437,12 @@ mod tests {
until: AnnouncesRequestUntil::Tail(tail_hash),
};

let response = OngoingResponses::process_announce_request(&db, request).unwrap();
let response = OngoingResponses::process_announce_request(
&db,
request,
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
)
.unwrap();
assert_eq!(response.0, vec![middle, head]);
ResponseHandler::handle_announces(response, request).unwrap_done();
}
Expand All @@ -412,7 +468,12 @@ mod tests {
until: AnnouncesRequestUntil::ChainLen(length),
};

let response = OngoingResponses::process_announce_request(&db, request).unwrap();
let response = OngoingResponses::process_announce_request(
&db,
request,
DEFAULT_MAX_CHAIN_LEN_FOR_ANNOUNCES_RESPONSE,
)
.unwrap();
assert_eq!(response.0, vec![middle, head]);
ResponseHandler::handle_announces(response, request).unwrap_done();
}
Expand Down
Loading
Loading