-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathintegration_tests.rs
More file actions
361 lines (291 loc) · 16 KB
/
integration_tests.rs
File metadata and controls
361 lines (291 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
#![cfg(not(target_arch = "wasm32"))]
mod common;
use common::{
collect_river_node_diagnostics, connect_ws_with_retries, deploy_room_contract,
get_all_room_states, river_states_equal, send_test_message, subscribe_to_contract,
subscribe_to_contract_with_retries, update_room_state_delta, wait_for_update_response,
RoomTestState,
};
use freenet_scaffold::ComposableState;
use freenet_stdlib::prelude::*;
use rand::SeedableRng;
use std::time::Duration;
use testresult::TestResult;
use tracing::{level_filters::LevelFilter, span, Instrument, Level};
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_invitation_message_propagation() -> TestResult {
tracing_subscriber::fmt()
.with_max_level(LevelFilter::INFO)
.init();
let span = span!(Level::INFO, "test_invitation_message_propagation");
async move {
let gw_port = common::get_free_port()?;
let alice_port = common::get_free_port()?;
let bob_port = common::get_free_port()?;
let charlie_port = common::get_free_port()?;
let gw_ws_port = common::get_free_port()?;
let alice_ws_port = common::get_free_port()?;
let bob_ws_port = common::get_free_port()?;
let charlie_ws_port = common::get_free_port()?;
let test_seed = *b"river_invite_test_12345678901234";
let mut test_rng = rand::rngs::StdRng::from_seed(test_seed);
println!("Using deterministic test seed: {test_seed:?}");
println!("Test RNG initial state configured for deterministic network topology");
let (gw_config, _gw_preset) = common::base_node_test_config_with_rng(
true,
vec![],
Some(gw_port),
gw_ws_port,
"river_test_gw_invite",
None,
None,
Some(0.0), // Gateway at 0.0
&mut test_rng,
).await?;
let gw_config_info = common::gw_config_from_path_with_rng(
gw_config.network_api.public_port.unwrap(),
_gw_preset.temp_dir.path(),
&mut test_rng,
)?;
let (alice_config, _alice_preset) = common::base_node_test_config_with_rng(
false,
vec![serde_json::to_string(&gw_config_info)?],
Some(alice_port),
alice_ws_port,
"river_test_alice_invite",
None,
None,
Some(0.01), // Alice at 0.01 (close to gateway)
&mut test_rng,
).await?;
let (bob_config, _bob_preset) = common::base_node_test_config_with_rng(
false,
vec![serde_json::to_string(&gw_config_info)?],
Some(bob_port),
bob_ws_port,
"river_test_bob_invite",
None,
None,
Some(0.02), // Bob at 0.02 (close to Alice)
&mut test_rng,
).await?;
let (charlie_config, _charlie_preset) = common::base_node_test_config_with_rng(
false,
vec![serde_json::to_string(&gw_config_info)?],
Some(charlie_port),
charlie_ws_port,
"river_test_charlie_invite",
None,
None,
Some(0.03), // Charlie at 0.03 (close to Bob)
&mut test_rng,
).await?;
let gateway_node = async {
use freenet::{local_node::NodeConfig, server::serve_gateway};
let config = gw_config.build().await?;
let node = NodeConfig::new(config.clone()).await?;
let gateway_services = serve_gateway(config.ws_api).await?;
let node = node.build(gateway_services).await?;
node.run().await
};
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let alice_node = async {
use freenet::{local_node::NodeConfig, server::serve_gateway};
let config = alice_config.build().await?;
let node = NodeConfig::new(config.clone()).await?;
let alice_services = serve_gateway(config.ws_api).await?;
let node = node.build(alice_services).await?;
node.run().await
};
let bob_node = async {
use freenet::{local_node::NodeConfig, server::serve_gateway};
let config = bob_config.build().await?;
let node = NodeConfig::new(config.clone()).await?;
let bob_services = serve_gateway(config.ws_api).await?;
let node = node.build(bob_services).await?;
node.run().await
};
let charlie_node = async {
use freenet::{local_node::NodeConfig, server::serve_gateway};
let config = charlie_config.build().await?;
let node = NodeConfig::new(config.clone()).await?;
let charlie_services = serve_gateway(config.ws_api).await?;
let node = node.build(charlie_services).await?;
node.run().await
};
let alice_signing_key = ed25519_dalek::SigningKey::from_bytes(&[1u8; 32]);
let alice_verifying_key = alice_signing_key.verifying_key();
let initial_state = RoomTestState::new_test_room();
println!("[CONFIG] Pre-configured Alice owner key: {:?}", alice_verifying_key);
println!("[CONFIG] Pre-configured initial state with {} members", initial_state.room_state.members.members.len());
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
let network_test = tokio::time::timeout(Duration::from_secs(200), async move {
let mut client_node1 = connect_ws_with_retries(alice_ws_port, "Alice", 5).await?;
let mut _bob_client = connect_ws_with_retries(bob_ws_port, "Bob", 5).await?;
let mut _charlie_client = connect_ws_with_retries(charlie_ws_port, "Charlie", 5).await?;
let mut _gateway_client = connect_ws_with_retries(gw_ws_port, "Gateway", 5).await?;
println!("Waiting for P2P connectivity with close topology (4 nodes)...");
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
{
let mut clients_for_diagnostics = vec![&mut client_node1, &mut _bob_client, &mut _charlie_client, &mut _gateway_client];
let node_names = ["Alice", "Bob", "Charlie", "Gateway"];
let _ = collect_river_node_diagnostics(&mut clients_for_diagnostics, &node_names, vec![], "AFTER 100S WAIT - P2P CONNECTIVITY CHECK").await;
}
println!("=== River Invitation Flow Test ===");
println!("Testing: Gateway + Alice (Peer 1) + Bob (Peer 2)");
println!("\n[STEP 1] Create a room on Alice");
println!(" - Using pre-configured Alice owner key: {:?}", alice_verifying_key);
println!("Deploying contract...");
let contract_key = deploy_room_contract(
&mut client_node1,
initial_state.room_state.clone(),
&initial_state.parameters,
false,
).await.map_err(|e| format!("Failed to deploy River contract: {}", e))?;
println!("\n[STEP 2] Alice subscribes to the room");
println!("About to call subscribe_to_contract for Alice...");
let subscribe_start = std::time::Instant::now();
subscribe_to_contract(&mut client_node1, contract_key).await
.map_err(|e| format!("Alice subscribe failed after {:?}: {}", subscribe_start.elapsed(), e))?;
println!("[SUCCESS] Alice subscribe completed successfully in {:?}", subscribe_start.elapsed());
tokio::time::sleep(Duration::from_secs(2)).await;
println!("[STEP 3] Continuing to Step 3 after 2s sleep...");
println!("\n[STEP 3] Alice creates invitation for Bob");
let bob_signing_key = ed25519_dalek::SigningKey::from_bytes(&[5u8; 32]);
let bob_verifying_key = bob_signing_key.verifying_key();
println!(" - Bob's key: {:?}", bob_verifying_key);
let bob_member = river_core::room_state::member::Member {
owner_member_id: alice_verifying_key.into(),
member_vk: bob_verifying_key,
invited_by: alice_verifying_key.into(),
};
let authorized_bob_member = river_core::room_state::member::AuthorizedMember::new(
bob_member, &alice_signing_key
);
println!(" - Created authorized member for Bob");
println!("\n Step 4: Bob accepts invitation and joins room");
// Use retries for Bob's subscribe since it needs network routing
// which can be slower in CI environments
subscribe_to_contract_with_retries(&mut _bob_client, contract_key, 3).await
.map_err(|e| format!("Bob subscribe failed: {}", e))?;
let mut bob_clients = vec![&mut _bob_client];
let bob_room_states = get_all_room_states(&mut bob_clients, contract_key).await
.map_err(|e| format!("Bob failed to get room state: {}", e))?;
let mut bob_room_state = bob_room_states[0].clone();
let bob_members_delta = river_core::room_state::member::MembersDelta::new(
vec![authorized_bob_member.clone()]
);
bob_room_state.members.apply_delta(&bob_room_state.clone(), &initial_state.parameters, &Some(bob_members_delta))
.map_err(|e| format!("Failed to add Bob to members: {}", e))?;
let bob_member_info = river_core::room_state::member_info::MemberInfo {
member_id: bob_signing_key.verifying_key().into(),
version: 0,
preferred_nickname: river_core::room_state::privacy::SealedBytes::public("Bob".to_string().into_bytes()),
};
let authorized_bob_info = river_core::room_state::member_info::AuthorizedMemberInfo::new_with_member_key(
bob_member_info, &bob_signing_key
);
bob_room_state.member_info.member_info.push(authorized_bob_info.clone());
let bob_membership_delta = river_core::room_state::ChatRoomStateV1Delta {
members: Some(river_core::room_state::member::MembersDelta::new(vec![authorized_bob_member.clone()])),
..Default::default()
};
println!(" - Sending Bob's membership delta to network...");
update_room_state_delta(&mut _bob_client, contract_key, bob_membership_delta).await
.map_err(|e| format!("Failed to update Bob's membership: {}", e))?;
println!(" - Waiting for membership update response...");
wait_for_update_response(&mut _bob_client, &contract_key).await
.map_err(|e| format!("Bob membership update response failed: {}", e))?;
println!(" - Bob membership update successful!");
println!(" - Sending Bob's member info (nickname)...");
let bob_info_delta = river_core::room_state::ChatRoomStateV1Delta {
member_info: Some(vec![authorized_bob_info.clone()]),
..Default::default()
};
update_room_state_delta(&mut _bob_client, contract_key, bob_info_delta).await
.map_err(|e| format!("Failed to update Bob's member info: {}", e))?;
wait_for_update_response(&mut _bob_client, &contract_key).await
.map_err(|e| format!("Bob member info update response failed: {}", e))?;
println!(" - Bob member info update successful!");
println!(" - Bob successfully joined the room");
tokio::time::sleep(Duration::from_secs(3)).await;
println!("\n Step 5: Testing message propagation between Alice and Bob");
println!(" - Alice sends message: 'Hello Bob!'");
let mut alice_clients = vec![&mut client_node1];
let alice_room_states = get_all_room_states(&mut alice_clients, contract_key).await?;
send_test_message(&mut client_node1, contract_key, &alice_room_states[0], &initial_state.parameters,
"Hello Bob!".to_string(), &alice_signing_key).await
.map_err(|e| format!("Alice failed to send message: {}", e))?;
wait_for_update_response(&mut client_node1, &contract_key).await?;
tokio::time::sleep(Duration::from_secs(2)).await;
println!(" - Bob sends message: 'Hello Alice!'");
let mut bob_clients = vec![&mut _bob_client];
let bob_room_states = get_all_room_states(&mut bob_clients, contract_key).await?;
send_test_message(&mut _bob_client, contract_key, &bob_room_states[0], &initial_state.parameters,
"Hello Alice!".to_string(), &bob_signing_key).await
.map_err(|e| format!("Bob failed to send message: {}", e))?;
wait_for_update_response(&mut _bob_client, &contract_key).await?;
tokio::time::sleep(Duration::from_secs(3)).await;
println!("\n Step 6: Verifying message propagation (Issue #1775 test)");
let mut all_clients = vec![&mut client_node1, &mut _bob_client];
let final_states = get_all_room_states(&mut all_clients, contract_key).await?;
let (alice_final_state, bob_final_state) = (&final_states[0], &final_states[1]);
println!(" - Alice sees {} messages", alice_final_state.recent_messages.messages.len());
println!(" - Bob sees {} messages", bob_final_state.recent_messages.messages.len());
for (i, msg) in alice_final_state.recent_messages.messages.iter().enumerate() {
println!(" Alice msg {}: '{}'", i+1, msg.message.content);
}
for (i, msg) in bob_final_state.recent_messages.messages.iter().enumerate() {
println!(" Bob msg {}: '{}'", i+1, msg.message.content);
}
if alice_final_state.recent_messages.messages.len() != bob_final_state.recent_messages.messages.len() {
println!(" Alice has {} messages, Bob has {} messages",
alice_final_state.recent_messages.messages.len(),
bob_final_state.recent_messages.messages.len());
return Err("Inconsistent message propagation detected".into());
}
if !river_states_equal(alice_final_state, bob_final_state) {
println!("State inconsistency between Alice and Bob!");
return Err("Room state inconsistency detected".into());
}
println!("SUCCESS: Both Alice and Bob see all {} messages consistently!",
alice_final_state.recent_messages.messages.len());
println!("TEST PASSED - Message propagation works correctly!");
Ok(())
}).instrument(span!(Level::INFO, "test_invitation_message_propagation_network_test"));
tokio::select! {
result = gateway_node => {
match result {
Ok(_) => Err("Gateway node exited unexpectedly".into()),
Err(e) => Err(format!("Gateway node failed: {}", e).into())
}
}
result = alice_node => {
match result {
Ok(_) => Err("Alice node exited unexpectedly".into()),
Err(e) => Err(format!("Alice node failed: {}", e).into())
}
}
result = bob_node => {
match result {
Ok(_) => Err("Bob node exited unexpectedly".into()),
Err(e) => Err(format!("Bob node failed: {}", e).into())
}
}
result = charlie_node => {
match result {
Ok(_) => Err("Charlie node exited unexpectedly".into()),
Err(e) => Err(format!("Charlie node failed: {}", e).into())
}
}
result = network_test => {
match result {
Ok(inner_result) => inner_result,
Err(_timeout_error) => Err("Invitation message propagation test timed out after 500 seconds".into())
}
}
}
}
.instrument(span)
.await
}