Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
a75f988
fix(crossseed): resolve RSS/seeded search starvation and reduce memor…
s0up4200 Nov 26, 2025
a7f11ad
fix(crossseed): improve cache key robustness and clarify slice ownership
s0up4200 Nov 26, 2025
be8c97b
docs: document cache contracts and design tradeoffs
s0up4200 Nov 26, 2025
900fe18
fix(crossseed): correct save path handling for all folder structures
s0up4200 Nov 26, 2025
b31b323
fix(crossseed): improve recent results tracking and UI display for cr…
s0up4200 Nov 26, 2025
23083bd
fix(crossseed): correct save path handling and improve logging accuracy
s0up4200 Nov 26, 2025
b98141b
chore(crossseed): add debug logging for RSS coordination wait
s0up4200 Nov 26, 2025
48e0206
refactor(jackett): replace blocking rate limiter with async job-based…
s0up4200 Nov 26, 2025
0719948
test(scheduler): fix race condition in ErrorPropagation test
s0up4200 Nov 26, 2025
fdaf464
test(scheduler): fix race condition in MultipleIndexersPerSubmission …
s0up4200 Nov 26, 2025
963d3a0
fix subfolder infinite recursion
KyleSanderson Nov 27, 2025
166e474
fix stuck rss and dead search
KyleSanderson Nov 27, 2025
6f3fb3b
grok bad fix
KyleSanderson Nov 27, 2025
01ac6e0
normalize
KyleSanderson Nov 27, 2025
c0c076c
paths
KyleSanderson Nov 27, 2025
e4e1e37
clams
KyleSanderson Nov 27, 2025
c05d26a
pause, and wait for disk cache ttl on recheck failure
KyleSanderson Nov 27, 2025
d64294b
feat(prowlarr): add Prowlarr history endpoints and UI panel for track…
s0up4200 Nov 27, 2025
1da32de
fuck
KyleSanderson Nov 27, 2025
1385ae0
Merge branch 'fix/rss-seeded-search-starvation-and-memory' of github.…
KyleSanderson Nov 27, 2025
f756cc5
fix(crossseed): use SavePath for single-file candidate torrents
s0up4200 Nov 27, 2025
d1222c2
Merge branch 'fix/rss-seeded-search-starvation-and-memory' of github.…
s0up4200 Nov 27, 2025
4871976
cycle until we're done
KyleSanderson Nov 27, 2025
5a50120
Merge branch 'fix/rss-seeded-search-starvation-and-memory' of github.…
KyleSanderson Nov 27, 2025
e49b34e
fix(crossseed): use Subfolder layout for single file into folder matches
s0up4200 Nov 27, 2025
526b29d
Merge branch 'fix/rss-seeded-search-starvation-and-memory' of github.…
s0up4200 Nov 27, 2025
42c5b13
fix(crossseed): use Subfolder layout for single file into folder matches
s0up4200 Nov 27, 2025
637edee
fix: tidy up the History Panel
s0up4200 Nov 27, 2025
8339503
feat(indexers): replace Prowlarr history panel with scheduler activit…
s0up4200 Nov 27, 2025
4d97da6
fix(crossseed): use ContentPath for TV episodes matching season packs
s0up4200 Nov 27, 2025
83a921e
fix(web): replace date-fns with native dateTimeUtils
s0up4200 Nov 27, 2025
165bd14
refactor(web): restructure IndexersPage
s0up4200 Nov 27, 2025
6986fc9
fix(crossseed,jackett): address scheduler starvation and recovery issues
s0up4200 Nov 27, 2025
2ff613e
fix(jackett): prevent queue starvation and deadline failures for queu…
s0up4200 Nov 27, 2025
30d5755
fix(web): warn that episode matches disable Auto Torrent Management
s0up4200 Nov 27, 2025
e46f393
feat(web): improve search history params display with torznab categor…
s0up4200 Nov 27, 2025
7297b52
fix(web): remove unnecessary border from SearchDetailDialog header
s0up4200 Nov 27, 2025
3001a00
fix(web): update cancel button text to indicate stopping state during…
s0up4200 Nov 27, 2025
bbcb214
fix(web): complete torznab category mappings and add cancel loading s…
s0up4200 Nov 27, 2025
0f3d08f
Merge branch 'main' into fix/rss-seeded-search-starvation-and-memory
s0up4200 Nov 27, 2025
c0eb38f
fix(jackett): replace daily limit inference with escalating backoff
s0up4200 Nov 27, 2025
16943d5
fix(jackett): reset escalation on success for all search paths
s0up4200 Nov 27, 2025
82218da
fix(jackett): route interactive searches through scheduler for UI vis…
s0up4200 Nov 27, 2025
038f28b
fix(jackett): make scheduler Stop idempotent and restore cooldowns on…
s0up4200 Nov 27, 2025
552c186
fix(crossseed): replace blocking recheck with background auto-resume
s0up4200 Nov 27, 2025
ae8dc16
feat(crossseed): add hasExtraSourceFiles function to identify additio…
s0up4200 Nov 27, 2025
96970d3
fix(crossseed): replace per-torrent goroutines with single recheck re…
s0up4200 Nov 27, 2025
13a36c0
fix(crossseed): trigger recheck for hasExtraFiles torrents added in s…
s0up4200 Nov 28, 2025
6be386d
feat(crossseed): source-specific tags for RSS, seeded search, complet…
s0up4200 Nov 28, 2025
7f0faae
fix(crossseed): update tests to use source-specific tag fields
s0up4200 Nov 28, 2025
96aa386
feat(crossseed): add REPACK and PROPER to strict variant matching
s0up4200 Nov 28, 2025
e005894
fix(crossseed): disable ATM for episodes matched to season packs
s0up4200 Nov 28, 2025
ad53f6d
feat(crossseed): exempt season packs from REPACK/PROPER strict matching
s0up4200 Nov 28, 2025
95eb364
fix(crossseed): prevent premature removal from recheck queue before r…
s0up4200 Nov 29, 2025
04c7c34
fix(crossseed): only mark automation runs as stuck when no active run…
s0up4200 Nov 29, 2025
784d472
refactor(crossseed): clean up debug logging in filterIndexersByExisti…
s0up4200 Nov 29, 2025
c39399e
refactor(crossseed): batch errored torrent recovery with per-torrent …
s0up4200 Nov 29, 2025
f51edc5
test(crossseed): update recovery tests for batched API calls
s0up4200 Nov 29, 2025
b9b3ef5
fix(crossseed): remove ineffective context timeout wrapper
s0up4200 Nov 29, 2025
3fc1abb
fix(crossseed): trust file episode markers for TV content detection
s0up4200 Nov 29, 2025
ca7ae1b
refactor(crossseed): remove AddCrossSeedTag field from API
s0up4200 Nov 29, 2025
bd4b27b
Merge branch 'main' into fix/rss-seeded-search-starvation-and-memory
s0up4200 Nov 29, 2025
d7806f0
Merge branch 'main' into fix/rss-seeded-search-starvation-and-memory
s0up4200 Nov 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions internal/qbittorrent/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
type FilesManager interface {
GetCachedFiles(ctx context.Context, instanceID int, hash string) (qbt.TorrentFiles, error)
// GetCachedFilesBatch returns cached files for a set of torrents and the hashes that were missing/stale.
// Callers must pass hashes already trimmed/normalized (e.g. lowercase hex)
// Callers must pass hashes already trimmed/normalized (e.g. uppercase hex)
// because implementations treat the provided keys as-is when populating lookups and cache metadata.
GetCachedFilesBatch(ctx context.Context, instanceID int, hashes []string) (map[string]qbt.TorrentFiles, []string, error)
CacheFiles(ctx context.Context, instanceID int, hash string, files qbt.TorrentFiles) error
Expand Down Expand Up @@ -1272,13 +1272,13 @@ func (sm *SyncManager) GetTorrentFilesBatch(ctx context.Context, instanceID int,
return nil
}

// Clone the slice so later fetches cannot mutate the stored entry if the
// client reuses backing arrays across calls.
copied := make(qbt.TorrentFiles, len(*files))
copy(copied, *files)
// Clone the API response once. This clone is shared between the caller's
// result map and the cache. Callers must treat returned slices as read-only.
callerCopy := make(qbt.TorrentFiles, len(*files))
copy(callerCopy, *files)

mu.Lock()
filesByHash[ch] = copied
filesByHash[ch] = callerCopy
mu.Unlock()
return nil
})
Expand All @@ -1288,15 +1288,15 @@ func (sm *SyncManager) GetTorrentFilesBatch(ctx context.Context, instanceID int,
return filesByHash, err
}

// Cache all newly fetched files in batch
// Cache all newly fetched files in batch.
// Fresh fetches share the cloned slice between caller and cache (one clone total).
// Cache hits (handled earlier) return isolated clones.
// IMPORTANT: Callers must treat qbt.TorrentFiles as read-only to avoid cache corruption.
if fm := sm.getFilesManager(); fm != nil && len(hashesToFetch) > 0 {
fetchedFiles := make(map[string]qbt.TorrentFiles)
for _, canonicalHash := range hashesToFetch {
if files, ok := filesByHash[canonicalHash]; ok {
// Cache a clone to keep cache entries isolated.
cloned := make(qbt.TorrentFiles, len(files))
copy(cloned, files)
fetchedFiles[canonicalHash] = cloned
fetchedFiles[canonicalHash] = files
}
}
if len(fetchedFiles) > 0 {
Expand Down
73 changes: 72 additions & 1 deletion internal/services/crossseed/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/autobrr/autobrr/pkg/ttlcache"
qbt "github.com/autobrr/go-qbittorrent"
"github.com/cespare/xxhash/v2"
"github.com/moistari/rls"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -69,6 +70,12 @@ type qbittorrentSync interface {
SetTags(ctx context.Context, instanceID int, hashes []string, tags string) error
}

// dedupCacheEntry stores cached deduplication results to avoid recomputation.
type dedupCacheEntry struct {
deduplicated []qbt.Torrent
duplicateHashes map[string][]string
}

// ServiceMetrics contains Prometheus metrics for the cross-seed service
type ServiceMetrics struct {
FindCandidatesDuration prometheus.Histogram
Expand Down Expand Up @@ -247,6 +254,11 @@ type Service struct {
// Cached torrent file metadata for repeated analyze/search calls.
torrentFilesCache *ttlcache.Cache[string, qbt.TorrentFiles]

// Cached deduplication results to avoid recomputing on every search run.
// Key: "dedup:{instanceID}:{torrentCount}:{hashSignature}" where hashSignature
// is derived from a sample of torrent hashes to detect list changes.
dedupCache *ttlcache.Cache[string, *dedupCacheEntry]

searchMu sync.RWMutex
searchCancel context.CancelFunc
searchState *searchRunState
Expand Down Expand Up @@ -280,6 +292,8 @@ func NewService(
SetDefaultTTL(indexerDomainCacheTTL))
contentFilesCache := ttlcache.New(ttlcache.Options[string, qbt.TorrentFiles]{}.
SetDefaultTTL(5 * time.Minute))
dedupCache := ttlcache.New(ttlcache.Options[string, *dedupCacheEntry]{}.
SetDefaultTTL(5 * time.Minute))

return &Service{
instanceStore: instanceStore,
Expand All @@ -296,6 +310,7 @@ func NewService(
automationWake: make(chan struct{}, 1),
domainMappings: initializeDomainMappings(),
torrentFilesCache: contentFilesCache,
dedupCache: dedupCache,
metrics: NewServiceMetrics(),
}
}
Expand Down Expand Up @@ -3931,6 +3946,25 @@ func (s *Service) finalizeSearchRun(state *searchRunState, canceled bool) {
s.searchMu.Unlock()
}

// dedupCacheKey generates a cache key for deduplication results based on instance ID
// and a signature derived from torrent hashes. Uses XOR of xxhash values for order-independent
// hashing - the same set of torrents produces the same key regardless of order.
func dedupCacheKey(instanceID int, torrents []qbt.Torrent) string {
n := len(torrents)
if n == 0 {
return fmt.Sprintf("dedup:%d:0:0", instanceID)
}

// XOR all individual hash digests for order-independent signature.
// XOR is commutative and associative, so order doesn't matter.
var sig uint64
for i := range torrents {
sig ^= xxhash.Sum64String(torrents[i].Hash)
}

return fmt.Sprintf("dedup:%d:%d:%x", instanceID, n, sig)
}
Comment thread
s0up4200 marked this conversation as resolved.

// deduplicateSourceTorrents removes duplicate torrents from the search queue by keeping only
// the best representative of each unique content group. It prefers torrents that already
// have a top-level folder (so subsequent cross-seeds inherit cleaner layouts) and falls back to
Expand All @@ -3951,6 +3985,19 @@ func (s *Service) deduplicateSourceTorrents(ctx context.Context, instanceID int,
return torrents, map[string][]string{}
}

// Generate cache key from instance ID and an order-independent signature of torrent hashes.
cacheKey := dedupCacheKey(instanceID, torrents)
if s.dedupCache != nil {
if entry, ok := s.dedupCache.Get(cacheKey); ok && entry != nil {
log.Trace().
Int("instanceID", instanceID).
Int("cachedCount", len(entry.deduplicated)).
Msg("[CROSSSEED-DEDUP] Using cached deduplication result")
// IMPORTANT: Returned slices are cache-backed. Do not modify.
return entry.deduplicated, entry.duplicateHashes
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Parse all torrents and track their releases
type torrentWithRelease struct {
torrent qbt.Torrent
Expand Down Expand Up @@ -4141,6 +4188,14 @@ func (s *Service) deduplicateSourceTorrents(ctx context.Context, instanceID int,
duplicateMap[rep.torrent.Hash] = append([]string(nil), group.duplicates...)
}

// Cache the result for future runs
if s.dedupCache != nil {
s.dedupCache.Set(cacheKey, &dedupCacheEntry{
deduplicated: deduplicated,
duplicateHashes: duplicateMap,
}, ttlcache.DefaultTTL)
}

return deduplicated, duplicateMap
}

Expand Down Expand Up @@ -4398,6 +4453,22 @@ func (s *Service) processSearchCandidate(ctx context.Context, state *searchRunSt
return nil
}

// Wait for RSS automation to complete before searching to avoid rate limiter contention.
// RSS uses higher priority (shorter intervals) so we yield to it.
// Cap the wait to 5 minutes to prevent indefinite blocking if RSS gets stuck.
rssWaitDeadline := time.After(5 * time.Minute)
rssWaitLoop:
for s.runActive.Load() {
select {
case <-ctx.Done():
return ctx.Err()
case <-rssWaitDeadline:
log.Warn().Msg("[CROSSSEED-SEARCH] RSS wait timeout exceeded, proceeding with search")
break rssWaitLoop
case <-time.After(500 * time.Millisecond):
}
}

searchCtx := ctx
var searchCancel context.CancelFunc
searchTimeout := computeAutomationSearchTimeout(len(allowedIndexerIDs))
Expand All @@ -4407,7 +4478,7 @@ func (s *Service) processSearchCandidate(ctx context.Context, state *searchRunSt
if searchCancel != nil {
defer searchCancel()
}
searchCtx = jackett.WithSearchPriority(searchCtx, jackett.RateLimitPriorityRSS)
searchCtx = jackett.WithSearchPriority(searchCtx, jackett.RateLimitPriorityBackground)

searchResp, err := s.SearchTorrentMatches(searchCtx, state.opts.InstanceID, torrent.Hash, TorrentSearchOptions{
IndexerIDs: allowedIndexerIDs,
Expand Down
8 changes: 5 additions & 3 deletions internal/services/jackett/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,15 @@ func (r *RateLimiter) BeforeRequest(ctx context.Context, indexer *models.Torznab
}

if cfg.MaxWait > 0 && wait > cfg.MaxWait {
// Treat large waits as a short-lived cooldown so schedulers can skip instead of retrying.
r.setCooldownLocked(indexer.ID, wait)
// Cap cooldown to maxWait to prevent cascading blocks across priorities.
// Setting the full wait would cause lower-priority tasks to inherit cooldowns
// far exceeding their own budgets, starving them unnecessarily.
r.setCooldownLocked(indexer.ID, cfg.MaxWait)
r.mu.Unlock()
return &RateLimitWaitError{
IndexerID: indexer.ID,
IndexerName: indexer.Name,
Wait: wait,
Wait: cfg.MaxWait, // Report capped wait, not theoretical wait
MaxWait: cfg.MaxWait,
Priority: cfg.Priority,
}
Expand Down
5 changes: 3 additions & 2 deletions internal/services/jackett/ratelimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func TestRateLimiterMaxWaitBudget(t *testing.T) {
t.Fatalf("expected RateLimitWaitError, got %v", err)
}

if waitErr.Wait <= waitErr.MaxWait {
t.Fatalf("expected wait to exceed max wait, got wait %v max %v", waitErr.Wait, waitErr.MaxWait)
// Wait is now capped to MaxWait to provide consistent semantics for callers
if waitErr.Wait != waitErr.MaxWait {
t.Fatalf("expected wait to be capped to max wait, got wait %v max %v", waitErr.Wait, waitErr.MaxWait)
}
}
Loading