feat: [ADR-070] Unordered Transactions (1/2) #18641
Walkthrough
The overall change introduces support for unordered transactions in the Cosmos SDK, which allows transactions to be processed out of sequence without nonce management by the client. This is achieved through new flags, updates to the transaction factory, and modifications to the AnteHandler decorators for signature verification and transaction management. New tests are added to verify the feature, and documentation is updated to guide users through the upgrade process and usage.
This is great stuff @alexanderbez Does this remove idempotency? If I sign a transfer and broadcast it 2 times, will it be sent twice? |
This is a great question. Assuming the transaction is identical, sending an identical tx twice would be improper use (e.g. a double spend via MITM attack). So what will happen is that CometBFT will reject the duplicate tx. From the SDK's perspective, even if the 2nd tx somehow made it into the node, the app's mempool should reject it as well. Finally, as a measure of last resort, This will be clearly documented. |
|
@julienrbrt I've updated some tx related logic to support a new TxBody field -- unordered, e.g. |
client/v2 still uses client/tx for now, so nothing to change there :) |
|
Ok @tac0turtle @yihuang, this PR is at a point where the core logic is more or less complete (AnteHandler + map/manager implementation). The last critical bit left is how to handle node restarts, i.e. ensuring the map is durable. As I see it there are two viable options:
Option A: Seed map from consensus block store
Option B: Seed map from application state, i.e. relying on store
@yihuang let me know if I'm missing other options? |
| return ctx, errorsmod.Wrapf(sdkerrors.ErrInvalidRequest, "unordered tx ttl exceeds %d", d.maxUnOrderedTTL) | ||
| } | ||
|
|
||
| txHash := sha256.Sum256(ctx.TxBytes()) |
is there a better way to reuse the tx hash calculated by the cometbft?
Unfortunately, we do not have this information exposed on the types.Context. I don't even think CometBFT provides it actually.
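For context on the hashing step discussed here, a minimal sketch of how a SHA-256 digest of the raw tx bytes can serve as a fixed-size map key for duplicate detection. The `seen` type and `addOnce` helper are hypothetical and only illustrate the idea; they are not the manager's API.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// TxHash is a fixed-size SHA-256 digest, usable directly as a map key.
type TxHash [32]byte

// seen is a hypothetical stand-in for the manager's tx hash -> TTL map.
type seen map[TxHash]uint64

// addOnce records the tx and reports false if identical bytes were already recorded.
func (s seen) addOnce(txBytes []byte, ttl uint64) bool {
	h := TxHash(sha256.Sum256(txBytes))
	if _, ok := s[h]; ok {
		return false
	}
	s[h] = ttl
	return true
}

func main() {
	s := seen{}
	tx := []byte("signed transfer bytes")
	fmt.Println(s.addOnce(tx, 100)) // first broadcast is recorded
	fmt.Println(s.addOnce(tx, 100)) // identical rebroadcast is detected
}
```

Because the key is derived from the full tx bytes, any change to the signed payload yields a different hash, while an exact rebroadcast maps to the same entry.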
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return latestHeight, true |
It seems that when this timeout happens, it could return 0, true, which leads to a call of expiredTxs with 0 as the parameter, which will expire all tx hashes, right?
I don't think we need this timeout context here at all.
which leads to a call of expiredTxs with 0 as the parameter, which will expire all tx hashes, right
If expiredTxs is called with 0, it'll essentially be a no-op, returning an empty []TxHash. I.e. it won't purge anything.
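A small sketch of why a height of 0 behaves as a no-op, assuming expiry is decided by a strict `height > ttl` comparison (that comparison is an assumption here, and this free function is a simplified stand-in for the method on the manager):

```go
package main

import "fmt"

type TxHash [32]byte

// expiredTxs returns the hashes whose TTL height lies strictly below the
// given block height. The strict comparison is assumed for illustration.
func expiredTxs(txHashes map[TxHash]uint64, height uint64) []TxHash {
	var out []TxHash
	for h, ttl := range txHashes {
		if height > ttl {
			out = append(out, h)
		}
	}
	return out
}

func main() {
	m := map[TxHash]uint64{{1}: 10, {2}: 20}
	fmt.Println(len(expiredTxs(m, 0)))  // no uint64 TTL is below height 0
	fmt.Println(len(expiredTxs(m, 15))) // only the entry with TTL 10 has expired
}
```

With unsigned heights, `0 > ttl` can never hold, so nothing is selected for purging under this assumption.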
yihuang left a comment
only one minor suggestion for code changes.
|
@yihuang I responded to the comments. Pls let me know what you think :) |
Co-authored-by: yihuang <huang@crypto.com>
| * Update the `App` constructor to create, load, and save the unordered transaction | ||
| manager. | ||
|
|
||
| ```go | ||
| func NewApp(...) *App { | ||
| // ... | ||
|
|
||
| // create, start, and load the unordered tx manager | ||
| utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") | ||
| app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) | ||
| app.UnorderedTxManager.Start() | ||
|
|
||
| if err := app.UnorderedTxManager.OnInit(); err != nil { | ||
| panic(fmt.Errorf("failed to initialize unordered tx manager: %w", err)) | ||
| } |
Ensure that the UnorderedTxManager is properly initialized in the App constructor, as failure to do so will result in a panic. This is a critical step for the unordered transactions feature to function correctly.
| utxDataDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data") | ||
| app.UnorderedTxManager = unorderedtx.NewManager(utxDataDir) |
The review comment is correct in stating that the UnorderedTxManager is initialized with a data directory and that there should be error handling for potential I/O issues. The script output shows that the NewManager function in x/auth/ante/unorderedtx/manager.go does attempt to create the directory if it does not exist. However, it does not handle errors that may occur during the directory creation, as the error from os.Mkdir is ignored. This could lead to unhandled I/O errors if the directory creation fails.
- The directory creation is attempted but not properly error-handled in NewManager.
- Potential I/O issues during directory creation are not addressed.
Analysis chain
The UnorderedTxManager is being initialized with a data directory derived from the application options. Ensure that the directory is created if it does not exist and that error handling is in place for potential I/O issues.
Scripts Executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Verify that the data directory for UnorderedTxManager is created and handled correctly.
rg --type go 'NewManager' --files-with-matches
Length of output: 568
Script:
#!/bin/bash
# Check the implementation of NewManager in unorderedtx/manager.go for directory creation and error handling.
cat x/auth/ante/unorderedtx/manager.go
Length of output: 6834
| // Manager contains the tx hash dictionary for duplicates checking, and expire | ||
| // them when block production progresses. | ||
| type Manager struct { | ||
| // blockCh defines a channel to receive newly committed block heights | ||
| blockCh chan uint64 | ||
| // doneCh allows us to ensure the purgeLoop has gracefully terminated prior to closing | ||
| doneCh chan struct{} | ||
|
|
||
| // dataDir defines the directory to store unexpired unordered transactions | ||
| // | ||
| // XXX: Note, ideally we avoid the need to store unexpired unordered transactions | ||
| // directly to file. However, store v1 does not allow such a primitive. But, | ||
| // once store v2 is fully integrated, we can remove manual file handling and | ||
| // store the unexpired unordered transactions directly to SS. | ||
| // | ||
| // Ref: https://github.com/cosmos/cosmos-sdk/issues/18467 | ||
| dataDir string | ||
|
|
||
| mu sync.RWMutex | ||
| // txHashes defines a map from tx hash -> TTL value, which is used for duplicate | ||
| // checking and replay protection, as well as purging the map when the TTL is | ||
| // expired. | ||
| txHashes map[TxHash]uint64 | ||
| } |
The Manager struct is defined with fields for managing unordered transactions. It includes a channel for block heights, a done channel, a data directory, and a map for transaction hashes with their TTL. The struct seems well-structured for its purpose. However, the comment on lines 42-45 mentions a future improvement related to store v2. This should be tracked as a TODO to ensure it is not forgotten when store v2 is integrated.
| func (m *Manager) Start() { | ||
| go m.purgeLoop() | ||
| } |
The Start method starts the purge loop in a separate goroutine. This is a potential source of non-determinism and should be carefully managed to ensure that the goroutine is properly synchronized with the rest of the application.
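As a rough illustration of keeping such a goroutine's lifecycle explicit, here is a sketch of a purge loop fed by a block-height channel and joined through a done channel on shutdown. The `purger` type, its buffer size, and the `height > ttl` expiry rule are assumptions for this example and do not mirror the actual `Manager`.

```go
package main

import (
	"fmt"
	"sync"
)

// purger is a simplified, hypothetical stand-in for the manager.
type purger struct {
	blockCh chan uint64   // receives newly committed block heights
	doneCh  chan struct{} // closed once the purge loop has exited

	mu       sync.RWMutex
	txHashes map[[32]byte]uint64
}

func newPurger() *purger {
	return &purger{
		blockCh:  make(chan uint64, 16),
		doneCh:   make(chan struct{}),
		txHashes: make(map[[32]byte]uint64),
	}
}

func (p *purger) start() { go p.loop() }

// loop purges expired entries for every height until blockCh is closed.
func (p *purger) loop() {
	defer close(p.doneCh)
	for height := range p.blockCh {
		p.mu.Lock()
		for h, ttl := range p.txHashes {
			if height > ttl {
				delete(p.txHashes, h)
			}
		}
		p.mu.Unlock()
	}
}

// close stops accepting heights and waits for the loop to drain and exit.
func (p *purger) close() {
	close(p.blockCh)
	<-p.doneCh
}

func (p *purger) size() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.txHashes)
}

func main() {
	p := newPurger()
	p.txHashes[[32]byte{1}] = 5
	p.txHashes[[32]byte{2}] = 50
	p.start()
	p.blockCh <- 10
	p.close()
	fmt.Println(p.size())
}
```

Waiting on `doneCh` in `close` means any state flushed afterwards reflects every height that was sent, which is the kind of synchronization the comment above asks for.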
| func (m *Manager) Add(txHash TxHash, ttl uint64) { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
|
|
||
| m.txHashes[txHash] = ttl | ||
| } |
The Add method adds a transaction hash with its TTL to the map. The previous comment from coderabbitai[bot] suggests enforcing a maximum TTL. This should be implemented to prevent TTL values that exceed the system's limits.
func (m *Manager) Add(txHash TxHash, ttl uint64) {
m.mu.Lock()
defer m.mu.Unlock()
+ if ttl > DefaultMaxUnOrderedTTL {
+ ttl = DefaultMaxUnOrderedTTL
+ }
m.txHashes[txHash] = ttl
}
| // restore with height > ttl which should result in no unordered txs synced | ||
| txm2 := unorderedtx.NewManager(dataDir) | ||
| s2 := unorderedtx.NewSnapshotter(txm2) | ||
| err = s2.RestoreExtension(200, unorderedtx.SnapshotFormat, pr) | ||
| require.NoError(t, err) | ||
| require.Empty(t, txm2.Size()) | ||
|
|
||
| // restore with height < ttl which should result in all unordered txs synced | ||
| txm3 := unorderedtx.NewManager(dataDir) | ||
| s3 := unorderedtx.NewSnapshotter(txm3) | ||
| err = s3.RestoreExtension(50, unorderedtx.SnapshotFormat, pr) | ||
| require.NoError(t, err) | ||
| require.Equal(t, 100, txm3.Size()) | ||
|
|
||
| for i := 0; i < 100; i++ { | ||
| require.True(t, txm3.Contains([32]byte{byte(i)})) | ||
| } |
The test checks the scenario where the height is greater than the TTL, expecting no unordered transactions to be synced. This is a good test for boundary conditions, but it would be beneficial to also include a test where the height is exactly equal to the TTL to ensure that edge case is handled correctly.
// Add a test case where height == ttl
| func TestUnorderedTxManager_SimpleSize(t *testing.T) { | ||
| txm := unorderedtx.NewManager(t.TempDir()) | ||
| defer func() { | ||
| require.NoError(t, txm.Close()) | ||
| }() | ||
|
|
||
| txm.Start() | ||
|
|
||
| txm.Add([32]byte{0xFF}, 100) | ||
| txm.Add([32]byte{0xAA}, 100) | ||
| txm.Add([32]byte{0xCC}, 100) | ||
|
|
||
| require.Equal(t, 3, txm.Size()) |
The test TestUnorderedTxManager_SimpleSize checks the size of the manager after adding transactions. Ensure that the test covers scenarios where transactions are removed or expired to verify that the size is updated correctly.
// Add tests for removal and expiration of transactions.
| func TestUnorderedTxManager_SimpleContains(t *testing.T) { | ||
| txm := unorderedtx.NewManager(t.TempDir()) | ||
| defer func() { | ||
| require.NoError(t, txm.Close()) | ||
| }() | ||
|
|
||
| txm.Start() | ||
|
|
||
| for i := 0; i < 10; i++ { | ||
| txHash := [32]byte{byte(i)} | ||
| txm.Add(txHash, 100) | ||
| require.True(t, txm.Contains(txHash)) | ||
| } | ||
|
|
||
| for i := 10; i < 20; i++ { | ||
| txHash := [32]byte{byte(i)} | ||
| require.False(t, txm.Contains(txHash)) | ||
| } |
The test TestUnorderedTxManager_SimpleContains correctly checks if the manager contains certain transactions. It would be beneficial to also test the behavior when transactions are removed or have expired.
// Add tests for checking contains after transactions are removed or expired.
| func TestUnorderedTxManager_CloseInit(t *testing.T) { | ||
| dataDir := t.TempDir() | ||
| txm := unorderedtx.NewManager(dataDir) | ||
| txm.Start() | ||
|
|
||
| // add a handful of unordered txs | ||
| for i := 0; i < 100; i++ { | ||
| txm.Add([32]byte{byte(i)}, 100) | ||
| } | ||
|
|
||
| // close the manager, which should flush all unexpired txs to file | ||
| require.NoError(t, txm.Close()) | ||
|
|
||
| // create a new manager, start it | ||
| txm2 := unorderedtx.NewManager(dataDir) | ||
| defer func() { | ||
| require.NoError(t, txm2.Close()) | ||
| }() | ||
|
|
||
| // start and execute OnInit, which should load the unexpired txs from file | ||
| txm2.Start() | ||
| require.NoError(t, txm2.OnInit()) | ||
| require.Equal(t, 100, txm2.Size()) | ||
|
|
||
| for i := 0; i < 100; i++ { | ||
| require.True(t, txm2.Contains([32]byte{byte(i)})) | ||
| } |
The test TestUnorderedTxManager_CloseInit checks the persistence of transactions across manager instances. Ensure that the test covers scenarios where transactions expire between manager instances.
// Add tests for transaction expiration between manager instances.
| func TestUnorderedTxManager_Flow(t *testing.T) { | ||
| txm := unorderedtx.NewManager(t.TempDir()) | ||
| defer func() { | ||
| require.NoError(t, txm.Close()) | ||
| }() | ||
|
|
||
| txm.Start() | ||
|
|
||
| // Seed the manager with txs, some of which should eventually be purged and | ||
| // the others will remain. Txs with TTL less than or equal to 50 should be purged. | ||
| for i := 1; i <= 100; i++ { | ||
| txHash := [32]byte{byte(i)} | ||
|
|
||
| if i <= 50 { | ||
| txm.Add(txHash, uint64(i)) | ||
| } else { | ||
| txm.Add(txHash, 100) | ||
| } | ||
| } | ||
|
|
||
| // start a goroutine that mimics new blocks being made every 500ms | ||
| doneBlockCh := make(chan bool) | ||
| go func() { | ||
| ticker := time.NewTicker(time.Millisecond * 500) | ||
| defer ticker.Stop() | ||
|
|
||
| var ( | ||
| height uint64 = 1 | ||
| i = 101 | ||
| ) | ||
| for range ticker.C { | ||
| txm.OnNewBlock(height) | ||
| height++ | ||
|
|
||
| if height > 51 { | ||
| doneBlockCh <- true | ||
| return | ||
| } else { | ||
| txm.Add([32]byte{byte(i)}, 50) | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| // Eventually all the txs that should be expired by block 50 should be purged. | ||
| // The remaining txs should remain. | ||
| require.Eventually( | ||
| t, | ||
| func() bool { | ||
| return txm.Size() == 50 | ||
| }, | ||
| 2*time.Minute, | ||
| 5*time.Second, | ||
| ) | ||
|
|
||
| <-doneBlockCh |
The test TestUnorderedTxManager_Flow simulates a more complex scenario with transactions being added and purged over time. Ensure that the test covers edge cases such as transactions expiring on the boundary of the TTL.
// Add edge case tests for transactions expiring on the boundary of the TTL.
| func DefaultSigVerificationGasConsumer(meter storetypes.GasMeter, sig signing.SignatureV2, params types.Params) error { | ||
| pubkey := sig.PubKey | ||
|
|
||
| switch pubkey := pubkey.(type) { | ||
| case *ed25519.PubKey: | ||
| meter.ConsumeGas(params.SigVerifyCostED25519, "ante verify: ed25519") |
The previous comment by coderabbitai regarding the handling of ED25519 public keys is still valid. The code should be updated to reflect the commented intention of not returning an error for ED25519 public keys.
- return errorsmod.Wrap(sdkerrors.ErrInvalidPubKey, "ED25519 public keys are unsupported")
+ // return errorsmod.Wrap(sdkerrors.ErrInvalidPubKey, "ED25519 public keys are unsupported")
| } | ||
|
|
||
| func (m *Manager) Start() { | ||
| go m.purgeLoop() |
Check notice
Code scanning / CodeQL
Spawning a Go routine
| for txHash, ttl := range m.txHashes { | ||
| chunk := unorderedTxToBytes(txHash, ttl) | ||
|
|
||
| if _, err = w.Write(chunk); err != nil { | ||
| return fmt.Errorf("failed to write unordered tx to buffer: %w", err) | ||
| } | ||
| } |
Check warning
Code scanning / CodeQL
Iteration over map
|
There is a test case in |
| string memo = 2; | ||
| int64 timeout_height = 3; | ||
| uint64 some_new_field = 4; | ||
| uint64 some_new_field = 5; |
Changing the field number of an existing field can break backward compatibility as serialized data will not be correctly understood by the new schema. Ensure that this change is intentional and that all consumers of this protobuf are updated accordingly.
It's because a new field was added with tag 4 to TxBody which in TestUpdatedTxBody is some_new_field.
From what I understand, the rejection of unknown fields happens by checking the tag numbers, so TxBody didn't have a tag 4, but because TestUpdatedTxBody did have it, it was erroring (as expected). Now a field with tag 4 is expected, so to trigger the error of an unexpected field we have to modify the proto message used for this test.
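To illustrate the tag-number check described above, here is a deliberately simplified sketch that walks a protobuf-encoded message containing only varint fields and rejects any field number outside a known set. `rejectUnknownFields` and the known-tag sets are hypothetical and are not the SDK's implementation; they only show why tag 4 stopped being "unknown" once TxBody gained a field with that number.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// rejectUnknownFields scans a message made up of varint fields (wire type 0)
// and returns an error for any field number not present in known.
func rejectUnknownFields(msg []byte, known map[uint64]bool) error {
	for len(msg) > 0 {
		key, n := binary.Uvarint(msg)
		if n <= 0 {
			return errors.New("malformed field key")
		}
		msg = msg[n:]

		fieldNum, wireType := key>>3, key&7
		if wireType != 0 {
			return fmt.Errorf("wire type %d is not handled in this sketch", wireType)
		}
		if _, n = binary.Uvarint(msg); n <= 0 {
			return errors.New("malformed varint value")
		}
		msg = msg[n:]

		if !known[fieldNum] {
			return fmt.Errorf("unknown field number %d", fieldNum)
		}
	}
	return nil
}

func main() {
	// Varint field numbers known to a hypothetical message before and after
	// a field with tag 4 was added.
	before := map[uint64]bool{3: true}
	after := map[uint64]bool{3: true, 4: true}

	tag4 := []byte{4 << 3, 1} // field 4, wire type 0, value 1
	tag5 := []byte{5 << 3, 1} // field 5, wire type 0, value 1

	fmt.Println(rejectUnknownFields(tag4, before) != nil) // rejected before tag 4 existed
	fmt.Println(rejectUnknownFields(tag4, after) != nil)  // accepted once tag 4 is known
	fmt.Println(rejectUnknownFields(tag5, after) != nil)  // tag 5 is still unknown
}
```

This matches the reasoning in the thread: moving the test message's extra field from tag 4 to tag 5 restores a field number that the updated TxBody does not recognize.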
good find!! could you add a comment to not confuse future readers
added a small comment on the test case 👌
Yeah this was such a footgun -- never would've found out. Thanks!
Co-authored-by: yihuang <huang@crypto.com> Co-authored-by: Facundo <facundomedica@gmail.com>
Description
ref: #13009
ref: #18553
This PR introduces the implementation outlined in ADR-070 -- unordered transactions.
Note, this PR is one of two total PRs, which primarily deals with the core business logic such as the AnteHandler decorators and the manager(map) implementation.
A 2nd PR will be made that addresses state management of the manager (map). A proposal for such an implementation can be found here: #18739