Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.thunkware.vt.bridge;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -71,7 +72,7 @@ public static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFacto
}

/**
* Creates an Executor that starts a new virtual Thread on Java 21 + (or new platform thread on Java 8+) for each task.
* Creates an Executor that starts a new virtual Thread on Java 21+ (or new platform thread on Java 8+) for each task.
* The number of threads created by the Executor is unbounded.
*
* <p> This method is equivalent to invoking
Expand All @@ -85,7 +86,7 @@ public static ExecutorService newVirtualThreadPerTaskExecutor() {
}

/**
* Creates an Executor that starts a new virtual Thread on Java 21 + (or new platform thread on Java 8+) for each task.
* Creates an Executor that starts a new virtual Thread on Java 21+ (or new platform thread on Java 8+) for each task.
* The number of threads created by the Executor is unbounded.
*
* <p> This method is equivalent to invoking
Expand All @@ -96,7 +97,20 @@ public static ExecutorService newVirtualThreadPerTaskExecutor() {
* @return a new executor that creates a new virtual Thread for each task
*/
public static ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer threadCustomizer) {
return getThreadProvider().newVirtualThreadPerTaskExecutor(threadCustomizer);
return getThreadProvider().newVirtualThreadPerTaskExecutor(threadCustomizer, null);
}

/**
* Creates an Executor that starts a new virtual Thread on Java 21+ (or new platform thread on Java 8+) for each task.
* The number of threads created by the Executor is unbounded.
*
* @param threadFactory threadFactory to supply thread name, UncaughtExceptionHandler and ContextClassLoader.
* @return a new executor that creates a new virtual Thread for each task
*/
public static ExecutorService newVirtualThreadPerTaskExecutor(ThreadFactory threadFactory) {
Objects.requireNonNull(threadFactory);
ThreadCustomizer threadCustomizer = thread -> {};
return getThreadProvider().newVirtualThreadPerTaskExecutor(threadCustomizer, threadFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ public interface ThreadProvider {
* that creates virtual threads.
*
* @param threadCustomizer ThreadCustomizer to customize new unstarted threads
* @param threadFactory ThreadFactory. can be null
* @return a new executor that creates a new virtual Thread for each task
*/
@ConfigFeature(feature = NEW_VIRTUAL_THREAD_PER_TASK_EXECUTOR)
ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer threadCustomizer);
ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer threadCustomizer, ThreadFactory threadFactory);

/**
* Returns a builder for creating a platform {@code Thread} or {@code ThreadFactory}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ public ExecutorService newVirtualThreadPerTaskExecutor() {
}

@Override
public ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer threadCustomizer) {
public ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer threadCustomizer, ThreadFactory threadFactory) {
config.enforceCompatibilityPolicy(NEW_VIRTUAL_THREAD_PER_TASK_EXECUTOR);

return newThreadPerTaskExecutor(threadCustomizer.asThreadFactory(this::unstartedVirtualThread));
ThreadFactory actualFactory = threadFactory == null ? this::unstartedVirtualThread : threadFactory;
return newThreadPerTaskExecutor(threadCustomizer.asThreadFactory(actualFactory));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ public Thread startVirtualThread(final Runnable task) {

@Override
public Thread unstartedVirtualThread(Runnable task) {
return unstartedVirtualThread(task, config.getThreadCustomizer());
return unstartedVirtualThread(task, config.getThreadCustomizer(), null);
}

private Thread unstartedVirtualThread(Runnable task, ThreadCustomizer threadCustomizer) {
private Thread unstartedVirtualThread(Runnable task, ThreadCustomizer threadCustomizer, ThreadFactory threadFactory) {
Thread thread = Thread.ofVirtual().unstarted(task);
if (threadFactory != null) {
Thread sample = threadFactory.newThread(task);
thread.setName(sample.getName());
thread.setUncaughtExceptionHandler(sample.getUncaughtExceptionHandler());
thread.setContextClassLoader(sample.getContextClassLoader());
}
threadCustomizer.customize(thread);
return thread;
}
Expand All @@ -61,8 +67,8 @@ public ExecutorService newVirtualThreadPerTaskExecutor() {
}

@Override
public ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer threadCustomizer) {
return newThreadPerTaskExecutor(runnable -> unstartedVirtualThread(runnable, threadCustomizer));
public ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer threadCustomizer, ThreadFactory threadFactory) {
return newThreadPerTaskExecutor(runnable -> unstartedVirtualThread(runnable, threadCustomizer, threadFactory));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,22 @@ void testNewVirtualThreadPerTaskExecutorGlobalThreadName() throws Exception {
}
}

@Test
void testNewVirtualThreadPerTaskExecutorThreadFactory() {
AtomicInteger counter = new AtomicInteger();
ThreadFactory threadFactory = runnable -> {
Thread thread = new Thread(runnable);
thread.setName("my-thread-" + counter.getAndIncrement());
return thread;
};
executor = ExecutorTool.newVirtualThreadPerTaskExecutor(threadFactory);
try {
verifyThreadNamePrefix(executor);
} finally {
executor.shutdown();
}
}

private void verifyThreadNamePrefix(ExecutorService executor) {
CountDownLatch latch = new CountDownLatch(2);
Future<?> future1 = executor.submit(() -> {
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.commons.lang3.JavaVersion.JAVA_20;
import static org.apache.commons.lang3.SystemUtils.isJavaVersionAtMost;
Expand Down Expand Up @@ -140,6 +142,22 @@ void testNewVirtualThreadPerTaskExecutorGlobalThreadName() throws Exception {
}
}

@Test
void testNewVirtualThreadPerTaskExecutorThreadFactory() {
AtomicInteger counter = new AtomicInteger();
ThreadFactory threadFactory = runnable -> {
Thread thread = new Thread(runnable);
thread.setName("my-thread-" + counter.getAndIncrement());
return thread;
};
ExecutorService executor = ExecutorTool.newVirtualThreadPerTaskExecutor(threadFactory);
try {
verifyThreadNamePrefix(executor);
} finally {
executor.shutdown();
}
}

private void verifyThreadNamePrefix(ExecutorService executor) {
CountDownLatch latch = new CountDownLatch(2);
Future<?> future1 = executor.submit(() -> {
Expand Down