@@ -10,9 +10,9 @@ use tonic::{
1010} ;
1111
1212use crate :: {
13- api:: UserInfo ,
13+ api:: { AsyncReceiver , UserInfo } ,
1414 errors:: { ConnectionResult , RemoteResult } ,
15- ext:: InternallyMutable ,
15+ ext:: { IgnorableError , InternallyMutable } ,
1616 network,
1717 workspace:: Workspace ,
1818} ;
@@ -45,6 +45,9 @@ struct ClientInner {
4545 auth : AuthClient < Channel > ,
4646 session : SessionClient < InterceptedService < Channel , network:: SessionInterceptor > > ,
4747 claims : InternallyMutable < Token > ,
48+ poll_tx : tokio:: sync:: mpsc:: UnboundedSender < tokio:: sync:: oneshot:: Sender < ( ) > > ,
49+ callback : tokio:: sync:: watch:: Sender < Option < crate :: api:: controller:: ControllerCallback < Client > > > ,
50+ events : tokio:: sync:: Mutex < tokio:: sync:: mpsc:: UnboundedReceiver < codemp_proto:: session:: session_event:: Event > > ,
4851}
4952
5053impl Client {
@@ -66,17 +69,40 @@ impl Client {
6669 let claims = InternallyMutable :: new ( resp. token ) ;
6770
6871 // TODO move this one into network.rs
69- let session =
72+ let mut session =
7073 SessionClient :: with_interceptor ( channel, network:: SessionInterceptor ( claims. channel ( ) ) ) ;
7174
72- Ok ( Client ( Arc :: new ( ClientInner {
75+ let ( ev_tx, ev_rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
76+ let ( poll_tx, poll_rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
77+ let ( cb_tx, cb_rx) = tokio:: sync:: watch:: channel ( None ) ;
78+
79+ let stream = session. attach ( Empty { } ) . await ?. into_inner ( ) ;
80+
81+ let worker = ClientWorker {
82+ callback : cb_rx,
83+ pollers : Vec :: new ( ) ,
84+ poll_rx,
85+ events : ev_tx,
86+ } ;
87+
88+ let inner = Arc :: new ( ClientInner {
7389 user : Arc :: new ( resp. user . into ( ) ) ,
7490 workspaces : DashMap :: default ( ) ,
91+ poll_tx,
92+ events : tokio:: sync:: Mutex :: new ( ev_rx) ,
7593 claims,
7694 auth,
7795 session,
7896 config,
79- } ) ) )
97+ callback : cb_tx,
98+ } ) ;
99+
100+ let weak = Arc :: downgrade ( & inner) ;
101+ let _t = tokio:: spawn ( async move {
102+ worker. work ( stream, weak) . await ;
103+ } ) ;
104+
105+ Ok ( Client ( inner) )
80106 }
81107
82108 /// Refresh session token.
@@ -271,3 +297,96 @@ impl Client {
271297 & self . 0 . user
272298 }
273299}
300+
301+ impl AsyncReceiver < codemp_proto:: session:: session_event:: Event > for Client {
302+ async fn try_recv ( & self ) -> crate :: errors:: ControllerResult < Option < codemp_proto:: session:: session_event:: Event > > {
303+ match self . 0 . events . lock ( ) . await . try_recv ( ) {
304+ Ok ( x) => Ok ( Some ( x) ) ,
305+ Err ( tokio:: sync:: mpsc:: error:: TryRecvError :: Empty ) => Ok ( None ) ,
306+ Err ( tokio:: sync:: mpsc:: error:: TryRecvError :: Disconnected ) => Err ( crate :: errors:: ControllerError :: Stopped ) ,
307+ }
308+ }
309+
310+ async fn poll ( & self ) -> crate :: errors:: ControllerResult < ( ) > {
311+ let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
312+ self . 0 . poll_tx . send ( tx) ?;
313+ Ok ( rx. await ?)
314+ }
315+
316+ fn clear_callback ( & self ) {
317+ self . 0 . callback . send_replace ( None ) ;
318+ }
319+
320+ fn callback ( & self , cb : impl Into < crate :: api:: controller:: ControllerCallback < Self > > ) {
321+ self . 0 . callback . send_replace ( Some ( cb. into ( ) ) ) ;
322+ }
323+ }
324+
325+ struct ClientWorker {
326+ callback : tokio:: sync:: watch:: Receiver < Option < crate :: api:: controller:: ControllerCallback < Client > > > ,
327+ pollers : Vec < tokio:: sync:: oneshot:: Sender < ( ) > > ,
328+ poll_rx : tokio:: sync:: mpsc:: UnboundedReceiver < tokio:: sync:: oneshot:: Sender < ( ) > > ,
329+ events : tokio:: sync:: mpsc:: UnboundedSender < codemp_proto:: session:: session_event:: Event > ,
330+ }
331+
332+ impl ClientWorker {
333+ #[ tracing:: instrument( skip( self , stream, weak) ) ]
334+ pub ( crate ) async fn work (
335+ mut self ,
336+ mut stream : tonic:: Streaming < codemp_proto:: session:: SessionEvent > ,
337+ weak : std:: sync:: Weak < ClientInner > ,
338+ ) {
339+ tracing:: debug!( "client worker starting" ) ;
340+ loop {
341+ tokio:: select! {
342+ res = self . poll_rx. recv( ) => match res {
343+ None => break tracing:: debug!( "pollers channel closed: client has been dropped" ) ,
344+ Some ( x) => self . pollers. push( x) ,
345+ } ,
346+
347+ res = stream. message( ) => match res {
348+ Err ( e) => break tracing:: error!( "client stream closed: {e}" ) ,
349+ Ok ( None ) => break tracing:: info!( "closing client" ) ,
350+ Ok ( Some ( codemp_proto:: session:: SessionEvent { event: None } ) ) => {
351+ tracing:: warn!( "client received empty event" )
352+ }
353+ Ok ( Some ( codemp_proto:: session:: SessionEvent { event: Some ( ev) } ) ) => {
354+ let Some ( _inner) = weak. upgrade( ) else {
355+ break tracing:: debug!( "client worker clean exit" ) ;
356+ } ;
357+ tracing:: debug!( "received client event: {ev:?}" ) ;
358+ match ev. clone( ) {
359+ codemp_proto:: session:: session_event:: Event :: Invite ( invitation_event) => {
360+ tracing:: info!( "got invited to workspace: {invitation_event:?}" ) ;
361+ } ,
362+ codemp_proto:: session:: session_event:: Event :: Leave ( quit_event) => {
363+ tracing:: info!( "user left workspace: {quit_event:?}" ) ;
364+ } ,
365+ codemp_proto:: session:: session_event:: Event :: Join ( accept_event) => {
366+ tracing:: info!( "user accepted invite: {accept_event:?}" ) ;
367+ } ,
368+ codemp_proto:: session:: session_event:: Event :: Reject ( reject_event) => {
369+ tracing:: info!( "user rejected invite: {reject_event:?}" ) ;
370+ } ,
371+ }
372+
373+ if self . events. send( ev) . is_err( ) {
374+ tracing:: warn!( "no active controller to receive workspace event" ) ;
375+ }
376+ self . pollers. drain( ..) . for_each( |x| {
377+ x. send( ( ) ) . unwrap_or_warn( "poller dropped before completion" ) ;
378+ } ) ;
379+ if let Some ( cb) = self . callback. borrow( ) . as_ref( ) {
380+ if let Some ( ws) = weak. upgrade( ) {
381+ cb. call( Client ( ws) ) ;
382+ } else {
383+ break tracing:: debug!( "workspace worker clean (late) exit" ) ;
384+ }
385+ }
386+ }
387+ } ,
388+ }
389+ }
390+ tracing:: debug!( "workspace worker stopping" ) ;
391+ }
392+ }
0 commit comments