diff --git a/README.md b/README.md index d52d296..76e1c56 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ ExecutorService executor = ExecutorTool.hasVirtualThreads() : Executors.newCachedThreadPool(); ExecutorService concurrencyLimitedExecutor = ExecutorTool.hasVirtualThreads() - ? ExecutorTool.newSempahoreVirtualExecutor(10) + ? ExecutorTool.newSemaphoreVirtualExecutor(10) : Executors.newFixedThreadPool(10); Thread thread = ThreadTool.hasVirtualThreads() diff --git a/src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java b/src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java index 8cb5442..bcb9f68 100644 --- a/src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java +++ b/src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java @@ -73,16 +73,28 @@ public static ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer t * * @param permits number of sempahore permits * @return a new executor with limited concurrency + * @deprecated Because of typo in method name */ + @Deprecated public static ExecutorService newSempahoreVirtualExecutor(int permits) { + return newSemaphoreVirtualExecutor(permits); + } + + /** + * Creates an Executor that starts a new virtual Thread and limits concurrency to the number of semaphore permits + * + * @param permits number of semaphore permits + * @return a new executor with limited concurrency + */ + public static ExecutorService newSemaphoreVirtualExecutor(int permits) { ExecutorService executor = getThreadProvider().newVirtualThreadPerTaskExecutor(); - return new SempahoreExecutor(executor, permits); + return new SemaphoreExecutor(executor, permits); } /** * Creates an Executor that starts a new virtual Thread and limits concurrency * to the number of semaphore permits. - * + * *

* When executing a task with the created ExecutorService if one permit is * available, the task is executed. If no permit is available, then the executor @@ -99,12 +111,40 @@ public static ExecutorService newSempahoreVirtualExecutor(int permits) { * @param acquireTimeout time to wait for acquire a new task when no permit is * available * @return a new executor with limited concurrency + * @deprecated Because of typo in method name */ + @Deprecated public static ExecutorService newSempahoreVirtualExecutor(int permits, Duration acquireTimeout) { + return newSemaphoreVirtualExecutor(permits, acquireTimeout); + } + + /** + * Creates an Executor that starts a new virtual Thread and limits concurrency + * to the number of semaphore permits. + * + *

+ * When executing a task with the created ExecutorService if one permit is + * available, the task is executed. If no permit is available, then the executor + * thread becomes disabled for thread scheduling purposes and lies dormant until + * one of three things happens: + *

+ * + * @param permits number of semaphore permits + * @param acquireTimeout time to wait for acquire a new task when no permit is + * available + * @return a new executor with limited concurrency + */ + public static ExecutorService newSemaphoreVirtualExecutor(int permits, Duration acquireTimeout) { ExecutorService executor = getThreadProvider().newVirtualThreadPerTaskExecutor(); - return new SempahoreExecutor(executor, permits, acquireTimeout); + return new SemaphoreExecutor(executor, permits, acquireTimeout); } + private ExecutorTool() { throw new AssertionError(); } diff --git a/src/main/java/io/github/thunkware/vt/bridge/SemaphoreExecutor.java b/src/main/java/io/github/thunkware/vt/bridge/SemaphoreExecutor.java new file mode 100644 index 0000000..9582ca7 --- /dev/null +++ b/src/main/java/io/github/thunkware/vt/bridge/SemaphoreExecutor.java @@ -0,0 +1,150 @@ +package io.github.thunkware.vt.bridge; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +/** + * Executor that limits concurrency to a number of semaphore permits + * @since 0.05 + */ +public class SemaphoreExecutor implements ExecutorService { + + private final ExecutorService delegate; + private final Semaphore semaphore; + private final Callable aquireStrategy; + + public SemaphoreExecutor(ExecutorService delegate, int permits) { + this(delegate, new Semaphore(permits, true)); + } + + public SemaphoreExecutor(ExecutorService delegate, Semaphore semaphore) { + this.delegate = delegate; + this.semaphore = semaphore; + this.aquireStrategy = this::semaphoreAquireStrategy; + } + + public SemaphoreExecutor(ExecutorService delegate, int permits, Duration acquireTimeout) { + this(delegate, new Semaphore(permits, true), acquireTimeout); + } + + public SemaphoreExecutor(ExecutorService delegate, Semaphore semaphore, Duration acquireTimeout) { + this.delegate = delegate; + this.semaphore = semaphore; + this.aquireStrategy = () -> this.semaphoreTryAquireStrategy(acquireTimeout); + } + + private List> toSemaphoreCallables(Collection> callables) { + return callables.stream() + .map(this::toSemaphoreCallable) + .collect(Collectors.toList()); + } + + private Callable toSemaphoreCallable(Callable callable) { + return () -> { + try { + aquireStrategy.call(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(); + } + + try { + return callable.call(); + } finally { + semaphore.release(); + } + }; + } + + private Runnable toSemaphoreRunnable(Runnable command) { + return () -> toSemaphoreCallable(() -> { + command.run(); + return null; + }); + } + + private Void semaphoreAquireStrategy() throws InterruptedException { + semaphore.acquire(); + return null; + } + + private Void semaphoreTryAquireStrategy(Duration acquireTimeout) throws InterruptedException, TimeoutException { + boolean isAcquired = semaphore.tryAcquire(acquireTimeout.toNanos(), TimeUnit.NANOSECONDS); + + if (!isAcquired) { + throw new TimeoutException(String.format("Permit not acquired before the acquireTimeout %s", acquireTimeout)); + } + + return null; + } + + @Override + public void execute(Runnable command) { + delegate.execute(toSemaphoreRunnable(command)); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(toSemaphoreCallable(task)); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(toSemaphoreRunnable(task), result); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(toSemaphoreRunnable(task)); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate.invokeAll(toSemaphoreCallables(tasks)); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return delegate.invokeAll(toSemaphoreCallables(tasks), timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate.invokeAny(toSemaphoreCallables(tasks)); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(toSemaphoreCallables(tasks), timeout, unit); + } + +} \ No newline at end of file diff --git a/src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java b/src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java index f236b50..fa7371e 100644 --- a/src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java +++ b/src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java @@ -14,21 +14,19 @@ /** * Executor that limits concurrency to a number of sempahore permits + * + * @deprecated Because of a typo in class name */ -public class SempahoreExecutor implements ExecutorService { +@Deprecated +public class SempahoreExecutor extends SemaphoreExecutor { - private final ExecutorService delegate; - private final Semaphore semaphore; - private final Callable aquireStrategy; public SempahoreExecutor(ExecutorService delegate, int permits) { this(delegate, new Semaphore(permits, true)); } public SempahoreExecutor(ExecutorService delegate, Semaphore semaphore) { - this.delegate = delegate; - this.semaphore = semaphore; - this.aquireStrategy = this::semaphoreAquireStrategy; + super(delegate, semaphore); } public SempahoreExecutor(ExecutorService delegate, int permits, Duration acquireTimeout) { @@ -36,120 +34,7 @@ public SempahoreExecutor(ExecutorService delegate, int permits, Duration acquire } public SempahoreExecutor(ExecutorService delegate, Semaphore semaphore, Duration acquireTimeout) { - this.delegate = delegate; - this.semaphore = semaphore; - this.aquireStrategy = () -> this.semaphoreTryAquireStrategy(acquireTimeout); - } - - private List> toSemaphoreCallables(Collection> callables) { - return callables.stream() - .map(this::toSemaphoreCallable) - .collect(Collectors.toList()); - } - - private Callable toSemaphoreCallable(Callable callable) { - return () -> { - try { - aquireStrategy.call(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(); - } - - try { - return callable.call(); - } finally { - semaphore.release(); - } - }; - } - - private Runnable toSemaphoreRunnable(Runnable command) { - return () -> toSemaphoreCallable(() -> { - command.run(); - return null; - }); - } - - private Void semaphoreAquireStrategy() throws InterruptedException { - semaphore.acquire(); - return null; - } - - private Void semaphoreTryAquireStrategy(Duration acquireTimeout) throws InterruptedException, TimeoutException { - boolean isAcquired = semaphore.tryAcquire(acquireTimeout.toNanos(), TimeUnit.NANOSECONDS); - - if (!isAcquired) { - throw new TimeoutException(String.format("Permit not acquired before the acquireTimeout %s", acquireTimeout)); - } - - return null; - } - - @Override - public void execute(Runnable command) { - delegate.execute(toSemaphoreRunnable(command)); - } - - @Override - public void shutdown() { - delegate.shutdown(); - } - - @Override - public List shutdownNow() { - return delegate.shutdownNow(); - } - - @Override - public boolean isShutdown() { - return delegate.isShutdown(); - } - - @Override - public boolean isTerminated() { - return delegate.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return delegate.awaitTermination(timeout, unit); - } - - @Override - public Future submit(Callable task) { - return delegate.submit(toSemaphoreCallable(task)); - } - - @Override - public Future submit(Runnable task, T result) { - return delegate.submit(toSemaphoreRunnable(task), result); - } - - @Override - public Future submit(Runnable task) { - return delegate.submit(toSemaphoreRunnable(task)); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return delegate.invokeAll(toSemaphoreCallables(tasks)); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return delegate.invokeAll(toSemaphoreCallables(tasks), timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return delegate.invokeAny(toSemaphoreCallables(tasks)); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return delegate.invokeAny(toSemaphoreCallables(tasks), timeout, unit); + super(delegate, semaphore, acquireTimeout); } } \ No newline at end of file diff --git a/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool21Test.java b/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool21Test.java index bb508d0..5fdea4e 100644 --- a/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool21Test.java +++ b/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool21Test.java @@ -98,9 +98,9 @@ void testNewThreadPerTaskExecutor() throws InterruptedException { } @Test - void testNewSempahoreVirtualExecutorWithAquireTimeout() throws InterruptedException { + void testNewSemaphoreVirtualExecutorWithAquireTimeout() throws InterruptedException { - ExecutorService executor = ExecutorTool.newSempahoreVirtualExecutor(1, Duration.ofMillis(100)); + ExecutorService executor = ExecutorTool.newSemaphoreVirtualExecutor(1, Duration.ofMillis(100)); assertThat(executor.isShutdown()).isFalse(); assertThat(executor.isTerminated()).isFalse(); diff --git a/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java b/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java index d960391..95729a3 100644 --- a/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java +++ b/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java @@ -95,9 +95,9 @@ void testNewThreadPerTaskExecutor() throws InterruptedException { } @Test - void testNewSempahoreVirtualExecutorWithAquireTimeout() throws InterruptedException { + void testNewSemaphoreVirtualExecutorWithAquireTimeout() throws InterruptedException { - ExecutorService executor = ExecutorTool.newSempahoreVirtualExecutor(1, Duration.ofMillis(100)); + ExecutorService executor = ExecutorTool.newSemaphoreVirtualExecutor(1, Duration.ofMillis(100)); assertThat(executor.isShutdown()).isFalse(); assertThat(executor.isTerminated()).isFalse();