Skip to content

Commit e58b9c3

Browse files
committed
Add a main_handler which is passed an argument to run the proper main loop
1 parent 729d7cc commit e58b9c3

2 files changed

Lines changed: 48 additions & 2 deletions

File tree

rayon-core/src/lib.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ pub struct ThreadPoolBuilder {
144144
/// Closure invoked on worker thread exit.
145145
exit_handler: Option<Box<ExitHandler>>,
146146

147+
/// Closure invoked on worker thread start.
148+
main_handler: Option<Box<MainHandler>>,
149+
147150
/// If false, worker threads will execute spawned jobs in a
148151
/// "depth-first" fashion. If true, they will do a "breadth-first"
149152
/// fashion. Depth-first is the default.
@@ -173,6 +176,12 @@ type StartHandler = Fn(usize) + Send + Sync;
173176
/// Note that this same closure may be invoked multiple times in parallel.
174177
type ExitHandler = Fn(usize) + Send + Sync;
175178

179+
/// The type for a closure that gets invoked with a
180+
/// function which runs rayon tasks.
181+
/// The closure is passed the index of the thread on which it is invoked.
182+
/// Note that this same closure may be invoked multiple times in parallel.
183+
type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync;
184+
176185
impl ThreadPoolBuilder {
177186
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
178187
pub fn new() -> ThreadPoolBuilder {
@@ -383,6 +392,23 @@ impl ThreadPoolBuilder {
383392
self.exit_handler = Some(Box::new(exit_handler));
384393
self
385394
}
395+
396+
/// Takes the current thread main callback, leaving `None`.
397+
fn take_main_handler(&mut self) -> Option<Box<MainHandler>> {
398+
self.main_handler.take()
399+
}
400+
401+
/// Set a callback to be invoked on thread main.
402+
///
403+
/// The closure is passed the index of the thread on which it is invoked.
404+
/// Note that this same closure may be invoked multiple times in parallel.
405+
/// If this closure panics, the panic will be passed to the panic handler.
406+
pub fn main_handler<H>(mut self, main_handler: H) -> ThreadPoolBuilder
407+
where H: Fn(usize, &mut FnMut()) + Send + Sync + 'static
408+
{
409+
self.main_handler = Some(Box::new(main_handler));
410+
self
411+
}
386412
}
387413

388414
#[allow(deprecated)]
@@ -500,6 +526,7 @@ impl fmt::Debug for ThreadPoolBuilder {
500526
ref panic_handler,
501527
ref stack_size,
502528
ref start_handler,
529+
ref main_handler,
503530
ref exit_handler,
504531
ref breadth_first,
505532
} = *self;
@@ -516,6 +543,7 @@ impl fmt::Debug for ThreadPoolBuilder {
516543
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
517544
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
518545
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
546+
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);
519547

520548
f.debug_struct("ThreadPoolBuilder")
521549
.field("num_threads", num_threads)
@@ -524,6 +552,7 @@ impl fmt::Debug for ThreadPoolBuilder {
524552
.field("stack_size", &stack_size)
525553
.field("start_handler", &start_handler)
526554
.field("exit_handler", &exit_handler)
555+
.field("main_handler", &main_handler)
527556
.field("breadth_first", &breadth_first)
528557
.finish()
529558
}

rayon-core/src/registry.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use std::thread;
1818
use std::usize;
1919
use unwind;
2020
use util::leak;
21-
use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder};
21+
use {ErrorKind, ExitHandler, PanicHandler, StartHandler,
22+
MainHandler, ThreadPoolBuildError, ThreadPoolBuilder};
2223

2324
pub struct Registry {
2425
thread_infos: Vec<ThreadInfo>,
@@ -28,6 +29,7 @@ pub struct Registry {
2829
panic_handler: Option<Box<PanicHandler>>,
2930
start_handler: Option<Box<StartHandler>>,
3031
exit_handler: Option<Box<ExitHandler>>,
32+
main_handler: Option<Box<MainHandler>>,
3133

3234
// When this latch reaches 0, it means that all work on this
3335
// registry must be complete. This is ensured in the following ways:
@@ -117,6 +119,7 @@ impl Registry {
117119
terminate_latch: CountLatch::new(),
118120
panic_handler: builder.take_panic_handler(),
119121
start_handler: builder.take_start_handler(),
122+
main_handler: builder.take_main_handler(),
120123
exit_handler: builder.take_exit_handler(),
121124
});
122125

@@ -689,7 +692,21 @@ unsafe fn main_loop(
689692
}
690693
}
691694

692-
worker_thread.wait_until(&registry.terminate_latch);
695+
let mut work = || {
696+
worker_thread.wait_until(&registry.terminate_latch);
697+
};
698+
699+
if let Some(ref handler) = registry.main_handler {
700+
match unwind::halt_unwinding(|| handler(index, &mut work)) {
701+
Ok(()) => {
702+
}
703+
Err(err) => {
704+
registry.handle_panic(err);
705+
}
706+
}
707+
} else {
708+
work();
709+
}
693710

694711
// Should not be any work left in our queue.
695712
debug_assert!(worker_thread.take_local_job().is_none());

0 commit comments

Comments
 (0)