Skip to content

Commit 74e470c

Browse files
authored
[Python/Beam] Add MailboxProcessor tests and fix bugs (#4360)
1 parent 9217213 commit 74e470c

9 files changed

Lines changed: 666 additions & 114 deletions

File tree

pyrightconfig.ci.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"**/node_modules/**",
66
"temp/tests/Python/test_applicative.py",
77
"temp/tests/Python/test_hash_set.py",
8+
"temp/tests/Python/test_mailbox_processor.py",
89
"temp/tests/Python/test_nested_and_recursive_pattern.py",
910
"temp/tests/Python/fable_modules/thoth_json_python/encode.py"
1011
]

src/fable-library-beam/fable_async_builder.erl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,14 @@ while(Guard, Computation) ->
144144
false -> zero()
145145
end.
146146

147-
%% For: iterate over list, bind body for each element
148-
for(List, Body) when is_reference(List) ->
149-
for(get(List), Body);
150-
for(List, Body) ->
147+
%% For: iterate over list or enumerable, bind body for each element
148+
for(List, Body) when is_list(List) ->
151149
case List of
152150
[] -> zero();
153151
[H | T] -> bind(Body(H), fun(_) -> for(T, Body) end)
154-
end.
152+
end;
153+
for(Ref, Body) when is_reference(Ref) ->
154+
for(get(Ref), Body);
155+
for(Enumerable, Body) ->
156+
%% Lazy seq or other enumerable — convert to list first
157+
for(fable_utils:to_list(Enumerable), Body).

src/fable-library-beam/fable_mailbox.erl

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@
2020
%% Constructor: create agent with empty queue, not started.
2121
%% State stored in process dict keyed by a unique Ref.
2222
default(Body) -> default(Body, undefined).
23-
default(Body, _CancelToken) ->
23+
default(Body, CancelToken) ->
2424
Ref = make_ref(),
25-
put(Ref, #{body => Body, messages => [], continuation => undefined}),
25+
put(Ref, #{body => Body, messages => [], continuation => undefined, cancel_token => CancelToken}),
2626
#{ref => Ref}.
2727

2828
%% Static Start: create + start + return agent
2929
start(Body) -> start(Body, undefined).
30-
start(Body, _CancelToken) ->
31-
Agent = default(Body),
30+
start(Body, CancelToken) ->
31+
Agent = default(Body, CancelToken),
3232
start_instance(Agent),
3333
Agent.
3434

@@ -81,16 +81,27 @@ post_and_async_reply(Agent, BuildMessage) ->
8181
OnSuccess(Value)
8282
end).
8383

84-
%% Internal: if continuation AND message available, invoke continuation with message
84+
%% Internal: if continuation AND message available, invoke continuation with message.
85+
%% Check cancellation token before processing.
8586
process_events(Agent) ->
8687
Ref = maps:get(ref, Agent),
8788
State = get(Ref),
88-
case {maps:get(continuation, State), maps:get(messages, State)} of
89-
{undefined, _} ->
89+
case maps:get(continuation, State) of
90+
undefined ->
9091
ok;
91-
{_, []} ->
92-
ok;
93-
{Cont, [Msg | Rest]} ->
94-
put(Ref, State#{messages => Rest, continuation => undefined}),
95-
Cont(Msg)
92+
Cont ->
93+
CancelToken = maps:get(cancel_token, State, undefined),
94+
case fable_cancellation:is_cancellation_requested(CancelToken) of
95+
true ->
96+
put(Ref, State#{continuation => undefined}),
97+
ok;
98+
false ->
99+
case maps:get(messages, State) of
100+
[] ->
101+
ok;
102+
[Msg | Rest] ->
103+
put(Ref, State#{messages => Rest, continuation => undefined}),
104+
Cont(Msg)
105+
end
106+
end
96107
end.

src/fable-library-py/fable_library/mailbox_processor.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from collections.abc import Callable
44
from queue import SimpleQueue
55
from threading import RLock
6-
from typing import Any, Generic, TypeVar
6+
from typing import Any
77

88
from .async_ import from_continuations, start_immediate
99
from .async_builder import (
@@ -14,33 +14,29 @@
1414
)
1515

1616

17-
_Msg = TypeVar("_Msg")
18-
_Reply = TypeVar("_Reply")
19-
20-
21-
class AsyncReplyChannel(Generic[_Reply]):
17+
class AsyncReplyChannel[Reply]:
2218
def __init__(self, fn: Callable[[Any], None]) -> None:
2319
self.fn = fn
2420

2521
def reply(self, r: Any) -> None:
2622
self.fn(r)
2723

2824

29-
class MailboxProcessor(Generic[_Msg]):
25+
class MailboxProcessor[Msg]:
3026
def __init__(
3127
self,
32-
body: Callable[[MailboxProcessor[_Msg]], Async[None]],
28+
body: Callable[[MailboxProcessor[Msg]], Async[None]],
3329
cancellation_token: CancellationToken | None = None,
3430
):
35-
self.messages: SimpleQueue[_Msg] = SimpleQueue()
31+
self.messages: SimpleQueue[Msg] = SimpleQueue()
3632
self.token = cancellation_token or CancellationToken()
3733
self.lock = RLock()
3834
self.body = body
3935

4036
# Holds the continuation i.e the `done` callback of Async.from_continuations returned by `receive`.
4137
self.continuation: Continuations[Any] | None = None
4238

43-
def post(self, msg: _Msg) -> None:
39+
def post(self, msg: Msg) -> None:
4440
"""Post a message synchronously to the mailbox processor.
4541
4642
This method is not asynchronous since it's very fast to execute.
@@ -56,7 +52,7 @@ def post(self, msg: _Msg) -> None:
5652
self.messages.put(msg)
5753
self.__process_events()
5854

59-
def post_and_async_reply(self, build_message: Callable[[AsyncReplyChannel[_Reply]], _Msg]) -> Async[_Reply]:
55+
def post_and_async_reply[Reply](self, build_message: Callable[[AsyncReplyChannel[Reply]], Msg]) -> Async[Reply]:
6056
"""Post a message asynchronously to the mailbox processor and
6157
wait for the reply.
6258
@@ -70,7 +66,7 @@ def post_and_async_reply(self, build_message: Callable[[AsyncReplyChannel[_Reply
7066
The reply from mailbox processor.
7167
"""
7268

73-
result: _Reply | None = None
69+
result: Reply | None = None
7470
continuation: Continuations[Any] | None = (
7571
None # This is the continuation for the `done` callback of the awaiting poster.
7672
)
@@ -79,12 +75,12 @@ def check_completion() -> None:
7975
if result is not None and continuation is not None:
8076
continuation[0](result)
8177

82-
def reply_callback(res: _Reply):
78+
def reply_callback(res: Reply):
8379
nonlocal result
8480
result = res
8581
check_completion()
8682

87-
reply_channel: AsyncReplyChannel[_Reply] = AsyncReplyChannel(reply_callback)
83+
reply_channel: AsyncReplyChannel[Reply] = AsyncReplyChannel(reply_callback)
8884
self.messages.put(build_message(reply_channel))
8985
self.__process_events()
9086

@@ -95,7 +91,7 @@ def callback(conts: Continuations[Any]) -> None:
9591

9692
return from_continuations(callback)
9793

98-
def receive(self) -> Async[_Msg]:
94+
def receive(self) -> Async[Msg]:
9995
"""Receive message from mailbox.
10096
10197
Returns:
@@ -139,31 +135,37 @@ def __process_events(self) -> None:
139135
@classmethod
140136
def start(
141137
cls,
142-
body: Callable[[MailboxProcessor[_Msg]], Async[None]],
138+
body: Callable[[MailboxProcessor[Msg]], Async[None]],
143139
cancellation_token: CancellationToken | None = None,
144-
) -> MailboxProcessor[_Msg]:
145-
mbox: MailboxProcessor[_Msg] = MailboxProcessor(body, cancellation_token)
140+
) -> MailboxProcessor[Msg]:
141+
mbox: MailboxProcessor[Msg] = MailboxProcessor(body, cancellation_token)
146142
start_immediate(body(mbox))
147143
return mbox
148144

149145

150-
def receive(mbox: MailboxProcessor[_Msg]) -> Async[_Msg]:
146+
def receive[Msg](mbox: MailboxProcessor[Msg]) -> Async[Msg]:
151147
return mbox.receive()
152148

153149

154-
def post(mbox: MailboxProcessor[_Msg], msg: _Msg):
150+
def post[Msg](mbox: MailboxProcessor[Msg], msg: Msg):
155151
return mbox.post(msg)
156152

157153

154+
def post_and_async_reply[Msg, Reply](
155+
mbox: MailboxProcessor[Msg], build_message: Callable[[AsyncReplyChannel[Reply]], Msg]
156+
) -> Async[Reply]:
157+
return mbox.post_and_async_reply(build_message)
158+
159+
158160
def start_instance(mbox: MailboxProcessor[Any]) -> None:
159161
body = mbox.body(mbox)
160162
return start_immediate(body)
161163

162164

163-
def start(
164-
body: Callable[[MailboxProcessor[_Msg]], Async[None]],
165+
def start[Msg](
166+
body: Callable[[MailboxProcessor[Msg]], Async[None]],
165167
cancellationToken: CancellationToken | None = None,
166-
) -> MailboxProcessor[_Msg]:
168+
) -> MailboxProcessor[Msg]:
167169
mbox = MailboxProcessor(body, cancellationToken)
168170
start_instance(mbox)
169171
return mbox
@@ -173,6 +175,7 @@ def start(
173175
"AsyncReplyChannel",
174176
"MailboxProcessor",
175177
"post",
178+
"post_and_async_reply",
176179
"receive",
177180
"start",
178181
"start_instance",

tests/Beam/LoopTests.fs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,26 @@ let ``test while loop with counter works`` () =
9191
x <- x - 3
9292
count <- count + 1
9393
count |> equal 4
94+
95+
[<Fact>]
96+
let ``test for-in loop with negative step range works`` () =
97+
let mutable result = []
98+
for i in 5 .. -1 .. 1 do
99+
result <- result @ [i]
100+
result |> equal [5; 4; 3; 2; 1]
101+
102+
[<Fact>]
103+
let ``test for-in loop with large negative step range works`` () =
104+
let mutable count = 0
105+
for _i in 1000 .. -1 .. 0 do
106+
count <- count + 1
107+
count |> equal 1001
108+
109+
[<Fact>]
110+
let ``test for-in loop with negative step range works inside async`` () =
111+
async {
112+
let mutable count = 0
113+
for _i in 5 .. -1 .. 1 do
114+
count <- count + 1
115+
equal 5 count
116+
} |> Async.StartImmediate

0 commit comments

Comments
 (0)