The daemon mode is implemented and functional with advanced features for production use:
```bash
nostrweet daemon \
  --user alice \
  --user bob \
  --user charlie \
  --relay wss://relay.damus.io \
  --relay wss://nos.lol \
  --blossom-server https://blossom.example.com \
  --max-concurrent 3 \
  --poll-interval 300
```

- ✅ Continuous monitoring of multiple Twitter users
- ✅ Automatic posting to Nostr relays
- ✅ Cache-as-state architecture (no separate state files)
- ✅ Automatic recovery from crashes
- ✅ Per-user exponential backoff on failures
- ✅ Rate limiting for Twitter API
- ✅ Statistics reporting every 60 seconds
- ✅ Graceful shutdown with Ctrl+C
- ✅ Downloads and posts referenced profiles
- ✅ Media handling and optional Blossom uploads
- ✅ Smart resume with `since_id` for efficient Twitter API usage
The daemon mode continuously monitors a set of Twitter users and automatically posts new tweets to Nostr relays. It uses the local cache as the complete state mechanism, eliminating the need for separate state files.
The local cache directory serves as the complete state mechanism:
- Tweet presence: If `YYYYMMDD_HHMMSS_username_tweetid.json` exists, the tweet has been downloaded
- Nostr event presence: If `nostr_events/event_<nostr_event_id>.json` exists, the tweet has been posted to Nostr
- Profile presence: If `YYYYMMDD_HHMMSS_username_profile.json` exists, the profile has been downloaded
- Not-found markers: `.not_found` files indicate tweets that no longer exist
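Those presence checks need nothing more than a directory scan. A minimal std-only sketch (`is_tweet_cached` is a hypothetical helper name chosen for illustration, not necessarily the tool's actual function):

```rust
use std::fs;
use std::path::Path;

/// A tweet counts as "downloaded" iff some *_<tweet_id>.json file
/// exists in the data directory (hypothetical helper).
fn is_tweet_cached(tweet_id: &str, dir: &Path) -> bool {
    let suffix = format!("_{tweet_id}.json");
    fs::read_dir(dir)
        .map(|entries| {
            entries
                .flatten()
                .any(|e| e.file_name().to_string_lossy().ends_with(&suffix))
        })
        .unwrap_or(false) // an unreadable/missing dir means "not cached"
}

fn main() {
    let dir = std::env::temp_dir().join("nostrweet-cache-demo");
    fs::create_dir_all(&dir).unwrap();
    fs::write(dir.join("20240101_120000_alice_123.json"), b"{}").unwrap();
    assert!(is_tweet_cached("123", &dir));
    assert!(!is_tweet_cached("999", &dir));
    fs::remove_dir_all(&dir).unwrap();
}
```

Because the check is a plain filesystem scan, deleting a file simply makes the daemon treat the tweet as never downloaded.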
Key insight: The filename format (`YYYYMMDD_HHMMSS_username_tweetid.json`) contains all the metadata needed:
- Timestamp tells us when the tweet was created
- Username identifies the Twitter user
- Tweet ID is the unique identifier we can use with Twitter's API
This design ensures:
- State is continuously written during processing (not just at the end)
- Recovery from crashes is automatic (just restart and check cache)
- No state synchronization issues between state file and actual files
- State inspection is simple (just look at the filesystem)
- The latest tweet ID can be extracted directly from filenames
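A sketch of decoding that filename convention (the struct and helper names here are illustrative assumptions, not the tool's API). One subtlety: Twitter usernames may themselves contain underscores, so only the outermost segments are fixed:

```rust
/// Metadata recovered purely from a cache filename stem (hypothetical).
#[derive(Debug, PartialEq)]
struct CacheEntry {
    timestamp: String, // YYYYMMDD_HHMMSS
    username: String,
    tweet_id: u64,
}

fn parse_cache_filename(stem: &str) -> Option<CacheEntry> {
    let parts: Vec<&str> = stem.split('_').collect();
    // Need at least: date, time, username, tweet ID
    if parts.len() < 4 {
        return None;
    }
    // Take the first two segments as the timestamp and the last as the
    // tweet ID; whatever remains in between is the username. Parsing the
    // ID as u64 also rejects non-tweet files like *_profile.json.
    let tweet_id: u64 = parts.last()?.parse().ok()?;
    Some(CacheEntry {
        timestamp: format!("{}_{}", parts[0], parts[1]),
        username: parts[2..parts.len() - 1].join("_"),
        tweet_id,
    })
}

fn main() {
    let entry = parse_cache_filename("20240101_120000_some_user_123456789").unwrap();
    assert_eq!(entry.username, "some_user");
    assert_eq!(entry.tweet_id, 123456789);
    assert!(parse_cache_filename("20240101_120000_some_user_profile").is_none());
}
```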
```bash
nostrweet daemon \
  --user user1 \
  --user user2 \
  --user user3 \
  --relay wss://relay1.com \
  --relay wss://relay2.com \
  --poll-interval 300 \
  --data-dir ./downloads
```

- `--user`: Twitter username to monitor (can be specified multiple times)
- `--relay`: Nostr relay to post to (can be specified multiple times)
- `--poll-interval`: Seconds between polling cycles (default: 300)
- `--data-dir`: Data directory for all storage (required)
- `--blossom-server`: Blossom server for media uploads (can be specified multiple times)
- `--mnemonic`: Optional BIP39 mnemonic phrase for deriving Nostr keys
```rust
#[derive(Parser, Debug)]
struct DaemonCommand {
    /// Twitter usernames to monitor
    #[arg(short, long = "user", required = true, action = clap::ArgAction::Append)]
    users: Vec<String>,

    /// Nostr relay addresses to post to
    #[arg(short, long = "relay", required = true, action = clap::ArgAction::Append)]
    relays: Vec<String>,

    /// Blossom server addresses for media uploads
    #[arg(short = 'b', long = "blossom-server", action = clap::ArgAction::Append)]
    blossom_servers: Vec<String>,

    /// Data directory for all storage
    #[arg(short = 'd', long = "data-dir", required = true)]
    data_dir: PathBuf,

    /// Seconds between polling cycles
    #[arg(short, long, default_value = "300")]
    poll_interval: u64,

    /// BIP39 mnemonic phrase for deriving Nostr keys
    #[arg(long, env = "NOSTRWEET_MNEMONIC")]
    mnemonic: Option<String>,
}
```

```rust
async fn init_daemon(config: DaemonConfig) -> Result<DaemonState> {
    // Verify output directory exists
    ensure_output_dir(&config.output_dir)?;

    // Test Twitter API access
    let twitter_client = TwitterClient::new(&config.output_dir)?;

    // Connect to Nostr relays
    let nostr_client = connect_to_relays(&config.relays).await?;

    // Initialize user states from cache
    let user_states = init_user_states(&config.users, &config.output_dir)?;

    Ok(DaemonState {
        config,
        twitter_client,
        nostr_client,
        user_states,
    })
}
```

```rust
async fn run_daemon(mut state: DaemonState) -> Result<()> {
    loop {
        // Clone the user list so iterating doesn't hold an immutable
        // borrow of `state` while process_user takes it mutably
        let users = state.config.users.clone();
        for username in &users {
            // Process each user
            process_user(&mut state, username).await?;
            // Small delay between users
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
        // Wait for next polling cycle
        info!("Sleeping for {} seconds", state.config.poll_interval);
        tokio::time::sleep(Duration::from_secs(state.config.poll_interval)).await;
    }
}
```

```rust
async fn process_user(state: &mut DaemonState, username: &str) -> Result<()> {
    info!("Processing user: @{username}");

    // Fetch recent tweets (with retry logic)
    let tweets = fetch_with_retry(&state.twitter_client, username).await?;

    for tweet in tweets {
        // Download only if not already cached; a cached tweet may still
        // need posting (e.g. after a crash between download and post)
        if !is_tweet_cached(&tweet.id, &state.config.output_dir) {
            // Download tweet and media
            download_tweet_and_media(&tweet, &state.config.output_dir).await?;
            info!("Downloaded new tweet: {}", tweet.id);

            // Download referenced profiles
            download_referenced_profiles(&tweet, &state.twitter_client).await?;
        } else {
            debug!("Tweet {} already cached", tweet.id);
        }

        // Check if already posted to Nostr
        if is_tweet_posted_to_nostr(&tweet.id, &state.config.output_dir) {
            debug!("Tweet {} already posted to Nostr", tweet.id);
            continue;
        }

        // Post to Nostr
        post_tweet_to_nostr(&tweet, &state.nostr_client).await?;
        info!("Posted tweet {} to Nostr", tweet.id);

        // Post referenced profiles to Nostr
        post_referenced_profiles_to_nostr(&tweet, &state.nostr_client).await?;
    }
    Ok(())
}
```

State is written immediately after each operation:
- Tweet Download: Save `tweet.json` immediately after download
- Media Download: Save media files as they complete
- Nostr Posting: Save `nostr_events/event_*.json` immediately after a successful post
- Profile Updates: Save profile JSON after each fetch
```
downloads/
├── 20240101_120000_user1_123456789.json   # Tweet
├── user1_123456789_1.jpg                  # Media
├── 20240101_120000_user1_profile.json     # Profile
├── tweet_987654321.not_found              # Not-found marker
└── nostr_events/
    └── event_abc123def456.json            # Nostr event
```
The daemon uses the filesystem to intelligently resume from the exact point where it left off:
```rust
fn find_latest_tweet_id_for_user(username: &str, output_dir: &Path) -> Option<String> {
    // List all files matching *_username_*.json
    let pattern = format!("{dir}/*_{username}_*.json", dir = output_dir.display());
    let mut latest: Option<u64> = None;
    for path in glob::glob(&pattern).ok()?.flatten() {
        if let Some(filename) = path.file_stem() {
            // Extract the tweet ID (last part after the final underscore);
            // parsing as u64 also skips non-tweet files like *_profile.json
            if let Some(tweet_id) = filename
                .to_string_lossy()
                .rsplit('_')
                .next()
                .and_then(|s| s.parse::<u64>().ok())
            {
                // Snowflake IDs increase over time, so compare numerically
                // (string comparison mis-orders IDs of different lengths)
                if latest.map_or(true, |l| tweet_id > l) {
                    latest = Some(tweet_id);
                }
            }
        }
    }
    latest.map(|id| id.to_string())
}
```

```rust
async fn fetch_new_tweets_for_user(
    client: &TwitterClient,
    username: &str,
    output_dir: &Path,
) -> Result<Vec<Tweet>> {
    // Find the latest tweet we already have
    let since_id = find_latest_tweet_id_for_user(username, output_dir);
    // Fetch only tweets newer than our latest
    client.get_user_timeline(username, since_id).await
}
```

- No duplicate fetching: Only retrieves tweets newer than what's cached
- Efficient catch-up: Can resume after any downtime period
- Self-healing: If files are deleted, automatically re-fetches
- No state files: The cache directory IS the state
- API-friendly: Uses Twitter's `since_id` parameter for optimal pagination
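One caveat in "extract the latest tweet ID from filenames": snowflake IDs only sort correctly as integers, because lexicographic string comparison mis-orders IDs with different digit counts:

```rust
fn main() {
    let older = "99999";     // shorter ID, minted earlier
    let newer = "100000000"; // longer ID, minted later
    // Lexicographic comparison orders these the wrong way round...
    assert!(older > newer);
    // ...while numeric comparison agrees with creation order,
    // since snowflake IDs increase monotonically over time
    assert!(older.parse::<u64>().unwrap() < newer.parse::<u64>().unwrap());
}
```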
On startup or after a crash:

- For each user, scan the cache to find the latest tweet ID
- Use `since_id` to fetch only newer tweets from Twitter
- Check `nostr_events/` to see what has been posted
- Post any cached tweets that haven't been posted yet
This approach is:
- Deterministic: Same cache state always produces same behavior
- Resilient: No separate state files to corrupt or lose
- Transparent: State is human-readable in the filesystem
```rust
async fn fetch_with_retry(client: &TwitterClient, username: &str) -> Result<Vec<Tweet>> {
    let backoff = ExponentialBackoffBuilder::new()
        .with_initial_interval(Duration::from_secs(1))
        .with_max_interval(Duration::from_secs(60))
        .with_max_elapsed_time(Some(Duration::from_secs(300)))
        .build();

    backoff::future::retry(backoff, || async {
        match client.get_user_timeline(username, Some(20)).await {
            Ok(tweets) => Ok(tweets),
            Err(e) if is_rate_limit_error(&e) => {
                warn!("Rate limited, backing off");
                Err(backoff::Error::transient(e))
            }
            Err(e) if is_network_error(&e) => {
                warn!("Network error, retrying");
                Err(backoff::Error::transient(e))
            }
            Err(e) => Err(backoff::Error::permanent(e)),
        }
    })
    .await
}
```

```rust
#[tokio::main]
async fn main() -> Result<()> {
    // `state` is built via init_daemon(...) as shown earlier
    let (shutdown_tx, shutdown_rx) = oneshot::channel();

    // Handle Ctrl+C (SIGINT)
    tokio::spawn(async move {
        signal::ctrl_c().await.unwrap();
        shutdown_tx.send(()).unwrap();
    });

    // Run daemon with shutdown signal
    tokio::select! {
        result = run_daemon(state) => result,
        _ = shutdown_rx => {
            info!("Received shutdown signal");
            // State is already persisted continuously
            Ok(())
        }
    }
}
```

- User timeline: 1500 requests per 15 minutes
- With 10 users, poll every 5 minutes: ~30 requests per 15 minutes (well within limits)
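The arithmetic behind that estimate, spelled out (assuming one timeline request per user per poll cycle):

```rust
fn main() {
    let limit_per_window: u32 = 1500; // documented user-timeline limit
    let window_secs: u32 = 900;       // Twitter's 15-minute window
    let users: u32 = 10;
    let poll_interval_secs: u32 = 300; // 5 minutes

    // 3 poll cycles fit in one 15-minute window
    let polls_per_window = window_secs / poll_interval_secs;
    // One request per user per cycle: 10 * 3 = 30
    let requests_per_window = users * polls_per_window;

    assert_eq!(requests_per_window, 30);
    assert!(requests_per_window < limit_per_window);
}
```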
```rust
struct RateLimiter {
    requests_per_window: u32,
    window_duration: Duration,
    request_times: VecDeque<Instant>,
}

impl RateLimiter {
    async fn wait_if_needed(&mut self) {
        // Remove old requests outside the window
        let cutoff = Instant::now() - self.window_duration;
        while let Some(&front) = self.request_times.front() {
            if front < cutoff {
                self.request_times.pop_front();
            } else {
                break;
            }
        }

        // If at the limit, wait until the oldest request leaves the window
        if self.request_times.len() >= self.requests_per_window as usize {
            let oldest = self.request_times.front().unwrap();
            let wait_until = *oldest + self.window_duration;
            let wait_duration = wait_until.saturating_duration_since(Instant::now());
            if wait_duration > Duration::ZERO {
                info!("Rate limit reached, waiting {:?}", wait_duration);
                tokio::time::sleep(wait_duration).await;
            }
        }

        // Record this request
        self.request_times.push_back(Instant::now());
    }
}
```

```bash
# Required
export TWITTER_BEARER_TOKEN="your_token"
export NOSTRWEET_DATA_DIR="./downloads"
export NOSTRWEET_RELAYS="wss://relay1.com,wss://relay2.com"

# Optional
export NOSTRWEET_MNEMONIC="your twelve word mnemonic phrase here"
export RUST_LOG="info,nostrweet=debug"
```
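The comma-separated `NOSTRWEET_RELAYS` value has to be split before use. A sketch of such parsing (`parse_relays` is a hypothetical helper, not the tool's actual function):

```rust
fn parse_relays(raw: &str) -> Vec<String> {
    // Split on commas, trimming stray whitespace and dropping empty entries
    raw.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(String::from)
        .collect()
}

fn main() {
    // Trailing commas and spaces are tolerated
    let relays = parse_relays("wss://relay1.com, wss://relay2.com,");
    assert_eq!(relays, vec!["wss://relay1.com", "wss://relay2.com"]);
}
```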
```toml
# daemon.toml
[daemon]
poll_interval = 300
max_tweets_per_poll = 20

[twitter]
users = ["user1", "user2", "user3"]

[nostr]
relays = ["wss://relay1.com", "wss://relay2.com"]
blossom_servers = ["https://blossom1.com"]

[rate_limits]
requests_per_window = 100
window_seconds = 900
```

```rust
async fn health_check(state: &DaemonState) -> HealthStatus {
    HealthStatus {
        uptime: state.start_time.elapsed(),
        last_poll: state.last_poll_time,
        tweets_downloaded: count_cached_tweets(&state.config.output_dir),
        tweets_posted: count_nostr_events(&state.config.output_dir),
        relay_status: check_relay_connections(&state.nostr_client).await,
    }
}
```

- Tweets downloaded per user
- Tweets posted to Nostr
- API rate limit status
- Relay connection status
- Error counts by type
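These metrics could be backed by simple in-memory counters; a minimal sketch (the struct and method names are assumptions for illustration):

```rust
use std::collections::HashMap;

/// Hypothetical counters behind the 60-second statistics report.
#[derive(Default, Debug)]
struct DaemonStats {
    tweets_downloaded: HashMap<String, u64>, // per user
    tweets_posted: u64,
    errors_by_type: HashMap<String, u64>,
}

impl DaemonStats {
    fn record_download(&mut self, user: &str) {
        *self.tweets_downloaded.entry(user.to_string()).or_insert(0) += 1;
    }
    fn record_error(&mut self, kind: &str) {
        *self.errors_by_type.entry(kind.to_string()).or_insert(0) += 1;
    }
}

fn main() {
    let mut stats = DaemonStats::default();
    stats.record_download("alice");
    stats.record_download("alice");
    stats.record_error("rate_limit");
    assert_eq!(stats.tweets_downloaded["alice"], 2);
    assert_eq!(stats.errors_by_type["rate_limit"], 1);
}
```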
```rust
// Structured logging for monitoring
info!(
    user = username,
    tweet_id = tweet.id,
    action = "downloaded",
    "Tweet downloaded successfully"
);

error!(
    user = username,
    error = %e,
    retry_count = retries,
    "Failed to fetch timeline"
);
```

- Cache state detection
- Rate limiter logic
- Error classification (transient vs permanent)
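For the error-classification tests, the transient-vs-permanent decision can be isolated into a pure function. A sketch assuming HTTP status codes drive the decision (the real `is_rate_limit_error`/`is_network_error` predicates may look at error types instead):

```rust
/// Hypothetical classification mirroring fetch_with_retry's branches.
#[derive(Debug, PartialEq)]
enum ErrorClass {
    Transient, // rate limits, network errors: retry with backoff
    Permanent, // everything else: give up immediately
}

fn classify(status: Option<u16>) -> ErrorClass {
    match status {
        // 429 Too Many Requests, 5xx server errors, and connection-level
        // failures (no status at all) are worth retrying
        Some(429) | Some(500..=599) | None => ErrorClass::Transient,
        _ => ErrorClass::Permanent,
    }
}

fn main() {
    assert_eq!(classify(Some(429)), ErrorClass::Transient);
    assert_eq!(classify(Some(503)), ErrorClass::Transient);
    assert_eq!(classify(None), ErrorClass::Transient); // e.g. connection reset
    assert_eq!(classify(Some(404)), ErrorClass::Permanent);
}
```

Keeping the decision in a pure function lets the unit tests cover it without any mocked network at all.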
Use the `faux` crate for creating test doubles:
```rust
#[cfg_attr(test, faux::create)]
pub struct TwitterClient {
    // ...
}

#[cfg_attr(test, faux::methods)]
impl TwitterClient {
    pub async fn get_user_timeline(&self, username: &str, max_results: Option<u32>) -> Result<Vec<Tweet>> {
        // Real implementation
    }
}
```

```rust
#[tokio::test]
async fn test_daemon_recovery() {
    // Set up a test cache with existing data
    let temp_dir = setup_test_cache().await;

    // Create a mock Twitter client
    let mut mock_client = TwitterClient::faux();
    faux::when!(mock_client.get_user_timeline).then_return(Ok(vec![test_tweet()]));

    // Start the daemon with the mock
    let state = init_daemon_with_mocks(test_config(&temp_dir), mock_client)
        .await
        .unwrap();

    // Verify it doesn't re-download existing tweets
    assert!(!will_download_tweet(&state, "existing_tweet_id"));
    // Verify it continues from where it left off
    assert!(will_download_tweet(&state, "new_tweet_id"));
}
```

```rust
#[cfg_attr(test, faux::create)]
pub struct NostrClient {
    // ...
}

#[cfg_attr(test, faux::methods)]
impl NostrClient {
    pub async fn send_event(&self, event: Event) -> Result<EventId> {
        // Real implementation
    }
}
```

```rust
#[tokio::test]
async fn test_nostr_posting() {
    let mut mock_nostr = NostrClient::faux();

    // Set up the expectation up front; `times(1)` limits the stub
    // to a single invocation
    faux::when!(mock_nostr.send_event)
        .times(1)
        .then_return(Ok(EventId::from_hex("abc123").unwrap()));

    // Test the posting logic
    let result = post_tweet_to_nostr(&tweet, &mock_nostr).await;
    assert!(result.is_ok());
}
```

- Mock Twitter API responses using `faux`
- Mock Nostr relay connections using `faux`
- Verify the complete flow from fetch to post

```toml
[dev-dependencies]
faux = "0.1"
```

The daemon now includes all features from both phases in a single implementation:
- ✅ Multiple user monitoring with round-robin processing
- ✅ Advanced polling loop with per-user state
- ✅ Exponential backoff error handling per user
- ✅ Cache-based state architecture
- ✅ Graceful shutdown (Ctrl+C)
- ✅ Per-user rate limiting with sliding window
- ✅ Concurrent processing control (configurable)
- ✅ Statistics reporting (every 60 seconds)
- ✅ Smart polling intervals based on user state
- ✅ Command: `nostrweet daemon`
- ⬜ Configuration file support
- ⬜ Health check endpoint (HTTP/metrics)
- ⬜ Prometheus metrics export
- ✅ Graceful shutdown with state preservation
- ✅ Resume from last position after restart, using the filesystem-based approach with `since_id`
- ⬜ Web dashboard for monitoring
- ⬜ Distributed locking for multi-instance
- ⬜ Database for state (PostgreSQL/SQLite)
- ⬜ Admin API for management
- ⬜ Alerting integration (PagerDuty/Slack)
- ⬜ Docker container with health checks
- ⬜ Kubernetes deployment manifests
- Private Key Storage: Never log or expose private keys
- API Token Security: Mask tokens in logs
- File Permissions: Ensure cache directory has appropriate permissions
- Input Validation: Validate all usernames and URLs
- Rate Limit Respect: Never bypass API rate limits
- Batch Operations: Process multiple tweets in parallel where possible
- Connection Pooling: Reuse HTTP and WebSocket connections
- Incremental Updates: Only fetch new tweets since last check
- Media Caching: Skip re-downloading existing media
- Async I/O: Use tokio for all I/O operations
- Web Dashboard: Show daemon status and statistics
- Webhook Support: Trigger actions on new tweets
- Filter Rules: Only post tweets matching certain criteria
- Thread Support: Properly handle Twitter threads
- Quote Tweet Handling: Special formatting for quotes
- Media Optimization: Compress/resize media before posting
- Multi-Account Support: Post from different Nostr accounts per Twitter user