Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ExecutorService executor = ExecutorTool.hasVirtualThreads()
: Executors.newCachedThreadPool();

ExecutorService concurrencyLimitedExecutor = ExecutorTool.hasVirtualThreads()
? ExecutorTool.newSempahoreVirtualExecutor(10)
? ExecutorTool.newSemaphoreVirtualExecutor(10)
: Executors.newFixedThreadPool(10);

Thread thread = ThreadTool.hasVirtualThreads()
Expand Down
46 changes: 43 additions & 3 deletions src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,28 @@ public static ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer t
*
* @param permits number of sempahore permits
* @return a new executor with limited concurrency
* @deprecated Because of typo in method name
*/
@Deprecated
public static ExecutorService newSempahoreVirtualExecutor(int permits) {
return newSemaphoreVirtualExecutor(permits);
}

/**
* Creates an Executor that starts a new virtual Thread and limits concurrency to the number of semaphore permits
*
* @param permits number of semaphore permits
* @return a new executor with limited concurrency
*/
public static ExecutorService newSemaphoreVirtualExecutor(int permits) {
ExecutorService executor = getThreadProvider().newVirtualThreadPerTaskExecutor();
return new SempahoreExecutor(executor, permits);
return new SemaphoreExecutor(executor, permits);
}

/**
* Creates an Executor that starts a new virtual Thread and limits concurrency
* to the number of semaphore permits.
*
*
* <p>
* When executing a task with the created ExecutorService if one permit is
* available, the task is executed. If no permit is available, then the executor
Expand All @@ -99,12 +111,40 @@ public static ExecutorService newSempahoreVirtualExecutor(int permits) {
* @param acquireTimeout time to wait for acquire a new task when no permit is
* available
* @return a new executor with limited concurrency
* @deprecated Because of typo in method name
*/
@Deprecated
public static ExecutorService newSempahoreVirtualExecutor(int permits, Duration acquireTimeout) {
return newSemaphoreVirtualExecutor(permits, acquireTimeout);
}

/**
* Creates an Executor that starts a new virtual Thread and limits concurrency
* to the number of semaphore permits.
*
* <p>
* When executing a task with the created ExecutorService if one permit is
* available, the task is executed. If no permit is available, then the executor
* thread becomes disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>a permit becomes available</li>
* <li>some other thread {@linkplain Thread#interrupt interrupts} the current
* thread; or</li>
* <li>the specified waiting time elapses, in which case a Timeout exception is thrown</li>
* </ul>
*
* @param permits number of semaphore permits
* @param acquireTimeout time to wait for acquire a new task when no permit is
* available
* @return a new executor with limited concurrency
*/
public static ExecutorService newSemaphoreVirtualExecutor(int permits, Duration acquireTimeout) {
ExecutorService executor = getThreadProvider().newVirtualThreadPerTaskExecutor();
return new SempahoreExecutor(executor, permits, acquireTimeout);
return new SemaphoreExecutor(executor, permits, acquireTimeout);
}


private ExecutorTool() {
throw new AssertionError();
}
Expand Down
150 changes: 150 additions & 0 deletions src/main/java/io/github/thunkware/vt/bridge/SemaphoreExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package io.github.thunkware.vt.bridge;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
* Executor that limits concurrency to a number of semaphore permits
* @since 0.05
*/
public class SemaphoreExecutor implements ExecutorService {

private final ExecutorService delegate;
private final Semaphore semaphore;
private final Callable<Void> aquireStrategy;

public SemaphoreExecutor(ExecutorService delegate, int permits) {
this(delegate, new Semaphore(permits, true));
}

public SemaphoreExecutor(ExecutorService delegate, Semaphore semaphore) {
this.delegate = delegate;
this.semaphore = semaphore;
this.aquireStrategy = this::semaphoreAquireStrategy;
}

public SemaphoreExecutor(ExecutorService delegate, int permits, Duration acquireTimeout) {
this(delegate, new Semaphore(permits, true), acquireTimeout);
}

public SemaphoreExecutor(ExecutorService delegate, Semaphore semaphore, Duration acquireTimeout) {
this.delegate = delegate;
this.semaphore = semaphore;
this.aquireStrategy = () -> this.semaphoreTryAquireStrategy(acquireTimeout);
}

private <T> List<Callable<T>> toSemaphoreCallables(Collection<? extends Callable<T>> callables) {
return callables.stream()
.map(this::toSemaphoreCallable)
.collect(Collectors.toList());
}

private <T> Callable<T> toSemaphoreCallable(Callable<T> callable) {
return () -> {
try {
aquireStrategy.call();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException();
}

try {
return callable.call();
} finally {
semaphore.release();
}
};
}

private Runnable toSemaphoreRunnable(Runnable command) {
return () -> toSemaphoreCallable(() -> {
command.run();
return null;
});
}

private Void semaphoreAquireStrategy() throws InterruptedException {
semaphore.acquire();
return null;
}

private Void semaphoreTryAquireStrategy(Duration acquireTimeout) throws InterruptedException, TimeoutException {
boolean isAcquired = semaphore.tryAcquire(acquireTimeout.toNanos(), TimeUnit.NANOSECONDS);

if (!isAcquired) {
throw new TimeoutException(String.format("Permit not acquired before the acquireTimeout %s", acquireTimeout));
}

return null;
}

@Override
public void execute(Runnable command) {
delegate.execute(toSemaphoreRunnable(command));
}

@Override
public void shutdown() {
delegate.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(toSemaphoreCallable(task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(toSemaphoreRunnable(task), result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(toSemaphoreRunnable(task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(toSemaphoreCallables(tasks));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(toSemaphoreCallables(tasks), timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(toSemaphoreCallables(tasks));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(toSemaphoreCallables(tasks), timeout, unit);
}

}
127 changes: 6 additions & 121 deletions src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,142 +14,27 @@

/**
* Executor that limits concurrency to a number of sempahore permits
*
* @deprecated Because of a typo in class name
*/
public class SempahoreExecutor implements ExecutorService {
@Deprecated
public class SempahoreExecutor extends SemaphoreExecutor {

private final ExecutorService delegate;
private final Semaphore semaphore;
private final Callable<Void> aquireStrategy;

public SempahoreExecutor(ExecutorService delegate, int permits) {
this(delegate, new Semaphore(permits, true));
}

public SempahoreExecutor(ExecutorService delegate, Semaphore semaphore) {
this.delegate = delegate;
this.semaphore = semaphore;
this.aquireStrategy = this::semaphoreAquireStrategy;
super(delegate, semaphore);
}

public SempahoreExecutor(ExecutorService delegate, int permits, Duration acquireTimeout) {
this(delegate, new Semaphore(permits, true), acquireTimeout);
}

public SempahoreExecutor(ExecutorService delegate, Semaphore semaphore, Duration acquireTimeout) {
this.delegate = delegate;
this.semaphore = semaphore;
this.aquireStrategy = () -> this.semaphoreTryAquireStrategy(acquireTimeout);
}

private <T> List<Callable<T>> toSemaphoreCallables(Collection<? extends Callable<T>> callables) {
return callables.stream()
.map(this::toSemaphoreCallable)
.collect(Collectors.toList());
}

private <T> Callable<T> toSemaphoreCallable(Callable<T> callable) {
return () -> {
try {
aquireStrategy.call();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException();
}

try {
return callable.call();
} finally {
semaphore.release();
}
};
}

private Runnable toSemaphoreRunnable(Runnable command) {
return () -> toSemaphoreCallable(() -> {
command.run();
return null;
});
}

private Void semaphoreAquireStrategy() throws InterruptedException {
semaphore.acquire();
return null;
}

private Void semaphoreTryAquireStrategy(Duration acquireTimeout) throws InterruptedException, TimeoutException {
boolean isAcquired = semaphore.tryAcquire(acquireTimeout.toNanos(), TimeUnit.NANOSECONDS);

if (!isAcquired) {
throw new TimeoutException(String.format("Permit not acquired before the acquireTimeout %s", acquireTimeout));
}

return null;
}

@Override
public void execute(Runnable command) {
delegate.execute(toSemaphoreRunnable(command));
}

@Override
public void shutdown() {
delegate.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(toSemaphoreCallable(task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(toSemaphoreRunnable(task), result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(toSemaphoreRunnable(task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(toSemaphoreCallables(tasks));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(toSemaphoreCallables(tasks), timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(toSemaphoreCallables(tasks));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(toSemaphoreCallables(tasks), timeout, unit);
super(delegate, semaphore, acquireTimeout);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ void testNewThreadPerTaskExecutor() throws InterruptedException {
}

@Test
void testNewSempahoreVirtualExecutorWithAquireTimeout() throws InterruptedException {
void testNewSemaphoreVirtualExecutorWithAquireTimeout() throws InterruptedException {

ExecutorService executor = ExecutorTool.newSempahoreVirtualExecutor(1, Duration.ofMillis(100));
ExecutorService executor = ExecutorTool.newSemaphoreVirtualExecutor(1, Duration.ofMillis(100));
assertThat(executor.isShutdown()).isFalse();
assertThat(executor.isTerminated()).isFalse();

Expand Down
Loading