chanevents+faraday: add chan events store and start #237
bitromortac wants to merge 8 commits into lightninglabs:faraday-forwarding-ability from
Conversation
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a new subsystem for monitoring and recording Lightning Network channel events. It adds the necessary infrastructure to track channel lifecycle events, such as opening, closing, and status updates, and integrates this monitoring into the main application lifecycle. The changes include database configuration support for both SQLite and Postgres to ensure persistent storage of these events.
import (
	"github.com/btcsuite/btclog/v2"
	"github.com/lightninglabs/faraday/accounting"
	"github.com/lightninglabs/faraday/chanevents"
will remove the root logger mention from the commit message
Code Review
This pull request introduces a channel events monitor to track LND channel state changes and record them in a persistent database, supporting both SQLite and Postgres backends. The implementation includes a background monitor loop, initial state synchronization, and integration into the main Faraday service lifecycle. Review feedback identifies several critical areas for improvement: implementing a reconnection strategy for the monitor loop to ensure resilience against transient LND connection failures, addressing a potential race condition between initial synchronization and event subscription, and optimizing the startup process to prevent redundant database entries. Additionally, a missing default case in the database store initialization was noted as a potential source of nil pointer panics.
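The "missing default case" concern can be illustrated with a small sketch. This is not Faraday's actual store constructor; the `Backend`, `Store`, and `NewStore` names are hypothetical stand-ins showing the defensive pattern the review asks for: fail loudly on an unknown backend instead of returning a nil store that panics later.

```go
package main

import (
	"errors"
	"fmt"
)

// Backend identifies the configured database backend. The values mirror
// the SQLite/Postgres support mentioned in the PR summary.
type Backend string

const (
	BackendSQLite   Backend = "sqlite"
	BackendPostgres Backend = "postgres"
)

// Store is a placeholder for the channel events store.
type Store struct{ backend Backend }

// NewStore returns an error for unrecognized backends, so callers can
// never proceed with a nil store and hit a nil pointer panic later.
func NewStore(b Backend) (*Store, error) {
	switch b {
	case BackendSQLite, BackendPostgres:
		return &Store{backend: b}, nil
	default:
		return nil, errors.New("unknown database backend: " + string(b))
	}
}

func main() {
	if _, err := NewStore(Backend("bolt")); err != nil {
		fmt.Println("got expected error:", err)
	}
}
```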
case err, ok := <-errChan:
	if !ok {
		log.Info("Channel event error stream closed")
		return
	}
	log.Errorf("error from channel event subscription: %v", err)
	return
The monitor loop terminates permanently if the channel event subscription encounters an error or the error channel is closed. This will cause the monitor to stop working silently until the entire daemon is restarted. It is recommended to implement a reconnection strategy with exponential backoff to ensure the monitor remains resilient to transient connection issues with lnd.
This will be addressed in a later PR.
if err := m.initialSync(ctx); err != nil {
	log.Errorf("error during initial sync: %v", err)
	// We'll continue anyway, maybe the subscription will work.
}

// Subscribe to channel events.
// TODO: Add reconnection logic.
eventChan, errChan, err := m.lnd.SubscribeChannelEvents(ctx)
There is a potential race condition because initialSync is performed before starting the subscription. Any channel events occurring during the sync process or in the interval before the subscription is established might be missed. To avoid this, the subscription should be started first (to begin buffering events), followed by the initial sync, and then the buffered events should be processed while filtering out any that were already captured during the sync.
Would address this in a later PR.
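The subscribe-first ordering suggested above can be sketched as follows. All types here are simplified stand-ins (there is no real subscription or store): the point is the order of operations and the dedup filter applied to events buffered while the sync ran.

```go
package main

import "fmt"

// event is a minimal stand-in for a channel event, keyed by channel point.
type event struct{ channelPoint string }

// drainBuffered processes events that arrived while the initial sync ran,
// skipping channels the sync already recorded.
func drainBuffered(buffered []event, synced map[string]bool) []event {
	var out []event
	for _, e := range buffered {
		if synced[e.channelPoint] {
			continue
		}
		out = append(out, e)
	}
	return out
}

func main() {
	// 1. Subscribe first: events arriving during the sync land in a
	//    buffer instead of being lost (simulated here as a slice).
	buffered := []event{{"chan1:0"}, {"chan2:0"}}

	// 2. Run the initial sync, recording which channels it covered.
	synced := map[string]bool{"chan1:0": true}

	// 3. Drain the buffer, filtering out already-captured channels,
	//    then switch to processing live events.
	for _, e := range drainBuffered(buffered, synced) {
		fmt.Println("processing buffered event for", e.channelPoint)
	}
}
```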
func (m *Monitor) initialSync(ctx context.Context) error {
	log.Info("Performing initial sync of channel state")

	closedChannels, err := m.lnd.ClosedChannels(ctx)
	if err != nil {
		return fmt.Errorf("error listing closed channels: %w", err)
	}

	for _, channel := range closedChannels {
		// Channels that didn't confirm onchain will be present here,
		// but don't have a channel ID. We skip those.
		if channel.ChannelID == 0 {
			log.Debugf("Skipping closed channel with no "+
				"channel ID: %s", channel.ChannelPoint)

			continue
		}

		err := m.addChannel(
			ctx, channel.PubKeyBytes, channel.ChannelPoint,
			channel.ChannelID,
		)
		if err != nil {
			log.Errorf("error adding closed channel %s: %v",
				channel.ChannelPoint, err)

			continue
		}

		dbChan, err := m.store.GetChannel(ctx, channel.ChannelPoint)
		if err != nil {
			log.Errorf("error getting closed channel %s from db: %v",
				channel.ChannelPoint, err)

			continue
		}

		if err := m.store.AddChannelEvent(ctx, &ChannelEvent{
			ChannelID: dbChan.ID,
			EventType: EventTypeOffline,
		}); err != nil {
			log.Errorf("error adding offline event for closed "+
				"channel %s: %v", channel.ChannelPoint, err)
		}
	}

	channels, err := m.lnd.ListChannels(ctx, false, false)
	if err != nil {
		return fmt.Errorf("error listing channels: %w", err)
	}

	for _, channel := range channels {
		// We make sure the channel exists in the store.
		err := m.addChannel(
			ctx, channel.PubKeyBytes, channel.ChannelPoint,
			channel.ChannelID,
		)
		if err != nil {
			log.Errorf("error adding channel %s: %v",
				channel.ChannelPoint, err)

			continue
		}

		dbChan, err := m.store.GetChannel(ctx, channel.ChannelPoint)
		if err != nil {
			log.Errorf("error getting channel %s from db: %v",
				channel.ChannelPoint, err)

			continue
		}

		eventType := EventTypeOffline
		if channel.Active {
			eventType = EventTypeOnline
		}

		if err := m.store.AddChannelEvent(ctx, &ChannelEvent{
			ChannelID: dbChan.ID,
			EventType: eventType,
		}); err != nil {
			log.Errorf("error adding event for channel %s: %v",
				channel.ChannelPoint, err)
		}

		if err := m.store.AddChannelEvent(ctx, &ChannelEvent{
			ChannelID:     dbChan.ID,
			EventType:     EventTypeUpdate,
			LocalBalance:  fn.Some(channel.LocalBalance),
			RemoteBalance: fn.Some(channel.RemoteBalance),
		}); err != nil {
			log.Errorf("error adding event for channel %s: %v",
				channel.ChannelPoint, err)
		}
	}

	return nil
}
The initialSync process iterates through all closed and open channels and records Online/Offline and Update events on every startup. This approach has several issues:
- Database bloat: it inserts redundant events even if the channel state hasn't changed since the last run.
- Scalability: for nodes with a large number of channels, this results in many database queries and inserts on every startup.
- Shutdown responsiveness: the loops do not check whether the context has been cancelled (ctx.Done()), which could delay shutdown if there are many channels to process.
Consider only recording events if the state has changed from the last known state in the database, and ensure the loops are context-aware.
This is true as well, but I think we want to address this later. As an action, I added an isSync row to signal that the event was recorded during startup, to distinguish real events from startup sync.
This was forgotten previously.
Introduce requireEqualEvent to reduce boilerplate in store tests, comparing all user-set fields while ignoring the auto-assigned ID.
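The core comparison behind a requireEqualEvent-style helper might look like the sketch below. The `ChannelEvent` fields and the `equalEvent` function are simplified stand-ins, not the actual test helper from this PR: the idea is to compare all user-set fields while ignoring the store's auto-assigned ID.

```go
package main

import "fmt"

// ChannelEvent is a simplified stand-in for the store's event record.
type ChannelEvent struct {
	ID        int64 // auto-assigned by the store; ignored in comparisons
	ChannelID int64
	EventType string
}

// equalEvent reports whether two events match on every user-set field,
// deliberately skipping the auto-assigned ID.
func equalEvent(want, got *ChannelEvent) bool {
	return want.ChannelID == got.ChannelID &&
		want.EventType == got.EventType
}

func main() {
	want := &ChannelEvent{ChannelID: 1, EventType: "online"}
	got := &ChannelEvent{ID: 42, ChannelID: 1, EventType: "online"}

	// IDs differ, but all user-set fields match.
	fmt.Println(equalEvent(want, got))
}
```

In a real test suite, this comparison would typically be wrapped so a mismatch fails the test with a descriptive message.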
We want to know if an update came from an initial sync. This also helps us to identify data gaps and one can be sure it was not due to an actual event. Modify the migration as it's unreleased.
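A small sketch of how such a flag could be used downstream, with a hypothetical `IsSync` field (the actual column name and schema are defined by the PR's migration, not shown here): consumers can filter out sync-origin records when looking for genuine channel events.

```go
package main

import "fmt"

// ChannelEvent is a simplified stand-in; IsSync marks records written
// during the startup sync rather than from the live subscription.
type ChannelEvent struct {
	EventType string
	IsSync    bool
}

// realTimeEvents drops startup-sync records, leaving only events that
// correspond to actual state changes observed at runtime.
func realTimeEvents(events []ChannelEvent) []ChannelEvent {
	var out []ChannelEvent
	for _, e := range events {
		if e.IsSync {
			continue
		}
		out = append(out, e)
	}
	return out
}

func main() {
	events := []ChannelEvent{
		{EventType: "online", IsSync: true},
		{EventType: "offline", IsSync: false},
	}
	fmt.Println(len(realTimeEvents(events)))
}
```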
Also refactor the root logger.
Adds the channel event monitor and collects channel update events.