From 5b189f5f6c6fe8a7763b42dc5a99a146d6921c36 Mon Sep 17 00:00:00 2001 From: Matt Leon Date: Thu, 30 Oct 2025 10:56:56 +0100 Subject: [PATCH 1/6] rpcclient: support canceling in-flight http requests Use a shutdown-aware context for HTTP POST handling so shutdown can interrupt in-flight requests. Centralize shutdown error remapping in sendPostRequestAndRespond so all error exits consistently return ErrClientShutdown when shutdown causes a context cancellation. Move the retrying HTTP POST path into sendPostRequestWithRetry and cover it with shutdown regression tests. --- rpcclient/infrastructure.go | 131 +++++++---- rpcclient/infrastructure_test.go | 382 ++++++++++++++++++++++++++++++- 2 files changed, 458 insertions(+), 55 deletions(-) diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index e34fc1827e..694e9926fe 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -98,6 +98,10 @@ const ( // defaultHTTPTimeout is the default timeout for an http request, so the // request does not block indefinitely. defaultHTTPTimeout = time.Minute + + // sendPostRequestTries is the number of times to retry failed HTTP POST + // requests before giving up. + sendPostRequestTries = 10 ) // jsonRequest holds information about a json request that is used to properly @@ -766,46 +770,54 @@ out: // handleSendPostMessage handles performing the passed HTTP request, reading the // result, unmarshalling it, and delivering the unmarshalled result to the // provided response channel. -func (c *Client) handleSendPostMessage(jReq *jsonRequest) { +func (c *Client) handleSendPostMessage(ctx context.Context, jReq *jsonRequest) { + c.sendPostRequestAndRespond(ctx, jReq, sendPostRequestTries) +} + +// sendPostRequestWithRetry performs HTTP POST retries and decodes the response +// result. It returns the raw transport error so callers can decide how to map +// shutdown-driven cancellation. +func sendPostRequestWithRetry(ctx context.Context, jReq *jsonRequest, + tries int, httpClient *http.Client, config *ConnConfig, + batch bool) ([]byte, error) { + var ( lastErr error backoff time.Duration httpResponse *http.Response + err error ) - httpURL, err := c.config.httpURL() + httpURL, err := config.httpURL() if err != nil { - jReq.responseChan <- &Response{ - err: fmt.Errorf("failed to parse address %v", err), - } - return + return nil, fmt.Errorf("failed to parse address %v", err) } - tries := 10 +retryloop: for i := 0; i < tries; i++ { var httpReq *http.Request bodyReader := bytes.NewReader(jReq.marshalledJSON) - httpReq, err = http.NewRequest("POST", httpURL, bodyReader) + httpReq, err = http.NewRequestWithContext( + ctx, "POST", httpURL, bodyReader, + ) if err != nil { - jReq.responseChan <- &Response{result: nil, err: err} - return + return nil, err } httpReq.Close = true httpReq.Header.Set("Content-Type", "application/json") - for key, value := range c.config.ExtraHeaders { + for key, value := range config.ExtraHeaders { httpReq.Header.Set(key, value) } // Configure basic access authorization. - user, pass, err := c.config.getAuth() - if err != nil { - jReq.responseChan <- &Response{result: nil, err: err} - return + user, pass, authErr := config.getAuth() + if authErr != nil { + return nil, authErr } httpReq.SetBasicAuth(user, pass) - httpResponse, err = c.httpClient.Do(httpReq) + httpResponse, err = httpClient.Do(httpReq) // Quit the retry loop on success or if we can't retry anymore. if err == nil || i == tries-1 { @@ -830,39 +842,35 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) { select { case <-time.After(backoff): - case <-c.shutdown: - return + case <-ctx.Done(): + // Stop retrying as soon as shutdown cancels the request context. + err = ctx.Err() + break retryloop } } if err != nil { - jReq.responseChan <- &Response{err: err} - return + return nil, err } // We still want to return an error if for any reason the response // remains empty. if httpResponse == nil { - jReq.responseChan <- &Response{ - err: fmt.Errorf("invalid http POST response (nil), "+ - "method: %s, id: %d, last error=%v", - jReq.method, jReq.id, lastErr), - } - return + return nil, fmt.Errorf("invalid http POST response (nil), "+ + "method: %s, id: %d, last error=%v", + jReq.method, jReq.id, lastErr) } // Read the raw bytes and close the response. respBytes, err := io.ReadAll(httpResponse.Body) httpResponse.Body.Close() if err != nil { - err = fmt.Errorf("error reading json reply: %v", err) - jReq.responseChan <- &Response{err: err} - return + return nil, fmt.Errorf("error reading json reply: %w", err) } // Try to unmarshal the response as a regular JSON-RPC response. var resp rawResponse var batchResponse json.RawMessage - if c.batch { + if batch { err = json.Unmarshal(respBytes, &batchResponse) } else { err = json.Unmarshal(respBytes, &resp) @@ -871,50 +879,70 @@ func (c *Client) handleSendPostMessage(jReq *jsonRequest) { // When the response itself isn't a valid JSON-RPC response // return an error which includes the HTTP status code and raw // response bytes. - err = fmt.Errorf("status code: %d, response: %q", + return nil, fmt.Errorf("status code: %d, response: %q", httpResponse.StatusCode, string(respBytes)) - jReq.responseChan <- &Response{err: err} - return } - var res []byte - if c.batch { - // errors must be dealt with downstream since a whole request cannot - // "error out" other than through the status code error handled above - res, err = batchResponse, nil - } else { - res, err = resp.result() + + if batch { + // Errors must be dealt with downstream since a whole request + // cannot "error out" other than through the status code error + // handled above. + return batchResponse, nil + } + + return resp.result() +} + +// sendPostRequestAndRespond runs the retrying POST path and sends the final +// result to the waiting response channel. +func (c *Client) sendPostRequestAndRespond(ctx context.Context, + jReq *jsonRequest, tries int) { + + res, err := sendPostRequestWithRetry( + ctx, jReq, tries, c.httpClient, c.config, c.batch, + ) + + // Preserve the client contract that shutdown-related cancellations surface + // as ErrClientShutdown, even when the transport reports context.Canceled. + if errors.Is(err, context.Canceled) && + errors.Is(context.Cause(ctx), ErrClientShutdown) { + + err = ErrClientShutdown + } + + jReq.responseChan <- &Response{ + result: res, + err: err, } - jReq.responseChan <- &Response{result: res, err: err} } // sendPostHandler handles all outgoing messages when the client is running // in HTTP POST mode. It uses a buffered channel to serialize output messages // while allowing the sender to continue running asynchronously. It must be run // as a goroutine. -func (c *Client) sendPostHandler() { +func (c *Client) sendPostHandler(ctx context.Context) { out: for { // Send any messages ready for send until the shutdown channel // is closed. select { case jReq := <-c.sendPostChan: - c.handleSendPostMessage(jReq) + c.handleSendPostMessage(ctx, jReq) - case <-c.shutdown: + case <-ctx.Done(): break out } } + err := context.Cause(ctx) + // Drain any wait channels before exiting so nothing is left waiting // around to send. cleanup: for { select { case jReq := <-c.sendPostChan: - jReq.responseChan <- &Response{ - result: nil, - err: ErrClientShutdown, - } + jReq.responseChan <- &Response{result: nil, err: err} default: break cleanup @@ -1178,8 +1206,13 @@ func (c *Client) start() { // Start the I/O processing handlers depending on whether the client is // in HTTP POST mode or the default websocket mode. if c.config.HTTPPostMode { + ctx, cancel := context.WithCancelCause(context.Background()) c.wg.Add(1) - go c.sendPostHandler() + go c.sendPostHandler(ctx) + go func() { + <-c.shutdown + cancel(ErrClientShutdown) + }() } else { c.wg.Add(3) go func() { diff --git a/rpcclient/infrastructure_test.go b/rpcclient/infrastructure_test.go index 8416b7ad3c..90795b5669 100644 --- a/rpcclient/infrastructure_test.go +++ b/rpcclient/infrastructure_test.go @@ -1,11 +1,208 @@ package rpcclient import ( + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "strings" + "sync/atomic" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// postRoundTripFunc adapts a function to implement http.RoundTripper. +type postRoundTripFunc func(*http.Request) (*http.Response, error) + +// RoundTrip invokes the wrapped test transport function. +func (f postRoundTripFunc) RoundTrip( + req *http.Request) (*http.Response, error) { + + return f(req) +} + +// cancelOnReadBody is a test response body that blocks reads until context +// cancellation is observed. +type cancelOnReadBody struct { + // ctx is the request context that drives cancellation. + ctx context.Context + // readStarted is closed when the first read call starts. + readStarted chan struct{} + // stage names the part of the request flow waiting on cancellation. + stage string +} + +// Read blocks until the request context is canceled, then returns that error. +func (b *cancelOnReadBody) Read(_ []byte) (int, error) { + select { + case <-b.readStarted: + default: + close(b.readStarted) + } + + return 0, waitForRequestContextCancellation(b.ctx, b.stage) +} + +// Close implements io.Closer for the test body. +func (b *cancelOnReadBody) Close() error { + return nil +} + +// newPostModeTestClient builds a minimal HTTP POST-mode client for transport +// behavior tests. +func newPostModeTestClient(rt http.RoundTripper) *Client { + return &Client{ + config: &ConnConfig{ + Host: "127.0.0.1:8332", + User: "user", + Pass: "pass", + DisableTLS: true, + HTTPPostMode: true, + }, + httpClient: &http.Client{ + Transport: rt, + }, + } +} + +// newPostTestRequest creates a minimal JSON-RPC request used by POST handler +// tests. +func newPostTestRequest() *jsonRequest { + body := `{"jsonrpc":"1.0","id":1,"method":"getblockcount","params":[]}` + + return &jsonRequest{ + id: 1, + method: "getblockcount", + marshalledJSON: []byte(body), + responseChan: make(chan *Response, 1), + } +} + +// waitForRequestContextCancellation bounds shutdown waits so a missing request +// context propagation becomes a symptomatic test failure instead of a hang. +func waitForRequestContextCancellation( + ctx context.Context, stage string) error { + + select { + case <-ctx.Done(): + return ctx.Err() + + case <-time.After(100 * time.Millisecond): + return fmt.Errorf("request context was not canceled during %s", + stage) + } +} + +// sendPostShutdownScenario describes one shutdown path that should fail if the +// request stops honoring shutdown cancellation. +type sendPostShutdownScenario struct { + // name is the subtest name for the shutdown path. + name string + + // tries is the retry count passed to the POST send helper under test. + tries int + + // newClient builds a POST-mode client whose transport triggers the + // shutdown path for this scenario. + newClient func(context.CancelCauseFunc, *int32) *Client + + // wantAttempts is the expected number of transport attempts before the + // scenario terminates. + wantAttempts int32 + + // wantErrContains is an optional substring that must appear in the raw + // helper error for this scenario. + wantErrContains string +} + +// sendPostShutdownScenarios enumerates the shutdown-triggered regression +// scenarios shared by the helper-level and wrapper-level POST tests. +var sendPostShutdownScenarios = []sendPostShutdownScenario{ + { + name: "during_retry_backoff", + tries: 2, + newClient: func(cancel context.CancelCauseFunc, + attempts *int32) *Client { + + return newPostModeTestClient(postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + if atomic.AddInt32(attempts, 1) == 1 { + cancel(ErrClientShutdown) + } + + return nil, errors.New( + "transient transport error", + ) + }, + )) + }, + wantAttempts: 1, + }, + { + name: "on_final_retry", + tries: 2, + newClient: func(cancel context.CancelCauseFunc, + attempts *int32) *Client { + + return newPostModeTestClient(postRoundTripFunc( + func(req *http.Request) (*http.Response, error) { + current := atomic.AddInt32(attempts, 1) + if current == 1 { + return nil, errors.New( + "transient transport error", + ) + } + + // This keeps the case tied to request-context + // propagation instead of injecting context.Canceled + // directly from the fake transport. + cancel(ErrClientShutdown) + return nil, waitForRequestContextCancellation( + req.Context(), "final retry", + ) + }, + )) + }, + wantAttempts: 2, + }, + { + name: "during_body_read", + tries: 1, + newClient: func(cancel context.CancelCauseFunc, + attempts *int32) *Client { + + readStarted := make(chan struct{}) + go func() { + <-readStarted + cancel(ErrClientShutdown) + }() + + return newPostModeTestClient(postRoundTripFunc( + func(req *http.Request) (*http.Response, error) { + atomic.AddInt32(attempts, 1) + + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: &cancelOnReadBody{ + ctx: req.Context(), + readStarted: readStarted, + stage: "body read", + }, + }, nil + }, + )) + }, + wantAttempts: 1, + wantErrContains: "error reading json reply", + }, +} + // TestParseAddressString checks different variation of supported and // unsupported addresses. func TestParseAddressString(t *testing.T) { @@ -93,18 +290,191 @@ func TestParseAddressString(t *testing.T) { } for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { addr, err := ParseAddressString(tc.addressString) if tc.expErrStr != "" { require.Error(t, err) require.Contains(t, err.Error(), tc.expErrStr) - return + } else { + require.NoError(t, err) + require.Equal(t, tc.expNetwork, addr.Network()) + require.Equal(t, tc.expAddress, addr.String()) + } + }) + } +} + +// TestSendPostRequestWithRetrySuccess ensures that +// sendPostRequestWithRetry returns a decoded result and no error on +// a successful response. +func TestSendPostRequestWithRetrySuccess(t *testing.T) { + client := newPostModeTestClient(postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader( + `{"result":1,"error":null,"id":1}`, + )), + }, nil + }, + )) + jReq := newPostTestRequest() + + result, err := sendPostRequestWithRetry( + context.Background(), jReq, 1, client.httpClient, client.config, + false, + ) + require.NoError(t, err) + require.Equal(t, []byte("1"), result) +} + +// TestSendPostRequestWithRetryShutdown keeps the shutdown regression cases in +// one table while preserving a distinct symptomatic failure for each path. +func TestSendPostRequestWithRetryShutdown(t *testing.T) { + for _, tc := range sendPostShutdownScenarios { + t.Run(tc.name, func(t *testing.T) { + var attempts int32 + ctx, cancel := context.WithCancelCause(context.Background()) + client := tc.newClient(cancel, &attempts) + jReq := newPostTestRequest() + + result, err := sendPostRequestWithRetry( + ctx, jReq, tc.tries, client.httpClient, client.config, + false, + ) + require.Nil(t, result) + require.ErrorIs(t, err, context.Canceled) + if tc.wantErrContains != "" { + require.ErrorContains(t, err, tc.wantErrContains) + } + require.EqualValues(t, tc.wantAttempts, + atomic.LoadInt32(&attempts)) + }) + } +} + +// TestHTTPPostShutdownInterruptsPendingRequest ensures that a client operating +// in HTTP POST mode can interrupt an in-flight request during shutdown. +func TestHTTPPostShutdownInterruptsPendingRequest(t *testing.T) { + t.Parallel() + + // Start a local TCP listener that accepts exactly one HTTP request and + // then blocks until the client side closes the connection. + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + // requestAccepted signals when the test server has accepted the + // client's connection. + requestAccepted := make(chan struct{}) + + // serverDone signals when the server goroutine has exited. + serverDone := make(chan struct{}) + + // Run a minimum server goroutine. It accepts one connection and drains + // the request stream without replying so the client request stays in + // flight. + go func() { + defer close(serverDone) + + conn, err := listener.Accept() + if err != nil { + return + } + defer func() { + err := conn.Close() + assert.NoError(t, err) + }() + + close(requestAccepted) + + _, _ = io.Copy(io.Discard, conn) + }() + + // Ensure the listener is closed and the server goroutine exits. + t.Cleanup(func() { + err := listener.Close() + require.NoError(t, err) + <-serverDone + }) + + // Configure a POST-mode client against the local listener. + connCfg := &ConnConfig{ + Host: listener.Addr().String(), + User: "user", + Pass: "pass", + DisableTLS: true, + HTTPPostMode: true, + } + + // Start the client and register cleanup for idempotent shutdown. + client, err := New(connCfg, nil) + require.NoError(t, err) + t.Cleanup(client.Shutdown) + + // Launch one async request that should remain pending until shutdown. + future := client.GetBlockCountAsync() + + // Ensure the server sees the request before we initiate shutdown. + select { + case <-requestAccepted: + + case <-time.After(2 * time.Second): + t.Fatalf("server did not accept client connection") + } + + // The request should remain pending until shutdown is requested. + select { + case <-future: + t.Fatalf("expected request to remain pending until shutdown") + + case <-time.After(100 * time.Millisecond): + } + + client.Shutdown() + + waitDone := make(chan struct{}) + go func() { + client.WaitForShutdown() + close(waitDone) + }() + + // Wait for shutdown to complete before asserting the final error. + select { + case <-waitDone: + + case <-time.After(5 * time.Second): + t.Fatalf("client shutdown did not complete") + } + + result, err := future.Receive() + require.Zero(t, result) + require.ErrorContains(t, err, ErrClientShutdown.Error()) +} + +// TestSendPostRequestAndRespondShutdown reuses the helper-level shutdown cases +// to verify the client-facing contract: each one must surface +// ErrClientShutdown on the response channel. +func TestSendPostRequestAndRespondShutdown(t *testing.T) { + for _, tc := range sendPostShutdownScenarios { + t.Run(tc.name, func(t *testing.T) { + var attempts int32 + ctx, cancel := context.WithCancelCause(context.Background()) + client := tc.newClient(cancel, &attempts) + jReq := newPostTestRequest() + + go client.sendPostRequestAndRespond(ctx, jReq, tc.tries) + + select { + case resp := <-jReq.responseChan: + require.ErrorIs(t, resp.err, ErrClientShutdown) + require.Nil(t, resp.result) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for response") } - require.NoError(t, err) - require.Equal(t, tc.expNetwork, addr.Network()) - require.Equal(t, tc.expAddress, addr.String()) + + require.EqualValues(t, tc.wantAttempts, + atomic.LoadInt32(&attempts)) }) } } From 13ccec2c9a8b8da6d9ec3f4f36e78dd48d45327e Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Fri, 20 Mar 2026 17:30:51 -0500 Subject: [PATCH 2/6] rpcclient: avoid double-resolving POST requests on shutdown When shutdown races with sendPostRequest, a request could be marked as ErrClientShutdown and still be enqueued. The sendPostHandler cleanup loop would then try to send a second terminal response and could block forever on a full response channel. Fix this by prioritizing the shutdown path. First check shutdown with a non-blocking select and return immediately when it is already closed. Then use a second select to choose between enqueue and shutdown for the remaining race window. A regression test verifies a shutdown request is failed immediately and never enqueued. --- rpcclient/infrastructure.go | 19 +++++++++++++--- rpcclient/infrastructure_test.go | 39 ++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index 694e9926fe..09856c64aa 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -956,19 +956,32 @@ cleanup: // HTTP client associated with the client. It is backed by a buffered channel, // so it will not block until the send channel is full. func (c *Client) sendPostRequest(jReq *jsonRequest) { - // Don't send the message if shutting down. + // Prefer shutdown when it is already closed so this path is + // deterministic. This mirrors addRequest and avoids post-shutdown + // enqueueing. select { case <-c.shutdown: - jReq.responseChan <- &Response{result: nil, err: ErrClientShutdown} + jReq.responseChan <- &Response{ + result: nil, + err: ErrClientShutdown, + } + + return + default: } + // Normal path: either enqueue, or fail if shutdown closes in the race + // window after the guard above. select { case c.sendPostChan <- jReq: log.Tracef("Sent command [%s] with id %d", jReq.method, jReq.id) case <-c.shutdown: - return + jReq.responseChan <- &Response{ + result: nil, + err: ErrClientShutdown, + } } } diff --git a/rpcclient/infrastructure_test.go b/rpcclient/infrastructure_test.go index 90795b5669..bf09c65818 100644 --- a/rpcclient/infrastructure_test.go +++ b/rpcclient/infrastructure_test.go @@ -478,3 +478,42 @@ func TestSendPostRequestAndRespondShutdown(t *testing.T) { }) } } + +// TestSendPostRequestShutdownPrioritizesFailure ensures shutdown always wins +// when it is already closed before sendPostRequest is called. +func TestSendPostRequestShutdownPrioritizesFailure(t *testing.T) { + client := &Client{ + sendPostChan: make(chan *jsonRequest, 1), + shutdown: make(chan struct{}), + } + + close(client.shutdown) + + const attempts = 200 + // The old single-select implementation chose randomly when both channels + // were ready, so repeat enough times to make an accidental enqueue show up. + for i := 0; i < attempts; i++ { + jReq := &jsonRequest{ + id: uint64(i), + method: "getblockcount", + responseChan: make(chan *Response, 1), + } + client.sendPostRequest(jReq) + + select { + case resp := <-jReq.responseChan: + require.ErrorIs(t, resp.err, ErrClientShutdown) + default: + t.Fatalf("request id=%d was not failed immediately", + jReq.id) + } + + select { + case <-client.sendPostChan: + t.Fatalf("request id=%d was enqueued after shutdown", + jReq.id) + + case <-time.After(10 * time.Millisecond): + } + } +} From fac1a225f9692ce35b9b29e4b4f083d8b9160e3a Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Fri, 20 Mar 2026 17:31:51 -0500 Subject: [PATCH 3/6] rpcclient: resolve all batch futures if Send fails Batch requests were only clearing batchList on Send() errors. The per-request futures remained unresolved, so callers waiting on Receive could block forever after a failed batch round trip. Add failBatchRequests to fan out the Send() error to every queued batch request and clear tracking state in one place. A regression test now verifies queued futures complete with the same error returned by Send(). --- rpcclient/infrastructure.go | 39 ++++++++++++++++---- rpcclient/infrastructure_test.go | 62 ++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index 09856c64aa..801849733b 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -1762,6 +1762,38 @@ func (c *Client) sendAsync() (FutureGetBulkResult, error) { return responseChan, nil } +// failBatchRequests resolves every queued batch request with the provided error +// and clears all internal request tracking. +// +// This function is safe for concurrent access. +func (c *Client) failBatchRequests(err error) { + c.requestLock.Lock() + defer c.requestLock.Unlock() + + c.batchLock.Lock() + defer c.batchLock.Unlock() + + for e := c.batchList.Front(); e != nil; e = e.Next() { + req := e.Value.(*jsonRequest) + + // Resolve all pending futures on the first batch-level failure + // so callers waiting on Receive don't block indefinitely. + // Safe: batch-mode responseChan buffers are unwritten here, + // so this send won't block while locks are held. Batch-mode + // requests only use addRequest (not sendPostRequest), so each + // responseChan buffer is still empty. + req.responseChan <- &Response{err: err} + } + + c.requestMap = make(map[uint64]*list.Element) + c.batchList = list.New() + + // Batch-mode requests are tracked in batchList, so requestList should + // already be empty. Keep this defensive reset for invariants and future + // call paths. + c.requestList.Init() +} + // Marshall's bulk requests and sends to the server // creates a response channel to receive the response func (c *Client) Send() error { @@ -1772,12 +1804,7 @@ func (c *Client) Send() error { batchResp, err := future.Receive() if err != nil { - // Clear batchlist in case of an error. - - c.batchLock.Lock() - c.batchList = list.New() - c.batchLock.Unlock() - + c.failBatchRequests(err) return err } diff --git a/rpcclient/infrastructure_test.go b/rpcclient/infrastructure_test.go index bf09c65818..5852edf9e8 100644 --- a/rpcclient/infrastructure_test.go +++ b/rpcclient/infrastructure_test.go @@ -517,3 +517,65 @@ func TestSendPostRequestShutdownPrioritizesFailure(t *testing.T) { } } } + +// TestBatchSendErrorResolvesQueuedFutures ensures a batch send failure resolves +// all queued futures instead of leaving them blocked. +func TestBatchSendErrorResolvesQueuedFutures(t *testing.T) { + connCfg := &ConnConfig{ + Host: "127.0.0.1:8332", + User: "user", + Pass: "pass", + DisableTLS: true, + HTTPPostMode: true, + } + + client, err := NewBatch(connCfg) + require.NoError(t, err) + t.Cleanup(func() { + client.Shutdown() + client.WaitForShutdown() + }) + + client.httpClient.Transport = postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + body := io.NopCloser(strings.NewReader("not-json")) + + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: body, + }, nil + }, + ) + + f1 := client.GetBlockCountAsync() + f2 := client.GetBlockCountAsync() + + sendErr := client.Send() + require.Error(t, sendErr) + + assertFutureErr := func(f FutureGetBlockCountResult) { + t.Helper() + + done := make(chan error, 1) + // Receive is the blocking caller-facing path. The old bug surfaced here + // by never resolving the future, so bound it with a timeout. + go func() { + _, err := f.Receive() + done <- err + }() + + select { + case err := <-done: + require.Error(t, err) + require.EqualError(t, err, sendErr.Error()) + + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for queued batch future " + + "to resolve") + } + } + + assertFutureErr(f1) + assertFutureErr(f2) +} From 1f00e1a31f1fdd3d6c1ccc685470c34c63c054e7 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Fri, 20 Mar 2026 17:33:07 -0500 Subject: [PATCH 4/6] rpcclient: avoid duplicate batch POST handlers NewBatch called New() and then called start() again. In HTTP POST mode that created a second sendPostHandler and another shutdown-cancel goroutine, which broke the expected single-flight serialization of POST sends. Keep NewBatch as a semantic toggle only: rely on New() to start handlers once, then set batch=true. A regression test now checks that batch POST requests stay serialized through one active transport call. --- rpcclient/infrastructure.go | 11 ++-- rpcclient/infrastructure_test.go | 93 ++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index 801849733b..8eb3b1cc6f 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -1610,13 +1610,18 @@ func NewBatch(config *ConnConfig) (*Client, error) { if !config.HTTPPostMode { return nil, errors.New("http post mode is required to use batch client") } - // notification parameter is nil since notifications are not supported in POST mode. + + // The notification parameter is nil since notifications are not + // supported in POST mode. client, err := New(config, nil) if err != nil { return nil, err } - client.batch = true //copy the client with changed batch setting - client.start() + + // New() already started the HTTP handlers, so only toggle batch + // semantics. + client.batch = true + return client, nil } diff --git a/rpcclient/infrastructure_test.go b/rpcclient/infrastructure_test.go index 5852edf9e8..98ca117f1a 100644 --- a/rpcclient/infrastructure_test.go +++ b/rpcclient/infrastructure_test.go @@ -579,3 +579,96 @@ func TestBatchSendErrorResolvesQueuedFutures(t *testing.T) { assertFutureErr(f1) assertFutureErr(f2) } + +// TestNewBatchSerializesPostSends ensures a batch client still serializes POST +// sends through a single handler goroutine. +func TestNewBatchSerializesPostSends(t *testing.T) { + connCfg := &ConnConfig{ + Host: "127.0.0.1:8332", + User: "user", + Pass: "pass", + DisableTLS: true, + HTTPPostMode: true, + } + + client, err := NewBatch(connCfg) + require.NoError(t, err) + t.Cleanup(func() { + client.Shutdown() + client.WaitForShutdown() + }) + + var active int32 + var maxActive int32 + release := make(chan struct{}) + + client.httpClient.Transport = postRoundTripFunc( + func(*http.Request) (*http.Response, error) { + current := atomic.AddInt32(&active, 1) + for { + prev := atomic.LoadInt32(&maxActive) + if current <= prev { + break + } + if atomic.CompareAndSwapInt32( + &maxActive, prev, current, + ) { + break + } + } + + // Hold the request open so the test can observe if + // a second POST enters the transport concurrently. + <-release + atomic.AddInt32(&active, -1) + + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader( + `{"result":1,"error":null}`, + )), + }, nil + }, + ) + + makeReq := func(id uint64) *jsonRequest { + return &jsonRequest{ + id: id, + method: "getblockcount", + marshalledJSON: []byte( + `{"jsonrpc":"1.0","id":1,` + + `"method":"getblockcount","params":[]}`, + ), + responseChan: make(chan *Response, 1), + } + } + + req1 := makeReq(1) + req2 := makeReq(2) + client.sendPostChan <- req1 + client.sendPostChan <- req2 + + // Wait until one request is definitely in flight before checking whether + // a duplicate handler can start a second concurrent POST. + require.Eventually(t, func() bool { + return atomic.LoadInt32(&active) >= 1 + }, time.Second, 5*time.Millisecond) + + // Allow any extra send handler goroutines to start a second in-flight + // request. + time.Sleep(100 * time.Millisecond) + observedMax := atomic.LoadInt32(&maxActive) + close(release) + + for i, req := range []*jsonRequest{req1, req2} { + select { + case resp := <-req.responseChan: + require.NoError(t, resp.err, "request %d failed", i) + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for request %d response", i) + } + } + + require.EqualValues(t, 1, observedMax, "POST sends must be serialized") +} From 51944b270ccd0b8801dcfed60e50d06b9b60fe70 Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 6 May 2026 16:32:05 -0500 Subject: [PATCH 5/6] make fmt --- addrmgr/network.go | 2 +- chaincfg/params.go | 4 ++-- wire/msgcfcheckpt_bench_test.go | 1 - 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/addrmgr/network.go b/addrmgr/network.go index 66c0743307..e86133a2f6 100644 --- a/addrmgr/network.go +++ b/addrmgr/network.go @@ -276,7 +276,7 @@ func IsRoutable(na *wire.NetAddressV2) bool { IsRFC4843(lna) || IsRFC7343(lna) || IsRFC5737(lna) || IsRFC6598(lna) || IsLocal(lna) || IsZero(lna) || (IsRFC4193(lna) && - !IsOnionCatTor(lna))) + !IsOnionCatTor(lna))) } // GroupKey returns a string representing the network group an address is part diff --git a/chaincfg/params.go b/chaincfg/params.go index 8aa6c110c2..5eda032743 100644 --- a/chaincfg/params.go +++ b/chaincfg/params.go @@ -560,8 +560,8 @@ var RegressionNetParams = Params{ DeploymentEnder: NewMedianTimeDeploymentEnder( time.Time{}, // Never expires. ), - MinActivationHeight: 0, - AlwaysActiveHeight: 1, + MinActivationHeight: 0, + AlwaysActiveHeight: 1, CustomActivationThreshold: 108, // Only needs 75% hash rate. }, }, diff --git a/wire/msgcfcheckpt_bench_test.go b/wire/msgcfcheckpt_bench_test.go index 47bcfe0582..0bc3b7dc30 100644 --- a/wire/msgcfcheckpt_bench_test.go +++ b/wire/msgcfcheckpt_bench_test.go @@ -115,4 +115,3 @@ func BenchmarkMsgCFCheckptDecodeEmpty(b *testing.B) { } } } - From 74f48480e0dba4665b848dbdbb0267c6c05fb25d Mon Sep 17 00:00:00 2001 From: Boris Nagaev Date: Wed, 6 May 2026 17:11:20 -0500 Subject: [PATCH 6/6] tests: fix stale regtest assumptions after #2467 PR #2467 changed regtest to match Core's BIP34/65/66 activation rules, and the merged stack carries that (commit cd4e5426 "regtest: align activations with Bitcoin Core"). Height-1 regtest blocks now need a BIP34-compliant coinbase height and a post-BIP66 block version. The failing tests came from commits added after PR #2467 was opened on December 25, 2025 but before it merged on April 30, 2026: - c1a46122 ("blockchain: add ProcessBlockHeader") - f9645f07 ("blockchain: reuse existing header node in maybeAcceptBlock") - dc6e096c ("netsync: add TestSyncStateMachine for end-to-end IBD sync flow") - ce094262 ("netsync: add TestStartSyncBlockFallback for block-only sync path") - 2aae8a6d ("netsync: add TestStartSyncChainCurrent for chain-current noop path") Because those tests landed later, they kept the old regtest assumptions even though #2467 had already been authored and tested against the older tree. Once #2467 finally merged, these newer tests started building invalid regtest blocks and headers. Fix them by setting the genesis tip height to 0 before generating descendants, using Version 4 in the regtest block/header helpers, and encoding the test coinbase height with a minimal BIP34 push plus padding for the generic coinbase script-length rule. --- blockchain/accept_test.go | 3 +++ blockchain/process_test.go | 3 ++- netsync/manager_test.go | 20 +++++++++++++------- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/blockchain/accept_test.go b/blockchain/accept_test.go index 28a1ff6997..ab96b420b6 100644 --- a/blockchain/accept_test.go +++ b/blockchain/accept_test.go @@ -26,6 +26,9 @@ func TestMaybeAcceptBlockReusesHeaderNode(t *testing.T) { // // genesis -> 1 -> 2 -> 3 tip := btcutil.NewBlock(params.GenesisBlock) + // Generated descendants must start at height 1 so the coinbase height + // encoding matches regtest's BIP34 rules. + tip.SetHeight(0) _, _, err := addBlocks(3, chain, tip, []*testhelper.SpendableOut{}) if err != nil { t.Fatalf("failed to build base chain: %v", err) diff --git a/blockchain/process_test.go b/blockchain/process_test.go index bc5de726ee..b24b4a0432 100644 --- a/blockchain/process_test.go +++ b/blockchain/process_test.go @@ -36,7 +36,8 @@ func chainedHeaders(parent *wire.BlockHeader, chainParams *chaincfg.Params, merkle := chainhash.HashH(randBytes[:]) header := wire.BlockHeader{ - Version: 1, + // Regtest enforces the BIP34/65/66 version floor from height 1. + Version: 4, PrevBlock: tip.BlockHash(), MerkleRoot: merkle, Bits: chainParams.PowLimitBits, diff --git a/netsync/manager_test.go b/netsync/manager_test.go index 73eb0d51a8..b7e4b6bb95 100644 --- a/netsync/manager_test.go +++ b/netsync/manager_test.go @@ -557,11 +557,15 @@ func TestIsInIBDMode(t *testing.T) { func createTestCoinbase(height int32, params *chaincfg.Params) *wire.MsgTx { tx := wire.NewMsgTx(wire.TxVersion) - // Push the height as data to guarantee unique txids per block. - sigScript := []byte{ - 0x04, - byte(height), byte(height >> 8), - byte(height >> 16), byte(height >> 24), + // Encode the height as a minimally encoded script integer and add a trailing + // OP_0 so the script also satisfies the generic coinbase-length checks. + sigScript, err := txscript.NewScriptBuilder(). + AddInt64(int64(height)). + AddInt64(0). + Script() + if err != nil { + panic(fmt.Sprintf("unable to encode coinbase height %d: %v", + height, err)) } tx.AddTxIn(&wire.TxIn{ @@ -613,7 +617,8 @@ func generateTestBlocks( merkleRoot := cb.TxHash() header := wire.BlockHeader{ - Version: 1, + // Regtest enforces the BIP34/65/66 version floor from height 1. + Version: 4, PrevBlock: *prevHash, MerkleRoot: merkleRoot, Timestamp: prevTime.Add(time.Minute), @@ -1187,7 +1192,8 @@ func TestStartSyncChainCurrent(t *testing.T) { // IsCurrent() returns true. cb := createTestCoinbase(1, ¶ms) header := wire.BlockHeader{ - Version: 1, + // Regtest enforces the BIP34/65/66 version floor from height 1. + Version: 4, PrevBlock: *params.GenesisHash, MerkleRoot: cb.TxHash(), Timestamp: time.Now().Truncate(time.Second),