diff --git a/README.md b/README.md
index d52d296..76e1c56 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ ExecutorService executor = ExecutorTool.hasVirtualThreads()
: Executors.newCachedThreadPool();
ExecutorService concurrencyLimitedExecutor = ExecutorTool.hasVirtualThreads()
- ? ExecutorTool.newSempahoreVirtualExecutor(10)
+ ? ExecutorTool.newSemaphoreVirtualExecutor(10)
: Executors.newFixedThreadPool(10);
Thread thread = ThreadTool.hasVirtualThreads()
diff --git a/src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java b/src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java
index 8cb5442..bcb9f68 100644
--- a/src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java
+++ b/src/main/java/io/github/thunkware/vt/bridge/ExecutorTool.java
@@ -73,16 +73,28 @@ public static ExecutorService newVirtualThreadPerTaskExecutor(ThreadCustomizer t
*
* @param permits number of sempahore permits
* @return a new executor with limited concurrency
+ * @deprecated Because of typo in method name
*/
+ @Deprecated
public static ExecutorService newSempahoreVirtualExecutor(int permits) {
+ return newSemaphoreVirtualExecutor(permits);
+ }
+
+ /**
+ * Creates an Executor that starts a new virtual Thread and limits concurrency to the number of semaphore permits
+ *
+ * @param permits number of semaphore permits
+ * @return a new executor with limited concurrency
+ */
+ public static ExecutorService newSemaphoreVirtualExecutor(int permits) {
ExecutorService executor = getThreadProvider().newVirtualThreadPerTaskExecutor();
- return new SempahoreExecutor(executor, permits);
+ return new SemaphoreExecutor(executor, permits);
}
/**
* Creates an Executor that starts a new virtual Thread and limits concurrency
* to the number of semaphore permits.
- *
+ *
*
* When executing a task with the created ExecutorService if one permit is
* available, the task is executed. If no permit is available, then the executor
@@ -99,12 +111,40 @@ public static ExecutorService newSempahoreVirtualExecutor(int permits) {
* @param acquireTimeout time to wait for acquire a new task when no permit is
* available
* @return a new executor with limited concurrency
+ * @deprecated Because of typo in method name
*/
+ @Deprecated
public static ExecutorService newSempahoreVirtualExecutor(int permits, Duration acquireTimeout) {
+ return newSemaphoreVirtualExecutor(permits, acquireTimeout);
+ }
+
+ /**
+ * Creates an Executor that starts a new virtual Thread and limits concurrency
+ * to the number of semaphore permits.
+ *
+ *
+ * When executing a task with the created ExecutorService if one permit is
+ * available, the task is executed. If no permit is available, then the executor
+ * thread becomes disabled for thread scheduling purposes and lies dormant until
+ * one of three things happens:
+ *
+ * - a permit becomes available
+ * - some other thread {@linkplain Thread#interrupt interrupts} the current
+ * thread; or
+ * - the specified waiting time elapses, in which case a Timeout exception is thrown
+ *
+ *
+ * @param permits number of semaphore permits
+ * @param acquireTimeout time to wait for acquire a new task when no permit is
+ * available
+ * @return a new executor with limited concurrency
+ */
+ public static ExecutorService newSemaphoreVirtualExecutor(int permits, Duration acquireTimeout) {
ExecutorService executor = getThreadProvider().newVirtualThreadPerTaskExecutor();
- return new SempahoreExecutor(executor, permits, acquireTimeout);
+ return new SemaphoreExecutor(executor, permits, acquireTimeout);
}
+
private ExecutorTool() {
throw new AssertionError();
}
diff --git a/src/main/java/io/github/thunkware/vt/bridge/SemaphoreExecutor.java b/src/main/java/io/github/thunkware/vt/bridge/SemaphoreExecutor.java
new file mode 100644
index 0000000..9582ca7
--- /dev/null
+++ b/src/main/java/io/github/thunkware/vt/bridge/SemaphoreExecutor.java
@@ -0,0 +1,150 @@
+package io.github.thunkware.vt.bridge;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+/**
+ * Executor that limits concurrency to a number of semaphore permits
+ * @since 0.05
+ */
+public class SemaphoreExecutor implements ExecutorService {
+
+ private final ExecutorService delegate;
+ private final Semaphore semaphore;
+ private final Callable aquireStrategy;
+
+ public SemaphoreExecutor(ExecutorService delegate, int permits) {
+ this(delegate, new Semaphore(permits, true));
+ }
+
+ public SemaphoreExecutor(ExecutorService delegate, Semaphore semaphore) {
+ this.delegate = delegate;
+ this.semaphore = semaphore;
+ this.aquireStrategy = this::semaphoreAquireStrategy;
+ }
+
+ public SemaphoreExecutor(ExecutorService delegate, int permits, Duration acquireTimeout) {
+ this(delegate, new Semaphore(permits, true), acquireTimeout);
+ }
+
+ public SemaphoreExecutor(ExecutorService delegate, Semaphore semaphore, Duration acquireTimeout) {
+ this.delegate = delegate;
+ this.semaphore = semaphore;
+ this.aquireStrategy = () -> this.semaphoreTryAquireStrategy(acquireTimeout);
+ }
+
+ private List> toSemaphoreCallables(Collection extends Callable> callables) {
+ return callables.stream()
+ .map(this::toSemaphoreCallable)
+ .collect(Collectors.toList());
+ }
+
+ private Callable toSemaphoreCallable(Callable callable) {
+ return () -> {
+ try {
+ aquireStrategy.call();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException();
+ }
+
+ try {
+ return callable.call();
+ } finally {
+ semaphore.release();
+ }
+ };
+ }
+
+ private Runnable toSemaphoreRunnable(Runnable command) {
+ return () -> toSemaphoreCallable(() -> {
+ command.run();
+ return null;
+ });
+ }
+
+ private Void semaphoreAquireStrategy() throws InterruptedException {
+ semaphore.acquire();
+ return null;
+ }
+
+ private Void semaphoreTryAquireStrategy(Duration acquireTimeout) throws InterruptedException, TimeoutException {
+ boolean isAcquired = semaphore.tryAcquire(acquireTimeout.toNanos(), TimeUnit.NANOSECONDS);
+
+ if (!isAcquired) {
+ throw new TimeoutException(String.format("Permit not acquired before the acquireTimeout %s", acquireTimeout));
+ }
+
+ return null;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ delegate.execute(toSemaphoreRunnable(command));
+ }
+
+ @Override
+ public void shutdown() {
+ delegate.shutdown();
+ }
+
+ @Override
+ public List shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public Future submit(Callable task) {
+ return delegate.submit(toSemaphoreCallable(task));
+ }
+
+ @Override
+ public Future submit(Runnable task, T result) {
+ return delegate.submit(toSemaphoreRunnable(task), result);
+ }
+
+ @Override
+ public Future> submit(Runnable task) {
+ return delegate.submit(toSemaphoreRunnable(task));
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
+ return delegate.invokeAll(toSemaphoreCallables(tasks));
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return delegate.invokeAll(toSemaphoreCallables(tasks), timeout, unit);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
+ return delegate.invokeAny(toSemaphoreCallables(tasks));
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate.invokeAny(toSemaphoreCallables(tasks), timeout, unit);
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java b/src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java
index f236b50..fa7371e 100644
--- a/src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java
+++ b/src/main/java/io/github/thunkware/vt/bridge/SempahoreExecutor.java
@@ -14,21 +14,19 @@
/**
* Executor that limits concurrency to a number of sempahore permits
+ *
+ * @deprecated Because of a typo in class name
*/
-public class SempahoreExecutor implements ExecutorService {
+@Deprecated
+public class SempahoreExecutor extends SemaphoreExecutor {
- private final ExecutorService delegate;
- private final Semaphore semaphore;
- private final Callable aquireStrategy;
public SempahoreExecutor(ExecutorService delegate, int permits) {
this(delegate, new Semaphore(permits, true));
}
public SempahoreExecutor(ExecutorService delegate, Semaphore semaphore) {
- this.delegate = delegate;
- this.semaphore = semaphore;
- this.aquireStrategy = this::semaphoreAquireStrategy;
+ super(delegate, semaphore);
}
public SempahoreExecutor(ExecutorService delegate, int permits, Duration acquireTimeout) {
@@ -36,120 +34,7 @@ public SempahoreExecutor(ExecutorService delegate, int permits, Duration acquire
}
public SempahoreExecutor(ExecutorService delegate, Semaphore semaphore, Duration acquireTimeout) {
- this.delegate = delegate;
- this.semaphore = semaphore;
- this.aquireStrategy = () -> this.semaphoreTryAquireStrategy(acquireTimeout);
- }
-
- private List> toSemaphoreCallables(Collection extends Callable> callables) {
- return callables.stream()
- .map(this::toSemaphoreCallable)
- .collect(Collectors.toList());
- }
-
- private Callable toSemaphoreCallable(Callable callable) {
- return () -> {
- try {
- aquireStrategy.call();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException();
- }
-
- try {
- return callable.call();
- } finally {
- semaphore.release();
- }
- };
- }
-
- private Runnable toSemaphoreRunnable(Runnable command) {
- return () -> toSemaphoreCallable(() -> {
- command.run();
- return null;
- });
- }
-
- private Void semaphoreAquireStrategy() throws InterruptedException {
- semaphore.acquire();
- return null;
- }
-
- private Void semaphoreTryAquireStrategy(Duration acquireTimeout) throws InterruptedException, TimeoutException {
- boolean isAcquired = semaphore.tryAcquire(acquireTimeout.toNanos(), TimeUnit.NANOSECONDS);
-
- if (!isAcquired) {
- throw new TimeoutException(String.format("Permit not acquired before the acquireTimeout %s", acquireTimeout));
- }
-
- return null;
- }
-
- @Override
- public void execute(Runnable command) {
- delegate.execute(toSemaphoreRunnable(command));
- }
-
- @Override
- public void shutdown() {
- delegate.shutdown();
- }
-
- @Override
- public List shutdownNow() {
- return delegate.shutdownNow();
- }
-
- @Override
- public boolean isShutdown() {
- return delegate.isShutdown();
- }
-
- @Override
- public boolean isTerminated() {
- return delegate.isTerminated();
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- return delegate.awaitTermination(timeout, unit);
- }
-
- @Override
- public Future submit(Callable task) {
- return delegate.submit(toSemaphoreCallable(task));
- }
-
- @Override
- public Future submit(Runnable task, T result) {
- return delegate.submit(toSemaphoreRunnable(task), result);
- }
-
- @Override
- public Future> submit(Runnable task) {
- return delegate.submit(toSemaphoreRunnable(task));
- }
-
- @Override
- public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
- return delegate.invokeAll(toSemaphoreCallables(tasks));
- }
-
- @Override
- public List> invokeAll(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException {
- return delegate.invokeAll(toSemaphoreCallables(tasks), timeout, unit);
- }
-
- @Override
- public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
- return delegate.invokeAny(toSemaphoreCallables(tasks));
- }
-
- @Override
- public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return delegate.invokeAny(toSemaphoreCallables(tasks), timeout, unit);
+ super(delegate, semaphore, acquireTimeout);
}
}
\ No newline at end of file
diff --git a/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool21Test.java b/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool21Test.java
index bb508d0..5fdea4e 100644
--- a/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool21Test.java
+++ b/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool21Test.java
@@ -98,9 +98,9 @@ void testNewThreadPerTaskExecutor() throws InterruptedException {
}
@Test
- void testNewSempahoreVirtualExecutorWithAquireTimeout() throws InterruptedException {
+ void testNewSemaphoreVirtualExecutorWithAquireTimeout() throws InterruptedException {
- ExecutorService executor = ExecutorTool.newSempahoreVirtualExecutor(1, Duration.ofMillis(100));
+ ExecutorService executor = ExecutorTool.newSemaphoreVirtualExecutor(1, Duration.ofMillis(100));
assertThat(executor.isShutdown()).isFalse();
assertThat(executor.isTerminated()).isFalse();
diff --git a/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java b/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java
index d960391..95729a3 100644
--- a/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java
+++ b/src/test/java/io/github/thunkware/vt/bridge/ExecutorTool8Test.java
@@ -95,9 +95,9 @@ void testNewThreadPerTaskExecutor() throws InterruptedException {
}
@Test
- void testNewSempahoreVirtualExecutorWithAquireTimeout() throws InterruptedException {
+ void testNewSemaphoreVirtualExecutorWithAquireTimeout() throws InterruptedException {
- ExecutorService executor = ExecutorTool.newSempahoreVirtualExecutor(1, Duration.ofMillis(100));
+ ExecutorService executor = ExecutorTool.newSemaphoreVirtualExecutor(1, Duration.ofMillis(100));
assertThat(executor.isShutdown()).isFalse();
assertThat(executor.isTerminated()).isFalse();