-
Notifications
You must be signed in to change notification settings - Fork 107
Expand file tree
/
Copy pathrun_publisher.py
More file actions
60 lines (49 loc) · 1.97 KB
/
run_publisher.py
File metadata and controls
60 lines (49 loc) · 1.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from __future__ import annotations
import asyncio
import uuid
from temporalio.client import Client
from temporalio.common import RawValue
from temporalio.contrib.workflow_streams import WorkflowStreamClient
from workflow_streams.shared import (
TASK_QUEUE,
TOPIC_PROGRESS,
TOPIC_STATUS,
OrderInput,
ProgressEvent,
StatusEvent,
)
from workflow_streams.workflows.order_workflow import OrderWorkflow
async def main() -> None:
client = await Client.connect("localhost:7233")
workflow_id = f"workflow-stream-order-{uuid.uuid4().hex[:8]}"
handle = await client.start_workflow(
OrderWorkflow.run,
OrderInput(order_id="order-1"),
id=workflow_id,
task_queue=TASK_QUEUE,
)
stream = WorkflowStreamClient.create(client, workflow_id)
converter = client.data_converter.payload_converter
# Single iterator over both topics — avoids a cancellation race
# between two concurrent subscribers. result_type=RawValue
# delivers the underlying Payload so we can dispatch heterogeneous
# events on item.topic. The loop ends either on the in-band
# `complete` terminator (break) or because the iterator exhausts
# when the workflow reaches a terminal state without one (e.g. on
# failure). Either way we then await handle.result(), which raises
# if the workflow failed.
async for item in stream.subscribe(
[TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue
):
if item.topic == TOPIC_STATUS:
evt = converter.from_payload(item.data.payload, StatusEvent)
print(f"[status] {evt.kind}: order={evt.order_id}")
if evt.kind == "complete":
break
elif item.topic == TOPIC_PROGRESS:
progress = converter.from_payload(item.data.payload, ProgressEvent)
print(f"[progress] {progress.message}")
result = await handle.result()
print(f"workflow result: {result}")
if __name__ == "__main__":
asyncio.run(main())