11-module (fable_async ).
2- -export ([start_immediate /1 , start_immediate /2 ,
3- run_synchronously /1 , run_synchronously /2 ,
4- start_with_continuations /4 , start_with_continuations /5 ,
5- sleep /1 , parallel /1 , sequential /1 ,
6- catch_async /1 , ignore /1 , from_continuations /1 ,
7- start_as_task /1 ,
8- cancellation_token /0 , create_cancellation_token /0 , create_cancellation_token /1 ,
9- wrap_error /1 ]).
10-
11- -type async_ctx () :: #{on_success := fun (), on_error := fun (), on_cancel := fun (), cancel_token := reference () | undefined }.
2+ -export ([
3+ start_immediate /1 , start_immediate /2 ,
4+ run_synchronously /1 , run_synchronously /2 ,
5+ start_with_continuations /4 , start_with_continuations /5 ,
6+ sleep /1 ,
7+ parallel /1 ,
8+ sequential /1 ,
9+ catch_async /1 ,
10+ ignore /1 ,
11+ from_continuations /1 ,
12+ start_as_task /1 ,
13+ cancellation_token /0 ,
14+ create_cancellation_token /0 , create_cancellation_token /1 ,
15+ wrap_error /1
16+ ]).
17+
18+ -type async_ctx () :: #{
19+ on_success := fun (),
20+ on_error := fun (),
21+ on_cancel := fun (),
22+ cancel_token := reference () | undefined
23+ }.
1224-type async (T ) :: fun ((async_ctx ()) -> T ).
1325
1426-spec start_immediate (async (term ())) -> term ().
1527-spec start_immediate (async (term ()), reference () | undefined ) -> term ().
1628-spec run_synchronously (async (term ())) -> term ().
1729-spec run_synchronously (async (term ()), reference () | undefined ) -> term ().
1830-spec start_with_continuations (async (term ()), fun (), fun (), fun ()) -> term ().
19- -spec start_with_continuations (async (term ()), fun (), fun (), fun (), reference () | undefined ) -> term ().
31+ -spec start_with_continuations (async (term ()), fun (), fun (), fun (), reference () | undefined ) ->
32+ term ().
2033-spec sleep (non_neg_integer ()) -> async (ok ).
2134-spec parallel (list () | reference ()) -> async (reference ()).
2235-spec sequential (list ()) -> async (list ()).
3043
3144% % Default context: run inline in current process
3245default_ctx (CancelToken ) ->
33- #{on_success => fun (_ ) -> ok end ,
34- on_error => fun (E ) -> erlang :error (E ) end ,
35- on_cancel => fun (_ ) -> ok end ,
36- cancel_token => CancelToken }.
46+ #{
47+ on_success => fun (_ ) -> ok end ,
48+ on_error => fun (E ) -> erlang :error (E ) end ,
49+ on_cancel => fun (_ ) -> ok end ,
50+ cancel_token => CancelToken
51+ }.
3752
3853% % StartImmediate: run with default context (fire-and-forget)
3954start_immediate (Computation ) -> start_immediate (Computation , undefined ).
@@ -45,12 +60,16 @@ run_synchronously(Computation) -> run_synchronously(Computation, undefined).
4560run_synchronously (Computation , CancelToken ) ->
4661 % % Use a unique ref as key to store result in process dict
4762 Ref = make_ref (),
48- Ctx = #{on_success => fun (V ) -> put (Ref , {ok , V }) end ,
49- on_error => fun (E ) -> put (Ref , {error , E }) end ,
50- on_cancel => fun (_ ) -> put (Ref , {cancelled }) end ,
51- cancel_token => CancelToken },
52- try Computation (Ctx )
53- catch _ :Err -> put (Ref , {error , Err })
63+ Ctx = #{
64+ on_success => fun (V ) -> put (Ref , {ok , V }) end ,
65+ on_error => fun (E ) -> put (Ref , {error , E }) end ,
66+ on_cancel => fun (_ ) -> put (Ref , {cancelled }) end ,
67+ cancel_token => CancelToken
68+ },
69+ try
70+ Computation (Ctx )
71+ catch
72+ _ :Err -> put (Ref , {error , Err })
5473 end ,
5574 Result = erase (Ref ),
5675 case Result of
@@ -64,10 +83,16 @@ run_synchronously(Computation, CancelToken) ->
6483start_with_continuations (Comp , OnSuccess , OnError , OnCancel ) ->
6584 start_with_continuations (Comp , OnSuccess , OnError , OnCancel , undefined ).
6685start_with_continuations (Comp , OnSuccess , OnError , OnCancel , Token ) ->
67- Ctx = #{on_success => OnSuccess , on_error => OnError ,
68- on_cancel => OnCancel , cancel_token => Token },
69- try Comp (Ctx )
70- catch _ :Err -> OnError (Err )
86+ Ctx = #{
87+ on_success => OnSuccess ,
88+ on_error => OnError ,
89+ on_cancel => OnCancel ,
90+ cancel_token => Token
91+ },
92+ try
93+ Comp (Ctx )
94+ catch
95+ _ :Err -> OnError (Err )
7196 end .
7297
7398% % Sleep: pause current process, with cancellation support
@@ -119,16 +144,20 @@ parallel(Computations) ->
119144 fun (Ctx ) ->
120145 Self = self (),
121146 Indexed = lists :zip (lists :seq (1 , length (Computations )), Computations ),
122- lists :foreach (fun ({Idx , Comp }) ->
123- spawn (fun () ->
124- try
125- Result = fable_async :run_synchronously (Comp ),
126- Self ! {async_parallel , Idx , {ok , Result }}
127- catch _ :Err ->
128- Self ! {async_parallel , Idx , {error , Err }}
129- end
130- end )
131- end , Indexed ),
147+ lists :foreach (
148+ fun ({Idx , Comp }) ->
149+ spawn (fun () ->
150+ try
151+ Result = fable_async :run_synchronously (Comp ),
152+ Self ! {async_parallel , Idx , {ok , Result }}
153+ catch
154+ _ :Err ->
155+ Self ! {async_parallel , Idx , {error , Err }}
156+ end
157+ end )
158+ end ,
159+ Indexed
160+ ),
132161 Results = collect_parallel (length (Computations ), #{}),
133162 case Results of
134163 {ok , Map } ->
@@ -139,7 +168,8 @@ parallel(Computations) ->
139168 end
140169 end .
141170
142- collect_parallel (0 , Acc ) -> {ok , Acc };
171+ collect_parallel (0 , Acc ) ->
172+ {ok , Acc };
143173collect_parallel (N , Acc ) ->
144174 receive
145175 {async_parallel , Idx , {ok , Val }} ->
@@ -165,8 +195,10 @@ catch_async(Computation) ->
165195 on_cancel => maps :get (on_cancel , Ctx ),
166196 cancel_token => maps :get (cancel_token , Ctx )
167197 },
168- try Computation (Ctx2 )
169- catch _ :Err -> (maps :get (on_success , Ctx ))({choice2_of2 , wrap_error (Err )})
198+ try
199+ Computation (Ctx2 )
200+ catch
201+ _ :Err -> (maps :get (on_success , Ctx ))({choice2_of2 , wrap_error (Err )})
170202 end
171203 end .
172204
0 commit comments