diff --git a/apps/evm/single/entrypoint.sh b/apps/evm/single/entrypoint.sh index a91c70d4a9..e23caa76dd 100755 --- a/apps/evm/single/entrypoint.sh +++ b/apps/evm/single/entrypoint.sh @@ -88,6 +88,10 @@ if [ -n "$DA_NAMESPACE" ]; then default_flags="$default_flags --rollkit.da.namespace $DA_NAMESPACE" fi +if [ -n "$DA_SIGNING_ADDRESSES" ]; then + default_flags="$default_flags --rollkit.da.signing_addresses $DA_SIGNING_ADDRESSES" +fi + # If no arguments passed, show help if [ $# -eq 0 ]; then exec evm-single diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index b1c43d6cff..39c7b1615e 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -3,6 +3,7 @@ package submitting import ( "bytes" "context" + "encoding/json" "fmt" "time" @@ -13,6 +14,7 @@ import ( "github.com/evstack/ev-node/block/internal/common" coreda "github.com/evstack/ev-node/core/da" "github.com/evstack/ev-node/pkg/config" + pkgda "github.com/evstack/ev-node/pkg/da" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/pkg/signer" @@ -124,6 +126,9 @@ type DASubmitter struct { // calculate namespaces bytes once and reuse them namespaceBz []byte namespaceDataBz []byte + + // address selector for multi-account support + addressSelector pkgda.AddressSelector } // NewDASubmitter creates a new DA submitter @@ -147,6 +152,17 @@ func NewDASubmitter( metrics = common.NopMetrics() } + // Create address selector based on configuration + var addressSelector pkgda.AddressSelector + if len(config.DA.SigningAddresses) > 0 { + addressSelector = pkgda.NewRoundRobinSelector(config.DA.SigningAddresses) + daSubmitterLogger.Info(). + Int("num_addresses", len(config.DA.SigningAddresses)). + Msg("initialized round-robin address selector for multi-account DA submissions") + } else { + addressSelector = pkgda.NewNoOpSelector() + } + return &DASubmitter{ da: da, config: config, @@ -156,6 +172,7 @@ func NewDASubmitter( logger: daSubmitterLogger, namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), + addressSelector: addressSelector, } } @@ -235,7 +252,6 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er "header", s.namespaceBz, []byte(s.config.DA.SubmitOptions), - cache, func() uint64 { return cache.NumPendingHeaders() }, ) } @@ -279,7 +295,6 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe "data", s.namespaceDataBz, []byte(s.config.DA.SubmitOptions), - cache, func() uint64 { return cache.NumPendingData() }, ) } @@ -340,6 +355,44 @@ func (s *DASubmitter) createSignedData(dataList []*types.SignedData, signer sign return signedDataList, nil } +// mergeSubmitOptions merges the base submit options with a signing address. +// If the base options are valid JSON, the signing address is added to the JSON object. +// Otherwise, a new JSON object is created with just the signing address. +// Returns the base options unchanged if no signing address is provided. +func mergeSubmitOptions(baseOptions []byte, signingAddress string) ([]byte, error) { + if signingAddress == "" { + return baseOptions, nil + } + + var optionsMap map[string]interface{} + + // If base options are provided, try to parse them as JSON + if len(baseOptions) > 0 { + // Try to unmarshal existing options, ignoring errors for non-JSON input + if err := json.Unmarshal(baseOptions, &optionsMap); err != nil { + // Not valid JSON - start with empty map + optionsMap = make(map[string]interface{}) + } + } + + // Ensure map is initialized even if unmarshal returned nil + if optionsMap == nil { + optionsMap = make(map[string]interface{}) + } + + // Add or override the signing address + // Note: Uses "signer_address" to match Celestia's TxConfig JSON schema + optionsMap["signer_address"] = signingAddress + + // Marshal back to JSON + mergedOptions, err := json.Marshal(optionsMap) + if err != nil { + return nil, fmt.Errorf("failed to marshal submit options: %w", err) + } + + return mergedOptions, nil +} + // submitToDA is a generic helper for submitting items to the DA layer with retry, backoff, and gas price logic. func submitToDA[T any]( s *DASubmitter, @@ -350,7 +403,6 @@ func submitToDA[T any]( itemType string, namespace []byte, options []byte, - cache cache.Manager, getTotalPendingFn func() uint64, ) error { marshaled, err := marshalItems(ctx, items, marshalFn, itemType) @@ -397,12 +449,24 @@ func submitToDA[T any]( return err } + // Select signing address and merge with options + signingAddress := s.addressSelector.Next() + mergedOptions, err := mergeSubmitOptions(options, signingAddress) + if err != nil { + s.logger.Error().Err(err).Msg("failed to merge submit options with signing address") + return fmt.Errorf("failed to merge submit options: %w", err) + } + + if signingAddress != "" { + s.logger.Debug().Str("signingAddress", signingAddress).Msg("using signing address for DA submission") + } + submitCtx, cancel := context.WithTimeout(ctx, submissionTimeout) defer cancel() // Perform submission start := time.Now() - res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, rs.GasPrice, namespace, options) + res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, rs.GasPrice, namespace, mergedOptions) s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got SubmitWithHelpers response from celestia") // Record submission result for observability diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index fc309e0638..9ef14fba36 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -86,7 +86,6 @@ func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) @@ -138,7 +137,6 @@ func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) assert.Equal(t, []float64{5.5, 5.5}, usedGas) @@ -195,7 +193,6 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) assert.Equal(t, []int{4, 2}, batchSizes) @@ -245,7 +242,6 @@ func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) assert.Equal(t, []float64{-1, -1}, usedGas) @@ -286,7 +282,6 @@ func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) assert.Equal(t, 3, totalSubmitted) diff --git a/block/internal/submitting/da_submitter_options_test.go b/block/internal/submitting/da_submitter_options_test.go new file mode 100644 index 0000000000..2f6d17a17d --- /dev/null +++ b/block/internal/submitting/da_submitter_options_test.go @@ -0,0 +1,132 @@ +package submitting + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMergeSubmitOptions_NoSigningAddress(t *testing.T) { + baseOptions := []byte(`{"key":"value"}`) + + result, err := mergeSubmitOptions(baseOptions, "") + require.NoError(t, err) + assert.Equal(t, baseOptions, result, "should return unchanged options when no signing address") +} + +func TestMergeSubmitOptions_EmptyBaseOptions(t *testing.T) { + signingAddress := "celestia1abc123" + + result, err := mergeSubmitOptions([]byte{}, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + assert.Equal(t, signingAddress, resultMap["signer_address"]) +} + +func TestMergeSubmitOptions_ValidJSON(t *testing.T) { + baseOptions := []byte(`{"existing":"option","number":42}`) + signingAddress := "celestia1def456" + + result, err := mergeSubmitOptions(baseOptions, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + assert.Equal(t, "option", resultMap["existing"]) + assert.Equal(t, float64(42), resultMap["number"]) // JSON numbers are float64 + assert.Equal(t, signingAddress, resultMap["signer_address"]) +} + +func TestMergeSubmitOptions_InvalidJSON(t *testing.T) { + baseOptions := []byte(`not-json-content`) + signingAddress := "celestia1ghi789" + + result, err := mergeSubmitOptions(baseOptions, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + // Should create new JSON object with just the signing address + assert.Equal(t, signingAddress, resultMap["signer_address"]) + assert.Len(t, resultMap, 1, "should only contain signing address when base options are invalid JSON") +} + +func TestMergeSubmitOptions_OverrideExistingAddress(t *testing.T) { + baseOptions := []byte(`{"signer_address":"old-address","other":"data"}`) + newAddress := "celestia1new456" + + result, err := mergeSubmitOptions(baseOptions, newAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + assert.Equal(t, newAddress, resultMap["signer_address"], "should override existing signing address") + assert.Equal(t, "data", resultMap["other"]) +} + +func TestMergeSubmitOptions_NilBaseOptions(t *testing.T) { + signingAddress := "celestia1jkl012" + + result, err := mergeSubmitOptions(nil, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + assert.Equal(t, signingAddress, resultMap["signer_address"]) +} + +func TestMergeSubmitOptions_ComplexJSON(t *testing.T) { + baseOptions := []byte(`{ + "nested": { + "key": "value" + }, + "array": [1, 2, 3], + "bool": true + }`) + signingAddress := "celestia1complex" + + result, err := mergeSubmitOptions(baseOptions, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + // Check nested structure is preserved + nested, ok := resultMap["nested"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "value", nested["key"]) + + // Check array is preserved + array, ok := resultMap["array"].([]interface{}) + require.True(t, ok) + assert.Len(t, array, 3) + + // Check bool is preserved + assert.Equal(t, true, resultMap["bool"]) + + // Check signing address was added + assert.Equal(t, signingAddress, resultMap["signer_address"]) +} + +func TestMergeSubmitOptions_NullJSON(t *testing.T) { + base := []byte("null") + merged, err := mergeSubmitOptions(base, `{"signer_address": "abc"}`) + require.NoError(t, err) + require.NotNil(t, merged) + require.Contains(t, string(merged), "signer_address") +} diff --git a/core/da/dummy.go b/core/da/dummy.go index a1b9ca2e90..0f9fd38244 100644 --- a/core/da/dummy.go +++ b/core/da/dummy.go @@ -221,7 +221,7 @@ func (d *DummyDA) SubmitWithOptions(ctx context.Context, blobs []Blob, gasPrice d.blobs[idStr] = blob d.commitments[idStr] = commitment - d.proofs[idStr] = commitment // Simple proof + d.proofs[idStr] = commitment // Simple proof d.namespaceByID[idStr] = namespace // Store namespace for this blob ids = append(ids, id) diff --git a/docs/learn/config.md b/docs/learn/config.md index 0321b12e77..ff855f28aa 100644 --- a/docs/learn/config.md +++ b/docs/learn/config.md @@ -24,6 +24,7 @@ This document provides a comprehensive reference for all configuration options a - [DA Gas Price](#da-gas-price) - [DA Gas Multiplier](#da-gas-multiplier) - [DA Submit Options](#da-submit-options) + - [DA Signing Addresses](#da-signing-addresses) - [DA Namespace](#da-namespace) - [DA Header Namespace](#da-namespace) - [DA Data Namespace](#da-data-namespace) @@ -378,13 +379,15 @@ _Constant:_ `FlagDAGasMultiplier` ### DA Submit Options **Description:** -Additional options passed to the DA layer when submitting data. The format and meaning of these options depend on the specific DA implementation being used. +Additional options passed to the DA layer when submitting data. The format and meaning of these options depend on the specific DA implementation being used. For example, with Celestia, this can include custom gas settings or other submission parameters in JSON format. + +**Note:** If you configure multiple signing addresses (see [DA Signing Addresses](#da-signing-addresses)), the selected signing address will be automatically merged into these options as a JSON field `signer_address` (matching Celestia's TxConfig schema). If the base options are already valid JSON, the signing address is added to the existing object; otherwise, a new JSON object is created. **YAML:** ```yaml da: - submit_options: "{"key":"value"}" # Example, format depends on DA layer + submit_options: '{"key":"value"}' # Example, format depends on DA layer ``` **Command-line Flag:** @@ -393,6 +396,41 @@ _Example:_ `--rollkit.da.submit_options '{"custom_param":true}'` _Default:_ `""` (empty) _Constant:_ `FlagDASubmitOptions` +### DA Signing Addresses + +**Description:** +A comma-separated list of signing addresses to use for DA blob submissions. When multiple addresses are provided, they will be used in round-robin fashion to prevent sequence mismatches that can occur with high-throughput Cosmos SDK-based DA layers. This is particularly useful for Celestia when submitting many transactions concurrently. + +Each submission will select the next address in the list, and that address will be automatically added to the `submit_options` as `signer_address`. This ensures that the DA layer (e.g., celestia-node) uses the specified account for signing that particular blob submission. + +**Setup Requirements:** + +- All addresses must be loaded into the DA node's keyring and have sufficient funds for transaction fees +- For Celestia, see the guide on setting up multiple accounts in the DA node documentation + +**YAML:** + +```yaml +da: + signing_addresses: + - "celestia1abc123..." + - "celestia1def456..." + - "celestia1ghi789..." +``` + +**Command-line Flag:** +`--evnode.da.signing_addresses ` +_Example:_ `--rollkit.da.signing_addresses celestia1abc...,celestia1def...,celestia1ghi...` +_Default:_ `[]` (empty, uses default DA node behavior) +_Constant:_ `FlagDASigningAddresses` + +**Behavior:** + +- If no signing addresses are configured, submissions use the DA layer's default signing behavior +- If one address is configured, all submissions use that address +- If multiple addresses are configured, they are used in round-robin order to distribute the load and prevent nonce/sequence conflicts +- The address selection is thread-safe for concurrent submissions + ### DA Namespace **Description:** diff --git a/pkg/config/config.go b/pkg/config/config.go index eef506b075..8e7f8cce26 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -70,6 +70,8 @@ const ( FlagDADataNamespace = FlagPrefixEvnode + "da.data_namespace" // FlagDASubmitOptions is a flag for data availability submit options FlagDASubmitOptions = FlagPrefixEvnode + "da.submit_options" + // FlagDASigningAddresses is a flag for specifying multiple DA signing addresses + FlagDASigningAddresses = FlagPrefixEvnode + "da.signing_addresses" // FlagDAMempoolTTL is a flag for specifying the DA mempool TTL FlagDAMempoolTTL = FlagPrefixEvnode + "da.mempool_ttl" // FlagDAMaxSubmitAttempts is a flag for specifying the maximum DA submit attempts @@ -162,6 +164,7 @@ type DAConfig struct { GasPrice float64 `mapstructure:"gas_price" yaml:"gas_price" comment:"Gas price for data availability transactions. Use -1 for automatic gas price determination. Higher values may result in faster inclusion."` GasMultiplier float64 `mapstructure:"gas_multiplier" yaml:"gas_multiplier" comment:"Multiplier applied to gas price when retrying failed DA submissions. Values > 1 increase gas price on retries to improve chances of inclusion."` SubmitOptions string `mapstructure:"submit_options" yaml:"submit_options" comment:"Additional options passed to the DA layer when submitting data. Format depends on the specific DA implementation being used."` + SigningAddresses []string `mapstructure:"signing_addresses" yaml:"signing_addresses" comment:"List of addresses to use for DA submissions. When multiple addresses are provided, they will be used in round-robin fashion to prevent sequence mismatches. Useful for high-throughput chains."` Namespace string `mapstructure:"namespace" yaml:"namespace" comment:"Namespace ID used when submitting blobs to the DA layer. When a DataNamespace is provided, only the header is sent to this namespace."` DataNamespace string `mapstructure:"data_namespace" yaml:"data_namespace" comment:"Namespace ID for submitting data to DA layer. Use this to speed-up light clients."` BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Average block time of the DA chain (duration). Determines frequency of DA layer syncing, maximum backoff time for retries, and is multiplied by MempoolTTL to calculate transaction expiration. Examples: \"15s\", \"30s\", \"1m\", \"2m30s\", \"10m\"."` @@ -328,6 +331,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagDANamespace, def.DA.Namespace, "DA namespace for header (or blob) submissions") cmd.Flags().String(FlagDADataNamespace, def.DA.DataNamespace, "DA namespace for data submissions") cmd.Flags().String(FlagDASubmitOptions, def.DA.SubmitOptions, "DA submit options") + cmd.Flags().StringSlice(FlagDASigningAddresses, def.DA.SigningAddresses, "Comma-separated list of addresses for DA submissions (used in round-robin)") cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool") cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7116e26362..c9284076b7 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -73,6 +73,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagDAGasMultiplier, DefaultConfig().DA.GasMultiplier) assertFlagValue(t, flags, FlagDANamespace, DefaultConfig().DA.Namespace) assertFlagValue(t, flags, FlagDASubmitOptions, DefaultConfig().DA.SubmitOptions) + assertFlagValue(t, flags, FlagDASigningAddresses, DefaultConfig().DA.SigningAddresses) assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig().DA.MempoolTTL) assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig().DA.MaxSubmitAttempts) @@ -104,7 +105,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address) // Count the number of flags we're explicitly checking - expectedFlagCount := 39 // Update this number if you add more flag checks above + expectedFlagCount := 40 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/da/selector.go b/pkg/da/selector.go new file mode 100644 index 0000000000..d5bb996fd0 --- /dev/null +++ b/pkg/da/selector.go @@ -0,0 +1,62 @@ +package da + +import ( + "sync/atomic" +) + +// AddressSelector defines the interface for selecting a signing address from a list. +type AddressSelector interface { + // Next returns the next address to use for signing. + // Implementations may return empty string (NoOpSelector) or panic (RoundRobinSelector with no addresses). + Next() string +} + +// RoundRobinSelector implements round-robin selection of signing addresses. +// This helps prevent sequence mismatches in Cosmos SDK when submitting +// multiple transactions concurrently. +type RoundRobinSelector struct { + addresses []string + counter atomic.Uint64 +} + +// NewRoundRobinSelector creates a new round-robin address selector. +// Panics if addresses is empty - use NewNoOpSelector instead. +func NewRoundRobinSelector(addresses []string) *RoundRobinSelector { + if len(addresses) == 0 { + panic("NewRoundRobinSelector: addresses slice is empty; use NewNoOpSelector instead") + } + return &RoundRobinSelector{ + addresses: addresses, + } +} + +// Next returns the next address in round-robin fashion. +// Thread-safe for concurrent access. +// Panics if no addresses are configured - this indicates a programming error. +func (s *RoundRobinSelector) Next() string { + if len(s.addresses) == 0 { + panic("RoundRobinSelector.Next: no addresses configured; use NewNoOpSelector instead") + } + + if len(s.addresses) == 1 { + return s.addresses[0] + } + + // Atomically increment and get the previous value for this call + index := s.counter.Add(1) - 1 + return s.addresses[index%uint64(len(s.addresses))] +} + +// NoOpSelector always returns an empty string. +// Used when no signing addresses are configured. +type NoOpSelector struct{} + +// NewNoOpSelector creates a selector that returns no address. +func NewNoOpSelector() *NoOpSelector { + return &NoOpSelector{} +} + +// Next returns an empty string. +func (s *NoOpSelector) Next() string { + return "" +} diff --git a/pkg/da/selector_test.go b/pkg/da/selector_test.go new file mode 100644 index 0000000000..c25a35e682 --- /dev/null +++ b/pkg/da/selector_test.go @@ -0,0 +1,151 @@ +package da + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRoundRobinSelector_EmptyList(t *testing.T) { + // Should panic when creating selector with empty address list + assert.Panics(t, func() { + NewRoundRobinSelector([]string{}) + }, "should panic when creating RoundRobinSelector with empty address list") +} + +func TestRoundRobinSelector_NextWithoutAddresses(t *testing.T) { + // Should panic if Next is called on a selector with no addresses + // (e.g., if someone creates the struct directly without using the constructor) + selector := &RoundRobinSelector{addresses: []string{}} + assert.Panics(t, func() { + selector.Next() + }, "should panic when calling Next with no addresses configured") +} + +func TestRoundRobinSelector_SingleAddress(t *testing.T) { + addresses := []string{"celestia1abc123"} + selector := NewRoundRobinSelector(addresses) + + // All calls should return the same address + for i := 0; i < 10; i++ { + addr := selector.Next() + assert.Equal(t, "celestia1abc123", addr, "should always return the single address") + } +} + +func TestRoundRobinSelector_MultipleAddresses(t *testing.T) { + addresses := []string{ + "celestia1abc123", + "celestia1def456", + "celestia1ghi789", + } + selector := NewRoundRobinSelector(addresses) + + // First round + assert.Equal(t, "celestia1abc123", selector.Next()) + assert.Equal(t, "celestia1def456", selector.Next()) + assert.Equal(t, "celestia1ghi789", selector.Next()) + + // Second round - should cycle back + assert.Equal(t, "celestia1abc123", selector.Next()) + assert.Equal(t, "celestia1def456", selector.Next()) + assert.Equal(t, "celestia1ghi789", selector.Next()) +} + +func TestRoundRobinSelector_Concurrent(t *testing.T) { + addresses := []string{ + "celestia1abc123", + "celestia1def456", + "celestia1ghi789", + } + selector := NewRoundRobinSelector(addresses) + + const numGoroutines = 100 + const numCallsPerGoroutine = 100 + + results := make([]string, numGoroutines*numCallsPerGoroutine) + var wg sync.WaitGroup + + // Launch concurrent goroutines + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(start int) { + defer wg.Done() + for j := 0; j < numCallsPerGoroutine; j++ { + addr := selector.Next() + results[start+j] = addr + } + }(i * numCallsPerGoroutine) + } + + wg.Wait() + + // Verify all results are valid addresses + for _, addr := range results { + require.Contains(t, addresses, addr, "all returned addresses should be from the configured list") + } + + // Count occurrences of each address + counts := make(map[string]int) + for _, addr := range results { + counts[addr]++ + } + + // Each address should be used approximately equally (within 10% tolerance) + expectedCount := len(results) / len(addresses) + tolerance := expectedCount / 10 + + for _, addr := range addresses { + count := counts[addr] + assert.InDelta(t, expectedCount, count, float64(tolerance), + "address %s should be used approximately %d times, got %d", addr, expectedCount, count) + } +} + +func TestRoundRobinSelector_WrapAround(t *testing.T) { + addresses := []string{"addr1", "addr2"} + selector := NewRoundRobinSelector(addresses) + + // Test wrap around behavior with large number of calls + seen := make(map[string]int) + for i := 0; i < 1000; i++ { + addr := selector.Next() + seen[addr]++ + } + + // Both addresses should be used 500 times each + assert.Equal(t, 500, seen["addr1"]) + assert.Equal(t, 500, seen["addr2"]) +} + +func TestNoOpSelector(t *testing.T) { + selector := NewNoOpSelector() + + // Should always return empty string + for i := 0; i < 10; i++ { + addr := selector.Next() + assert.Empty(t, addr, "NoOpSelector should always return empty string") + } +} + +func TestNoOpSelector_Concurrent(t *testing.T) { + selector := NewNoOpSelector() + + const numGoroutines = 50 + var wg sync.WaitGroup + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + addr := selector.Next() + assert.Empty(t, addr) + } + }() + } + + wg.Wait() +} diff --git a/sequencers/single/queue_test.go b/sequencers/single/queue_test.go index e2be3eaee3..0ede59a90e 100644 --- a/sequencers/single/queue_test.go +++ b/sequencers/single/queue_test.go @@ -283,12 +283,12 @@ func TestLoad_WithMixedData(t *testing.T) { require.Equal(2, bq.Size(), "Queue should contain only the 2 valid batches") // Check hashes to be sure (order might vary depending on datastore query) loadedHashes := make(map[string]bool) -bq.mu.Lock() -for i := bq.head; i < len(bq.queue); i++ { - h, _ := bq.queue[i].Hash() - loadedHashes[hex.EncodeToString(h)] = true -} -bq.mu.Unlock() + bq.mu.Lock() + for i := bq.head; i < len(bq.queue); i++ { + h, _ := bq.queue[i].Hash() + loadedHashes[hex.EncodeToString(h)] = true + } + bq.mu.Unlock() require.True(loadedHashes[hexHash1], "Valid batch 1 not found in queue") require.True(loadedHashes[hexHash2], "Valid batch 2 not found in queue")