Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions storage/postgresql/admin_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func (t *adminTX) Commit() error {
t.mu.Lock()
defer t.mu.Unlock()
t.closed = true
if t.tx.Conn().IsClosed() {
return fmt.Errorf("commit aborted: connection already closed")
}
return t.tx.Commit(context.TODO())
}

Expand All @@ -112,6 +115,9 @@ func (t *adminTX) Close() error {
return nil
}
t.closed = true
if t.tx.Conn().IsClosed() {
return nil
}
return t.tx.Rollback(context.TODO())
}

Expand Down
42 changes: 35 additions & 7 deletions storage/postgresql/log_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,11 @@ func (t *logTreeTX) QueueLeaf(ctx context.Context, leaf *trillian.LogLeaf, queue
existingLeaves := make([]*trillian.LogLeaf, 1)
result, err := t.tx.Exec(ctx, queueLeafSQL, t.treeID, leaf.LeafIdentityHash, leaf.LeafValue, leaf.ExtraData, args[0], leaf.MerkleLeafHash, args[1])
if err != nil {
klog.Warningf("Failed to queue leaf: %s", err)
if ctx.Err() != nil {
klog.V(1).Infof("Failed to queue leaf: %s", err)
} else {
klog.Warningf("Failed to queue leaf: %s", err)
}
return nil, postgresqlToGRPC(err)
}
queuedCounter.Add(1, label)
Expand Down Expand Up @@ -667,13 +671,21 @@ func (t *logTreeTX) getLeavesByRangeInternal(ctx context.Context, start, count i

rows, err := t.tx.Query(ctx, selectLeavesByRangeSQL, start, start+count, t.treeID)
if err != nil {
klog.Warningf("Failed to get leaves by range: %s", err)
if ctx.Err() != nil {
klog.V(1).Infof("Failed to get leaves by range: %s", err)
} else {
klog.Warningf("Failed to get leaves by range: %s", err)
}
return nil, err
}
defer func() {
rows.Close()
if err := rows.Err(); err != nil {
klog.Errorf("rows.Err(): %v", err)
if ctx.Err() != nil {
klog.V(1).Infof("rows.Err(): %v", err)
} else {
klog.Errorf("rows.Err(): %v", err)
}
}
}()

Expand Down Expand Up @@ -709,7 +721,11 @@ func (t *logTreeTX) getLeavesByRangeInternal(ctx context.Context, start, count i
ret = append(ret, leaf)
}
if err := rows.Err(); err != nil {
klog.Warningf("Failed to read returned leaves: %s", err)
if ctx.Err() != nil {
klog.V(1).Infof("Failed to read returned leaves: %s", err)
} else {
klog.Warningf("Failed to read returned leaves: %s", err)
}
return nil, err
}

Expand Down Expand Up @@ -804,13 +820,21 @@ func (t *logTreeTX) StoreSignedLogRoot(ctx context.Context, root *trillian.Signe
func (t *logTreeTX) getLeavesByHashInternal(ctx context.Context, leafHashes [][]byte, query string, desc string) ([]*trillian.LogLeaf, error) {
rows, err := t.tx.Query(ctx, query, leafHashes, t.treeID)
if err != nil {
klog.Warningf("Query() %s hash = %v", desc, err)
if ctx.Err() != nil {
klog.V(1).Infof("Query() %s hash = %v", desc, err)
} else {
klog.Warningf("Query() %s hash = %v", desc, err)
}
return nil, err
}
defer func() {
rows.Close()
if err := rows.Err(); err != nil {
klog.Errorf("rows.Err(): %v", err)
if ctx.Err() != nil {
klog.V(1).Infof("rows.Err(): %v", err)
} else {
klog.Errorf("rows.Err(): %v", err)
}
}
}()

Expand Down Expand Up @@ -848,7 +872,11 @@ func (t *logTreeTX) getLeavesByHashInternal(ctx context.Context, leafHashes [][]
ret = append(ret, leaf)
}
if err := rows.Err(); err != nil {
klog.Warningf("Failed to read returned leaves: %s", err)
if ctx.Err() != nil {
klog.V(1).Infof("Failed to read returned leaves: %s", err)
} else {
klog.Warningf("Failed to read returned leaves: %s", err)
}
return nil, err
}

Expand Down
60 changes: 57 additions & 3 deletions storage/postgresql/tree_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"runtime/debug"
"sync"
"time"

"github.com/google/trillian"
"github.com/google/trillian/storage/cache"
Expand Down Expand Up @@ -65,6 +66,20 @@ type postgreSQLTreeStorage struct {
// (See https://github.com/jackc/pgx/wiki/Automatic-Prepared-Statement-Caching)
}

// connIsUsable reports whether a pgx connection is open and idle.
//
// It returns true only when the connection has not been closed and its
// transaction status is idle. A closed connection, or one that still has an
// open or failed transaction, yields false. Such a state can be left behind
// when a context cancellation interrupts a query mid-flight and the
// connection goes back to the pool without proper cleanup.
// See https://github.com/jackc/pgx/issues/2100
func connIsUsable(c *pgx.Conn) bool {
	// Transaction status bytes: 'I' = idle (no transaction),
	// 'T' = in transaction, 'E' = failed transaction.
	const txIdle = 'I'

	// The closed check comes first so the transaction status of a closed
	// connection is never queried.
	return !c.IsClosed() && c.PgConn().TxStatus() == txIdle
}

// OpenDB opens a database connection pool for all PostgreSQL-based storage implementations.
func OpenDB(dbURL string) (*pgxpool.Pool, error) {
pgxConfig, err := pgxpool.ParseConfig(dbURL)
Expand All @@ -73,6 +88,19 @@ func OpenDB(dbURL string) (*pgxpool.Pool, error) {
return nil, err
}

// Mitigate connection pool contamination caused by context cancellation
// during in-flight queries. When a gRPC context is cancelled, pgx may
// close the underlying connection or leave it in a dirty transaction state.
// These callbacks ensure such connections are destroyed rather than being
// returned to or acquired from the pool.
// See https://github.com/jackc/pgx/issues/2100
pgxConfig.PrepareConn = func(ctx context.Context, c *pgx.Conn) (bool, error) {
return connIsUsable(c), nil
}
pgxConfig.AfterRelease = func(c *pgx.Conn) bool {
return connIsUsable(c)
}

db, err := pgxpool.NewWithConfig(context.TODO(), pgxConfig)
if err != nil {
// Don't log uri as it could contain credentials
Expand All @@ -92,7 +120,11 @@ func newTreeStorage(db *pgxpool.Pool) *postgreSQLTreeStorage {
func (m *postgreSQLTreeStorage) beginTreeTx(ctx context.Context, tree *trillian.Tree, hashSizeBytes int, subtreeCache *cache.SubtreeCache) (treeTX, error) {
t, err := m.db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
klog.Warningf("Could not start tree TX: %s", err)
if ctx.Err() != nil {
klog.V(1).Infof("Could not start tree TX: %s", err)
} else {
klog.Warningf("Could not start tree TX: %s", err)
}
return treeTX{}, err
}

Expand Down Expand Up @@ -291,17 +323,33 @@ func (t *treeTX) Commit(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()

// If the connection is already closed (e.g., due to context cancellation
// during a prior query), don't attempt the commit. This avoids noisy
// "failed to deallocate cached statement(s): conn closed" errors.
if t.tx.Conn().IsClosed() {
t.closed = true
return fmt.Errorf("commit aborted: connection already closed")
}

tiles, err := t.subtreeCache.UpdatedTiles()
if err != nil {
klog.Warningf("SubtreeCache updated tiles error: %v", err)
return err
}
if err := t.storeSubtrees(ctx, tiles); err != nil {

// Use a non-cancellable context for the commit path to prevent context
// cancellation (e.g., from a gRPC client disconnect) from interrupting
// the commit and causing "conn closed" errors during cached statement
// deallocation. A timeout is applied as a safety net.
commitCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()

if err := t.storeSubtrees(commitCtx, tiles); err != nil {
klog.Warningf("TX commit flush error: %v", err)
return err
}
t.closed = true
if err := t.tx.Commit(ctx); err != nil {
if err := t.tx.Commit(commitCtx); err != nil {
klog.Warningf("TX commit error: %s, stack:\n%s", err, string(debug.Stack()))
return err
}
Expand All @@ -310,6 +358,12 @@ func (t *treeTX) Commit(ctx context.Context) error {

func (t *treeTX) rollbackInternal() error {
t.closed = true
// If the connection is already closed (e.g., due to context cancellation
// during a prior query), don't attempt the rollback. The server has
// already discarded the transaction when the connection closed.
if t.tx.Conn().IsClosed() {
return nil
}
if err := t.tx.Rollback(context.TODO()); err != nil {
klog.Warningf("TX rollback error: %s, stack:\n%s", err, string(debug.Stack()))
return err
Expand Down