Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 15 additions & 111 deletions src/Internal/ServiceContainer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

namespace Temporal\Internal;

use JetBrains\PhpStorm\Immutable;
use Spiral\Attributes\ReaderInterface;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Exception\ExceptionInterceptorInterface;
Expand All @@ -30,129 +29,34 @@
use Temporal\Worker\LoopInterface;
use Temporal\WorkerFactory;

#[Immutable]
final class ServiceContainer
{
/**
* @var LoopInterface
*/
#[Immutable]
public LoopInterface $loop;

/**
* @var ClientInterface
*/
#[Immutable]
public ClientInterface $client;

/**
* @var ReaderInterface
*/
#[Immutable]
public ReaderInterface $reader;

/**
* @var EnvironmentInterface
*/
#[Immutable]
public EnvironmentInterface $env;

/**
* @var MarshallerInterface<array>
*/
#[Immutable]
public MarshallerInterface $marshaller;

/**
* @var RepositoryInterface<WorkflowPrototype>
*/
#[Immutable]
public RepositoryInterface $workflows;

/**
* @var ProcessCollection
*/
#[Immutable]
public ProcessCollection $running;

/**
* @var ActivityCollection
*/
#[Immutable]
public ActivityCollection $activities;
/** @var RepositoryInterface<WorkflowPrototype> */
public readonly RepositoryInterface $workflows;
public readonly ProcessCollection $running;
public readonly ActivityCollection $activities;
public readonly WorkflowReader $workflowsReader;
public readonly ActivityReader $activitiesReader;

/**
* @var QueueInterface
*/
#[Immutable]
public QueueInterface $queue;

/**
* @var DataConverterInterface
*/
#[Immutable]
public DataConverterInterface $dataConverter;

/**
* @var WorkflowReader
*/
#[Immutable]
public WorkflowReader $workflowsReader;

/**
* @var ActivityReader
*/
#[Immutable]
public ActivityReader $activitiesReader;

/**
* @var ExceptionInterceptorInterface
*/
public ExceptionInterceptorInterface $exceptionInterceptor;

/**
* @var PipelineProvider
*/
public PipelineProvider $interceptorProvider;

/**
* @param LoopInterface $loop
* @param EnvironmentInterface $env
* @param ClientInterface $client
* @param ReaderInterface $reader
* @param QueueInterface $queue
* @param MarshallerInterface<array> $marshaller
* @param DataConverterInterface $dataConverter
* @param ExceptionInterceptorInterface $exceptionInterceptor
* @param PipelineProvider $interceptorProvider
*/
public function __construct(
LoopInterface $loop,
EnvironmentInterface $env,
ClientInterface $client,
ReaderInterface $reader,
QueueInterface $queue,
MarshallerInterface $marshaller,
DataConverterInterface $dataConverter,
ExceptionInterceptorInterface $exceptionInterceptor,
PipelineProvider $interceptorProvider,
public readonly LoopInterface $loop,
public readonly EnvironmentInterface $env,
public readonly ClientInterface $client,
public readonly ReaderInterface $reader,
public readonly QueueInterface $queue,
public readonly MarshallerInterface $marshaller,
public readonly DataConverterInterface $dataConverter,
public readonly ExceptionInterceptorInterface $exceptionInterceptor,
public readonly PipelineProvider $interceptorProvider,
) {
$this->env = $env;
$this->loop = $loop;
$this->client = $client;
$this->reader = $reader;
$this->queue = $queue;
$this->marshaller = $marshaller;
$this->dataConverter = $dataConverter;
$this->interceptorProvider = $interceptorProvider;

$this->workflows = new WorkflowCollection();
$this->activities = new ActivityCollection();
$this->running = new ProcessCollection();

$this->workflowsReader = new WorkflowReader($this->reader);
$this->activitiesReader = new ActivityReader($this->reader);
$this->exceptionInterceptor = $exceptionInterceptor;
}

public static function fromWorkerFactory(
Expand Down
2 changes: 2 additions & 0 deletions src/Internal/Workflow/Process/DeferredGenerator.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use Temporal\DataConverter\ValuesInterface;

/**
* A wrapper around a generator that doesn't start the wrapped generator ASAP.
*
* @implements \Iterator<mixed, mixed>
*
* @internal
Expand Down
14 changes: 14 additions & 0 deletions src/Internal/Workflow/Process/HandlerState.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Temporal\Internal\Workflow\Process;

/**
* @internal
*/
final class HandlerState
{
public int $updates = 0;
public int $signals = 0;
}
30 changes: 18 additions & 12 deletions src/Internal/Workflow/Process/Scope.php
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ public function start(\Closure $handler, ValuesInterface $values = null, bool $d
*/
public function startUpdate(callable $handler, ValuesInterface $values, Deferred $resolver): void
{
// Update handler counter
++$this->context->getHandlerState()->updates;
$this->then(
fn() => --$this->context->getHandlerState()->updates,
fn() => --$this->context->getHandlerState()->updates,
);

$this->then(
$resolver->resolve(...),
function (\Throwable $error) use ($resolver): void {
Expand All @@ -186,6 +193,13 @@ function (\Throwable $error) use ($resolver): void {
*/
public function startSignal(callable $handler, ValuesInterface $values): void
{
// Update handler counter
++$this->context->getHandlerState()->signals;
$this->then(
fn() => --$this->context->getHandlerState()->signals,
fn() => --$this->context->getHandlerState()->signals,
);

// Create a coroutine generator
$this->coroutine = $this->callSignalOrUpdateHandler($handler, $values);
$this->next();
Expand Down Expand Up @@ -300,6 +314,7 @@ public function onAwait(Deferred $deferred): void

// do not cancel already complete promises
$cleanup = function () use ($cancelID): void {
$this->makeCurrent();
$this->context->resolveConditions();
unset($this->onCancel[$cancelID]);
};
Expand Down Expand Up @@ -543,19 +558,10 @@ private function onResult(mixed $result): void
}
}

/**
* @param \Closure $tick
* @return mixed
*/
private function defer(\Closure $tick)
private function defer(\Closure $tick): void
{
$listener = $this->services->loop->once($this->layer, $tick);

if ($this->services->queue->count() === 0) {
$this->services->loop->tick();
}

return $listener;
$this->services->loop->once($this->layer, $tick);
$this->services->queue->count() === 0 and $this->services->loop->tick();
}

public function destroy(): void
Expand Down
1 change: 1 addition & 0 deletions src/Internal/Workflow/ScopeContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static function fromWorkflowContext(
$context->workflowInstance,
$context->input,
$context->getLastCompletionResultValues(),
$context->handlers,
);

$ctx->parent = $context;
Expand Down
42 changes: 33 additions & 9 deletions src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
use Temporal\Internal\Transport\Request\Panic;
use Temporal\Internal\Transport\Request\SideEffect;
use Temporal\Internal\Transport\Request\UpsertSearchAttributes;
use Temporal\Internal\Workflow\Process\HandlerState;
use Temporal\Promise;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Workflow\ActivityStubInterface;
Expand Down Expand Up @@ -86,12 +87,16 @@ class WorkflowContext implements WorkflowContextInterface, HeaderCarrier, Destro
/** @var Pipeline<WorkflowOutboundCallsInterceptor, PromiseInterface> */
private Pipeline $callsInterceptor;

/**
* @param HandlerState $handlers Counter of active Update and Signal handlers
*/
public function __construct(
protected ServiceContainer $services,
protected ClientInterface $client,
protected WorkflowInstanceInterface&Destroyable $workflowInstance,
protected Input $input,
protected ?ValuesInterface $lastCompletionResult = null
protected ?ValuesInterface $lastCompletionResult = null,
protected HandlerState $handlers = new HandlerState(),
) {
$this->requestInterceptor = $services->interceptorProvider
->getPipeline(WorkflowOutboundRequestInterceptor::class);
Expand Down Expand Up @@ -514,6 +519,14 @@ public function getStackTrace(): string
return StackRenderer::renderTrace($this->trace);
}

/**
* {@inheritDoc}
*/
public function allHandlersFinished(): bool
{
return $this->handlers->signals === 0 && $this->handlers->updates === 0;
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -619,6 +632,25 @@ public function uuid7(?DateTimeInterface $dateTime = null): PromiseInterface
return $this->sideEffect(static fn(): UuidInterface => \Ramsey\Uuid\Uuid::uuid7($dateTime));
}

/**
* @internal
*/
public function getHandlerState(): HandlerState
{
return $this->handlers;
}

/**
* @internal
*/
#[\Override]
public function destroy(): void
{
$this->awaits = [];
$this->workflowInstance->destroy();
unset($this->workflowInstance);
}

/**
* @param callable|PromiseInterface ...$conditions
*/
Expand Down Expand Up @@ -691,12 +723,4 @@ protected function recordTrace(): void
{
$this->trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
}

#[\Override]
public function destroy(): void
{
$this->awaits = [];
$this->workflowInstance->destroy();
unset($this->workflowInstance);
}
}
20 changes: 20 additions & 0 deletions src/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,26 @@ public static function getStackTrace(): string
return self::getCurrentContext()->getStackTrace();
}

/**
* Whether update and signal handlers have finished executing.
*
* Consider waiting on this condition before workflow return or continue-as-new, to prevent
* interruption of in-progress handlers by workflow exit:
*
* ```php
* yield Workflow.await(static fn() => Workflow::allHandlersFinished());
* ```
*
* @return bool True if all handlers have finished executing.
*/
public static function allHandlersFinished(): bool
{
/** @var ScopedContextInterface $context */
$context = self::getCurrentContext();

return $context->allHandlersFinished();
}

/**
* Upsert search attributes
*
Expand Down
27 changes: 27 additions & 0 deletions src/Workflow/HandlerUnfinishedPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Temporal\Workflow;

/**
* Actions taken if a workflow terminates with running handlers.
*
* Policy defining actions taken when a workflow exits while update or signal handlers are running.
* The workflow exit may be due to successful return, failure, cancellation, or continue-as-new.
*/
enum HandlerUnfinishedPolicy
{
/**
* Issue a warning in addition to abandoning.
*/
case WARN_AND_ABANDON;

/**
* Abandon the handler.
*
* In the case of an update handler, this means that the client will receive an error rather than
* the update result.
*/
case ABANDON;
}
14 changes: 14 additions & 0 deletions src/Workflow/WorkflowContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,20 @@ public function awaitWithTimeout($interval, ...$conditions): PromiseInterface;
*/
public function getStackTrace(): string;

/**
* Whether update and signal handlers have finished executing.
*
* Consider waiting on this condition before workflow return or continue-as-new, to prevent
* interruption of in-progress handlers by workflow exit:
*
* ```php
* yield Workflow.await(static fn() => Workflow::allHandlersFinished());
* ```
*
* @return bool True if all handlers have finished executing.
*/
public function allHandlersFinished(): bool;

/**
* @param array<string, mixed> $searchAttributes
*/
Expand Down
Loading