Skip to content

Commit 9a62b1e

Browse files
committed
Prune stale payjoin sessions on DB open
Delete payjoin sessions older than 30 days when the payjoin database is accessed. Remove related event rows in the same cleanup pass.
1 parent 6b06cf7 commit 9a62b1e

File tree

2 files changed

+76
-3
lines changed

2 files changed

+76
-3
lines changed

src/payjoin/db.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub type Result<T> = std::result::Result<T, Error>;
1616

1717
/// Default filename for the payjoin database
1818
pub const DB_FILENAME: &str = "payjoin.sqlite";
19+
const SESSION_RETENTION_SECS: i64 = 30 * 24 * 60 * 60;
1920

2021
/// Returns the current Unix timestamp in seconds
2122
#[inline]
@@ -109,6 +110,78 @@ impl Database {
109110
Ok(was_seen_before)
110111
}
111112

113+
/// Removes old completed sessions and stale incomplete sessions plus their event logs.
114+
pub fn prune_expired_sessions(&self) -> Result<()> {
115+
let cutoff = now() - SESSION_RETENTION_SECS;
116+
let mut conn = self.conn();
117+
let tx = conn.transaction()?;
118+
let stale_send_session_ids = {
119+
let mut stmt = tx.prepare(
120+
"SELECT session_id FROM send_sessions
121+
WHERE (completed_at IS NOT NULL AND completed_at < ?1)
122+
OR (
123+
completed_at IS NULL
124+
AND session_id IN (
125+
SELECT session_id FROM send_session_events
126+
GROUP BY session_id
127+
HAVING MAX(created_at) < ?1
128+
)
129+
)",
130+
)?;
131+
let rows = stmt.query_map(params![cutoff], |row| row.get::<_, i64>(0))?;
132+
let mut ids = Vec::new();
133+
for row in rows {
134+
ids.push(row?);
135+
}
136+
ids
137+
};
138+
let stale_receive_session_ids = {
139+
let mut stmt = tx.prepare(
140+
"SELECT session_id FROM receive_sessions
141+
WHERE (completed_at IS NOT NULL AND completed_at < ?1)
142+
OR (
143+
completed_at IS NULL
144+
AND session_id IN (
145+
SELECT session_id FROM receive_session_events
146+
GROUP BY session_id
147+
HAVING MAX(created_at) < ?1
148+
)
149+
)",
150+
)?;
151+
let rows = stmt.query_map(params![cutoff], |row| row.get::<_, i64>(0))?;
152+
let mut ids = Vec::new();
153+
for row in rows {
154+
ids.push(row?);
155+
}
156+
ids
157+
};
158+
159+
for session_id in stale_send_session_ids {
160+
tx.execute(
161+
"DELETE FROM send_session_events WHERE session_id = ?1",
162+
params![session_id],
163+
)?;
164+
tx.execute(
165+
"DELETE FROM send_sessions WHERE session_id = ?1",
166+
params![session_id],
167+
)?;
168+
}
169+
170+
for session_id in stale_receive_session_ids {
171+
tx.execute(
172+
"DELETE FROM receive_session_events WHERE session_id = ?1",
173+
params![session_id],
174+
)?;
175+
tx.execute(
176+
"DELETE FROM receive_sessions WHERE session_id = ?1",
177+
params![session_id],
178+
)?;
179+
}
180+
181+
tx.commit()?;
182+
Ok(())
183+
}
184+
112185
/// Returns IDs of all active (incomplete) receive sessions
113186
pub fn get_recv_session_ids(&self) -> Result<Vec<SessionId>> {
114187
let conn = self.conn();

src/utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ pub fn open_payjoin_db(
142142
use crate::payjoin::db::{DB_FILENAME, Database};
143143
let wallet_dir = prepare_home_dir(datadir)?.join(wallet_name);
144144
std::fs::create_dir_all(&wallet_dir).map_err(|e| Error::Generic(e.to_string()))?;
145-
Ok(std::sync::Arc::new(Database::create(
146-
wallet_dir.join(DB_FILENAME),
147-
)?))
145+
let db = std::sync::Arc::new(Database::create(wallet_dir.join(DB_FILENAME))?);
146+
db.prune_expired_sessions()?;
147+
Ok(db)
148148
}
149149

150150
#[cfg(any(

0 commit comments

Comments
 (0)