Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions caddy/mercure-skip.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//go:build nomercure

package caddy

import (
Expand Down
2 changes: 1 addition & 1 deletion caddy/workerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type workerConfig struct {
// MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panic)
MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"`

requestOptions []frankenphp.RequestOption
requestOptions []frankenphp.RequestOption
}

func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) {
Expand Down
2 changes: 1 addition & 1 deletion frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func Init(options ...Option) error {
return err
}

regularRequestChan = make(chan contextHolder, opt.numThreads-workerThreadCount)
regularRequestChan = make(chan contextHolder)
regularThreads = make([]*phpThread, 0, opt.numThreads-workerThreadCount)
for i := 0; i < opt.numThreads-workerThreadCount; i++ {
convertToRegularThread(getInactivePHPThread())
Expand Down
42 changes: 30 additions & 12 deletions threadregular.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package frankenphp

import (
"context"
"runtime"
"sync"
"sync/atomic"
)

// representation of a non-worker PHP thread
Expand All @@ -16,9 +18,10 @@ type regularThread struct {
}

var (
regularThreads []*phpThread
regularThreadMu = &sync.RWMutex{}
regularRequestChan chan contextHolder
regularThreads []*phpThread
regularThreadMu = &sync.RWMutex{}
regularRequestChan chan contextHolder
queuedRegularThreads = atomic.Int32{}
)

func convertToRegularThread(thread *phpThread) {
Expand Down Expand Up @@ -81,6 +84,7 @@ func (handler *regularThread) waitForRequest() string {
// go back to beforeScriptExecution
return handler.beforeScriptExecution()
case ch = <-regularRequestChan:
case ch = <-handler.thread.requestChan:
}

handler.ctx = ch.ctx
Expand All @@ -100,23 +104,35 @@ func (handler *regularThread) afterRequest() {
func handleRequestWithRegularPHPThreads(ch contextHolder) error {
metrics.StartRequest()

select {
case regularRequestChan <- ch:
// a thread was available to handle the request immediately
<-ch.frankenPHPContext.done
metrics.StopRequest()

return nil
default:
// no thread was available
runtime.Gosched()

if queuedRegularThreads.Load() == 0 {
regularThreadMu.RLock()
for _, thread := range regularThreads {
select {
case thread.requestChan <- ch:
regularThreadMu.RUnlock()
<-ch.frankenPHPContext.done
metrics.StopRequest()

return nil
default:
// thread was not available
}
}
regularThreadMu.RUnlock()
}

// if no thread was available, mark the request as queued and fan it out to all threads
queuedRegularThreads.Add(1)
metrics.QueuedRequest()

for {
select {
case regularRequestChan <- ch:
queuedRegularThreads.Add(-1)
metrics.DequeuedRequest()

<-ch.frankenPHPContext.done
metrics.StopRequest()

Expand All @@ -125,7 +141,9 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error {
// the request has triggered scaling, continue to wait for a thread
case <-timeoutChan(maxWaitTime):
// the request has timed out stalling
queuedRegularThreads.Add(-1)
metrics.DequeuedRequest()
metrics.StopRequest()

ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded)

Expand Down
36 changes: 24 additions & 12 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/dunglas/frankenphp/internal/fastabs"
Expand All @@ -28,6 +30,7 @@ type worker struct {
maxConsecutiveFailures int
onThreadReady func(int)
onThreadShutdown func(int)
queuedRequests atomic.Int32
}

var (
Expand Down Expand Up @@ -253,24 +256,30 @@ func (worker *worker) isAtThreadLimit() bool {
func (worker *worker) handleRequest(ch contextHolder) error {
metrics.StartWorkerRequest(worker.name)

// dispatch requests to all worker threads in order
worker.threadMutex.RLock()
for _, thread := range worker.threads {
select {
case thread.requestChan <- ch:
worker.threadMutex.RUnlock()
<-ch.frankenPHPContext.done
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
runtime.Gosched()

return nil
default:
// thread is busy, continue
if worker.queuedRequests.Load() == 0 {
// dispatch requests to all worker threads in order
worker.threadMutex.RLock()
for _, thread := range worker.threads {
select {
case thread.requestChan <- ch:
worker.threadMutex.RUnlock()
<-ch.frankenPHPContext.done
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))

return nil
default:
// thread is busy, continue
}
}
worker.threadMutex.RUnlock()
}
worker.threadMutex.RUnlock()

// if no thread was available, mark the request as queued and apply the scaling strategy
worker.queuedRequests.Add(1)
metrics.QueuedWorkerRequest(worker.name)

for {
workerScaleChan := scaleChan
if worker.isAtThreadLimit() {
Expand All @@ -279,6 +288,7 @@ func (worker *worker) handleRequest(ch contextHolder) error {

select {
case worker.requestChan <- ch:
worker.queuedRequests.Add(-1)
metrics.DequeuedWorkerRequest(worker.name)
<-ch.frankenPHPContext.done
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
Expand All @@ -288,7 +298,9 @@ func (worker *worker) handleRequest(ch contextHolder) error {
// the request has triggered scaling, continue to wait for a thread
case <-timeoutChan(maxWaitTime):
// the request has timed out stalling
worker.queuedRequests.Add(-1)
metrics.DequeuedWorkerRequest(worker.name)
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))

ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded)

Expand Down