Skip to content

Commit ef239e8

Browse files
feat(store): add async storage backends for PostgreSQL and DynamoDB (v0.9)
Add AsyncStore trait and two feature-gated async backend implementations: - AsyncStore trait mirrors sync Store with 15 async methods - PostgresStore (feature: postgres) using sqlx-core + sqlx-postgres - DynamoStore (feature: dynamodb) with single-table design and GSIs - Feature flags keep dependencies optional - Type aliases: BoxAsyncStore, ArcAsyncStore Technical notes: - Uses sqlx-core/sqlx-postgres directly to avoid sqlite version conflict - DynamoDB uses PK=DOC#id, SK prefixes: OP#, SNAP#, MILE#, VV# - GSI1 for operation lookup, GSI2 for actor-based queries 35 tests passing (31 existing + 4 new DynamoDB key tests).
1 parent 8450c00 commit ef239e8

11 files changed

Lines changed: 2114 additions & 2 deletions

File tree

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ sha2 = "0.10"
7777
# Storage
7878
rusqlite = { version = "0.33", features = ["bundled"] }
7979

80+
# Async storage backends (feature-gated in conflux-store)
81+
# Note: sqlx is declared directly in conflux-store to avoid sqlite version conflicts
82+
aws-sdk-dynamodb = "1.54"
83+
aws-config = "1.6"
84+
8085
# CLI
8186
clap = { version = "4.5", features = ["derive"] }
8287

crates/store/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ repository.workspace = true
88
rust-version.workspace = true
99
description = "Persistent state storage and operation log for Conflux documents"
1010

11+
[features]
12+
default = []
13+
postgres = ["dep:sqlx-core", "dep:sqlx-postgres", "dep:async-trait"]
14+
dynamodb = ["dep:aws-sdk-dynamodb", "dep:aws-config", "dep:async-trait"]
15+
1116
[dependencies]
1217
conflux-core = { path = "../core" }
1318
conflux-schema = { path = "../schema" }
@@ -20,5 +25,18 @@ chrono = { workspace = true }
2025
uuid = { workspace = true }
2126
parking_lot = { workspace = true }
2227

28+
# Async (optional, for async backends)
29+
async-trait = { workspace = true, optional = true }
30+
31+
# PostgreSQL (optional)
32+
# Note: Using sqlx-core + sqlx-postgres directly to avoid sqlite conflicts with rusqlite
33+
sqlx-core = { version = "0.8", default-features = false, features = ["_rt-tokio", "chrono", "uuid", "json"], optional = true }
34+
sqlx-postgres = { version = "0.8", default-features = false, features = ["chrono", "uuid", "json"], optional = true }
35+
36+
# DynamoDB (optional)
37+
aws-sdk-dynamodb = { workspace = true, optional = true }
38+
aws-config = { workspace = true, optional = true }
39+
2340
[dev-dependencies]
2441
insta = { workspace = true }
42+
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

crates/store/src/dynamo/keys.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
//! Key builders for DynamoDB single-table design.
2+
3+
use conflux_core::HlcTimestamp;
4+
use uuid::Uuid;
5+
6+
/// Key prefix constants and builders for the single-table design.
7+
pub struct Keys;
8+
9+
impl Keys {
10+
// --- Partition Key Builders ---
11+
12+
/// Builds the partition key for a document: `DOC#<document_id>`
13+
pub fn doc_pk(document_id: &str) -> String {
14+
format!("DOC#{document_id}")
15+
}
16+
17+
/// Builds the GSI1 partition key for operation lookup: `OP#<operation_id>`
18+
pub fn operation_gsi1_pk(operation_id: &Uuid) -> String {
19+
format!("OP#{operation_id}")
20+
}
21+
22+
/// Builds the GSI2 partition key for actor-based queries: `ACTOR#<document_id>#<actor_id>`
23+
pub fn actor_gsi2_pk(document_id: &str, actor_id: &str) -> String {
24+
format!("ACTOR#{document_id}#{actor_id}")
25+
}
26+
27+
// --- Sort Key Builders ---
28+
29+
/// Builds the sort key for an operation: `OP#<hlc_timestamp>#<operation_id>`
30+
///
31+
/// Using HLC as the primary sort component ensures causal ordering.
32+
pub fn operation_sk(hlc: &HlcTimestamp, operation_id: &Uuid) -> String {
33+
format!("OP#{}#{}", hlc, operation_id)
34+
}
35+
36+
/// Builds the sort key for a snapshot: `SNAP#<hlc_timestamp>#<snapshot_id>`
37+
pub fn snapshot_sk(hlc: &HlcTimestamp, snapshot_id: &Uuid) -> String {
38+
format!("SNAP#{}#{}", hlc, snapshot_id)
39+
}
40+
41+
/// Builds the sort key for a milestone: `MILE#<created_at>#<milestone_id>`
42+
///
43+
/// Using created_at as the primary sort component enables newest-first queries.
44+
pub fn milestone_sk(created_at: &str, milestone_id: &Uuid) -> String {
45+
format!("MILE#{}#{}", created_at, milestone_id)
46+
}
47+
48+
/// The sort key for a version vector: `VV#`
49+
pub fn version_vector_sk() -> &'static str {
50+
"VV#"
51+
}
52+
53+
// --- Sort Key Prefixes (for range queries) ---
54+
55+
/// Prefix for operation sort keys.
56+
pub const OP_PREFIX: &'static str = "OP#";
57+
58+
/// Prefix for snapshot sort keys.
59+
pub const SNAP_PREFIX: &'static str = "SNAP#";
60+
61+
/// Prefix for milestone sort keys.
62+
pub const MILE_PREFIX: &'static str = "MILE#";
63+
64+
// --- Parsing ---
65+
66+
/// Extracts the HLC timestamp from an operation sort key.
67+
///
68+
/// Input format: `OP#<hlc>#<uuid>`
69+
#[allow(dead_code)]
70+
pub fn parse_operation_sk_hlc(sk: &str) -> Option<&str> {
71+
let rest = sk.strip_prefix(Self::OP_PREFIX)?;
72+
let parts: Vec<&str> = rest.splitn(2, '#').collect();
73+
parts.first().copied()
74+
}
75+
76+
/// Extracts the operation ID from an operation sort key.
77+
///
78+
/// Input format: `OP#<hlc>#<uuid>`
79+
#[allow(dead_code)]
80+
pub fn parse_operation_sk_id(sk: &str) -> Option<&str> {
81+
let rest = sk.strip_prefix(Self::OP_PREFIX)?;
82+
let parts: Vec<&str> = rest.splitn(2, '#').collect();
83+
parts.get(1).copied()
84+
}
85+
86+
/// Extracts the HLC timestamp from a snapshot sort key.
87+
///
88+
/// Input format: `SNAP#<hlc>#<uuid>`
89+
#[allow(dead_code)]
90+
pub fn parse_snapshot_sk_hlc(sk: &str) -> Option<&str> {
91+
let rest = sk.strip_prefix(Self::SNAP_PREFIX)?;
92+
let parts: Vec<&str> = rest.splitn(2, '#').collect();
93+
parts.first().copied()
94+
}
95+
}
96+
97+
#[cfg(test)]
98+
mod tests {
99+
use super::*;
100+
101+
#[test]
102+
fn doc_pk_format() {
103+
assert_eq!(Keys::doc_pk("my-doc"), "DOC#my-doc");
104+
}
105+
106+
#[test]
107+
fn operation_gsi1_pk_format() {
108+
let id = Uuid::nil();
109+
assert_eq!(
110+
Keys::operation_gsi1_pk(&id),
111+
"OP#00000000-0000-0000-0000-000000000000"
112+
);
113+
}
114+
115+
#[test]
116+
fn actor_gsi2_pk_format() {
117+
assert_eq!(
118+
Keys::actor_gsi2_pk("doc-1", "alice"),
119+
"ACTOR#doc-1#alice"
120+
);
121+
}
122+
123+
#[test]
124+
fn parse_operation_sk() {
125+
let sk = "OP#2024-01-01T00:00:00.000Z/node1/1#550e8400-e29b-41d4-a716-446655440000";
126+
assert_eq!(
127+
Keys::parse_operation_sk_hlc(sk),
128+
Some("2024-01-01T00:00:00.000Z/node1/1")
129+
);
130+
assert_eq!(
131+
Keys::parse_operation_sk_id(sk),
132+
Some("550e8400-e29b-41d4-a716-446655440000")
133+
);
134+
}
135+
}

crates/store/src/dynamo/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//! DynamoDB-backed async storage for Conflux.
2+
//!
3+
//! This module provides a DynamoDB implementation of the [`AsyncStore`](crate::AsyncStore) trait
4+
//! using a single-table design optimized for serverless deployments.
5+
//!
6+
//! # Table Design
7+
//!
8+
//! Uses a single-table design with the following key structure:
9+
//!
10+
//! | PK | SK | Type |
11+
//! |----|----|----|
12+
//! | `DOC#<doc_id>` | `OP#<hlc>#<op_id>` | Operation |
13+
//! | `DOC#<doc_id>` | `SNAP#<hlc>#<id>` | Snapshot |
14+
//! | `DOC#<doc_id>` | `MILE#<created>#<id>` | Milestone |
15+
//! | `DOC#<doc_id>` | `VV#` | Version Vector |
16+
//!
17+
//! # GSIs
18+
//!
19+
//! - **GSI1**: `gsi1_pk=OP#<op_id>` — Operation lookup by ID
20+
//! - **GSI2**: `gsi2_pk=ACTOR#<doc>#<actor>`, `gsi2_sk=<hlc>` — Operations by actor
21+
//!
22+
//! # Example
23+
//!
24+
//! ```ignore
25+
//! use conflux_store::DynamoStore;
26+
//!
27+
//! let store = DynamoStore::new("conflux-store").await?;
28+
//! store.create_table_if_not_exists().await?;
29+
//! store.append_operation("doc-1", &op).await?;
30+
//! ```
31+
32+
mod keys;
33+
mod store;
34+
35+
pub use store::DynamoStore;

0 commit comments

Comments
 (0)