Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions threadFramework.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"
"net/http"
"sync"
"sync/atomic"
)

// EXPERIMENTAL: WorkerExtension allows you to register an external worker where instead of calling frankenphp handlers on
Expand Down Expand Up @@ -99,3 +100,46 @@ func startExternalWorkerPipe(w *worker, externalWorker WorkerExtension, thread *
}
}
}

type Worker struct {
ExtensionName string
WorkerFileName string
WorkerEnv PreparedEnv
Comment thread
dunglas marked this conversation as resolved.
MinThreads int
RequestChan chan *WorkerRequest[any, any]
ActivatedCount atomic.Int32
DrainCount atomic.Int32
}

func (w *Worker) Name() string {
return w.ExtensionName
}

func (w *Worker) FileName() string {
return w.WorkerFileName
}

func (w *Worker) Env() PreparedEnv {
return w.WorkerEnv
}

func (w *Worker) GetMinThreads() int {
return w.MinThreads
}

func (w *Worker) ThreadActivatedNotification(threadId int) {
w.ActivatedCount.Add(1)
}

func (w *Worker) ThreadDrainNotification(threadId int) {
w.DrainCount.Add(1)
}

func (w *Worker) ThreadDeactivatedNotification(threadId int) {
w.DrainCount.Add(-1)
w.ActivatedCount.Add(-1)
}

func (w *Worker) ProvideRequest() *WorkerRequest[any, any] {
return <-w.RequestChan
}
70 changes: 13 additions & 57 deletions threadFramework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package frankenphp
import (
"io"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -13,73 +13,29 @@ import (

// mockWorkerExtension implements the WorkerExtension interface
type mockWorkerExtension struct {
name string
fileName string
env PreparedEnv
minThreads int
requestChan chan *WorkerRequest[any, any]
activatedCount int
drainCount int
deactivatedCount int
mu sync.Mutex
Worker
}

func newMockWorkerExtension(name, fileName string, minThreads int) *mockWorkerExtension {
return &mockWorkerExtension{
name: name,
fileName: fileName,
env: make(PreparedEnv),
minThreads: minThreads,
requestChan: make(chan *WorkerRequest[any, any], 10), // Buffer to avoid blocking
Worker: Worker{
ExtensionName: name,
WorkerFileName: fileName,
WorkerEnv: nil,
MinThreads: minThreads,
RequestChan: make(chan *WorkerRequest[any, any], minThreads),
ActivatedCount: atomic.Int32{},
DrainCount: atomic.Int32{},
},
}
}

func (m *mockWorkerExtension) Name() string {
return m.name
}

func (m *mockWorkerExtension) FileName() string {
return m.fileName
}

func (m *mockWorkerExtension) Env() PreparedEnv {
return m.env
}

func (m *mockWorkerExtension) GetMinThreads() int {
return m.minThreads
}

func (m *mockWorkerExtension) ThreadActivatedNotification(threadId int) {
m.mu.Lock()
defer m.mu.Unlock()
m.activatedCount++
}

func (m *mockWorkerExtension) ThreadDrainNotification(threadId int) {
m.mu.Lock()
defer m.mu.Unlock()
m.drainCount++
}

func (m *mockWorkerExtension) ThreadDeactivatedNotification(threadId int) {
m.mu.Lock()
defer m.mu.Unlock()
m.deactivatedCount++
}

func (m *mockWorkerExtension) ProvideRequest() *WorkerRequest[any, any] {
return <-m.requestChan
}

func (m *mockWorkerExtension) InjectRequest(r *WorkerRequest[any, any]) {
m.requestChan <- r
m.RequestChan <- r
}

func (m *mockWorkerExtension) GetActivatedCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.activatedCount
return int(m.ActivatedCount.Load())
}

func TestWorkerExtension(t *testing.T) {
Expand Down