Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,24 @@ public final class ByteBuddyProxyFactory implements ProxyFactory {
public <T> @Nullable T createProxy(Class<T> clazz, MethodInterceptor interceptor) {
// Cannot proxy final classes
if (Modifier.isFinal(clazz.getModifiers())) {
throw new IllegalArgumentException("Class " + clazz + " is final, cannot be proxied.");
if (ReflectionUtils.isKotlinClass(clazz)) {
throw new IllegalArgumentException(
clazz
+
"""
is not open, cannot be proxied. Suggestions:
* Extract the @Handler annotated functions in an interface
* Make the class and all its @Handler annotated functions 'open'
* Use the Kotlin allopen compiler plugin https://kotlinlang.org/docs/all-open-plugin.html with the following configuration:

allOpen {
annotations("dev.restate.sdk.annotation.Service", "dev.restate.sdk.annotation.VirtualObject", "dev.restate.sdk.annotation.Workflow")
}
""");
}
throw new IllegalArgumentException(
clazz
+ " is final, cannot be proxied. Remove the final keyword, or refactor it extracting the restate interface out of it.");
}

try {
Expand Down Expand Up @@ -88,7 +105,7 @@ public Method getMethod() {

return proxyInstance;
} catch (Exception e) {
throw new IllegalArgumentException("Cannot create proxy for class " + clazz, e);
throw new IllegalArgumentException("Cannot create proxy for " + clazz, e);
}
}

Expand All @@ -109,10 +126,12 @@ private <T> Class<?> generateProxyClass(Class<T> clazz) throws NoSuchFieldExcept
: byteBuddy.subclass(clazz);

var annotationMatcher =
isAnnotatedWith(Handler.class)
.or(isAnnotatedWith(Exclusive.class))
.or(isAnnotatedWith(Shared.class))
.or(isAnnotatedWith(Workflow.class));
not(isStatic())
.and(
isAnnotatedWith(Handler.class)
.or(isAnnotatedWith(Exclusive.class))
.or(isAnnotatedWith(Shared.class))
.or(isAnnotatedWith(Workflow.class)));
try (var unloaded =
builder
// Add a field to store the interceptor
Expand Down
8 changes: 7 additions & 1 deletion client-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ plugins {

description = "Restate Client to interact with services from within other Kotlin applications"

configurations.all {
// Gonna conflict with sdk-serde-kotlinx
exclude(group = "dev.restate", module = "sdk-serde-jackson")
}

dependencies {
api(project(":client")) { exclude("dev.restate", "sdk-serde-jackson") }
api(project(":client"))
api(project(":sdk-serde-kotlinx"))

implementation(project(":common-kotlin"))
implementation(libs.kotlinx.coroutines.core)
}
308 changes: 308 additions & 0 deletions client-kotlin/src/main/kotlin/dev/restate/client/kotlin/ingress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,20 @@ import dev.restate.client.RequestOptions
import dev.restate.client.Response
import dev.restate.client.ResponseHead
import dev.restate.client.SendResponse
import dev.restate.common.InvocationOptions
import dev.restate.common.Output
import dev.restate.common.Request
import dev.restate.common.Target
import dev.restate.common.WorkflowRequest
import dev.restate.common.reflection.kotlin.RequestCaptureProxy
import dev.restate.common.reflection.kotlin.captureInvocation
import dev.restate.common.reflections.ProxySupport
import dev.restate.common.reflections.ReflectionUtils
import dev.restate.serde.TypeTag
import dev.restate.serde.kotlinx.typeTag
import kotlin.coroutines.Continuation
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.startCoroutine
import kotlin.time.Duration
import kotlin.time.toJavaDuration
import kotlinx.coroutines.future.await
Expand Down Expand Up @@ -262,3 +270,303 @@ val <Res> Response<Res>.response: Res
/** @see SendResponse.sendStatus */
val <Res> SendResponse<Res>.sendStatus: SendResponse.SendStatus
get() = this.sendStatus()

/**
* Create a proxy client for a Restate service.
*
* Example usage:
* ```kotlin
* val greeter = client.service<Greeter>()
* val response = greeter.greet("Alice")
* ```
*
* @param SVC the service class annotated with @Service
* @return a proxy client to invoke the service
*/
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> Client.service(): SVC {
return service(this, SVC::class.java)
}

/**
* Create a proxy client for a Restate virtual object.
*
* Example usage:
* ```kotlin
* val counter = client.virtualObject<Counter>("my-key")
* val value = counter.increment()
* ```
*
* @param SVC the virtual object class annotated with @VirtualObject
* @param key the key identifying the specific virtual object instance
* @return a proxy client to invoke the virtual object
*/
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> Client.virtualObject(key: String): SVC {
return virtualObject(this, SVC::class.java, key)
}

/**
* Create a proxy client for a Restate workflow.
*
* Example usage:
* ```kotlin
* val wf = client.workflow<MyWorkflow>("wf-123")
* val result = wf.run("input")
* ```
*
* @param SVC the workflow class annotated with @Workflow
* @param key the key identifying the specific workflow instance
* @return a proxy client to invoke the workflow
*/
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> Client.workflow(key: String): SVC {
return workflow(this, SVC::class.java, key)
}

/**
* Create a proxy for a service that uses the ingress client to make calls.
*
* @param client the ingress client to use for calls
* @param clazz the service class
* @return a proxy that intercepts method calls and executes them via the client
*/
@PublishedApi
internal fun <SVC : Any> service(client: Client, clazz: Class<SVC>): SVC {
ReflectionUtils.mustHaveServiceAnnotation(clazz)
require(ReflectionUtils.isKotlinClass(clazz)) {
"Using Java classes with Kotlin's API is not supported"
}

val serviceName = ReflectionUtils.extractServiceName(clazz)
return ProxySupport.createProxy(clazz) { invocation ->
val request = invocation.captureInvocation(serviceName, null).toRequest()
@Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation<Any?>

// Start a coroutine that calls the client and resumes the continuation
val suspendBlock: suspend () -> Any? = { client.callAsync(request).await().response() }
suspendBlock.startCoroutine(continuation)
COROUTINE_SUSPENDED
}
}

/**
* Create a proxy for a virtual object that uses the ingress client to make calls.
*
* @param client the ingress client to use for calls
* @param clazz the virtual object class
* @param key the virtual object key
* @return a proxy that intercepts method calls and executes them via the client
*/
@PublishedApi
internal fun <SVC : Any> virtualObject(client: Client, clazz: Class<SVC>, key: String): SVC {
ReflectionUtils.mustHaveVirtualObjectAnnotation(clazz)
require(ReflectionUtils.isKotlinClass(clazz)) {
"Using Java classes with Kotlin's API is not supported"
}

val serviceName = ReflectionUtils.extractServiceName(clazz)
return ProxySupport.createProxy(clazz) { invocation ->
val request = invocation.captureInvocation(serviceName, key).toRequest()
@Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation<Any?>

// Start a coroutine that calls the client and resumes the continuation
val suspendBlock: suspend () -> Any? = { client.callAsync(request).await().response() }
suspendBlock.startCoroutine(continuation)
COROUTINE_SUSPENDED
}
}

/**
* Create a proxy for a workflow that uses the ingress client to make calls.
*
* @param client the ingress client to use for calls
* @param clazz the workflow class
* @param key the workflow key
* @return a proxy that intercepts method calls and executes them via the client
*/
@PublishedApi
internal fun <SVC : Any> workflow(client: Client, clazz: Class<SVC>, key: String): SVC {
ReflectionUtils.mustHaveWorkflowAnnotation(clazz)
require(ReflectionUtils.isKotlinClass(clazz)) {
"Using Java classes with Kotlin's API is not supported"
}

val serviceName = ReflectionUtils.extractServiceName(clazz)
return ProxySupport.createProxy(clazz) { invocation ->
val request = invocation.captureInvocation(serviceName, key).toRequest()
@Suppress("UNCHECKED_CAST") val continuation = invocation.arguments.last() as Continuation<Any?>

// Start a coroutine that calls the client and resumes the continuation
val suspendBlock: suspend () -> Any? = { client.callAsync(request).await().response() }
suspendBlock.startCoroutine(continuation)
COROUTINE_SUSPENDED
}
}

/**
* Builder for creating type-safe requests.
*
* This builder allows the response type to be inferred from the lambda passed to [request].
*
* @param SVC the service/virtual object/workflow class
*/
@org.jetbrains.annotations.ApiStatus.Experimental
class KClientRequestBuilder<SVC : Any>
@PublishedApi
internal constructor(
private val client: Client,
private val clazz: Class<SVC>,
private val key: String?,
) {
/**
* Create a request by invoking a method on the target.
*
* The response type is inferred from the return type of the invoked method.
*
* @param Res the response type (inferred from the lambda)
* @param block a suspend lambda that invokes a method on the target
* @return a [KClientRequest] with the correct response type
*/
@Suppress("UNCHECKED_CAST")
fun <Res> request(block: suspend SVC.() -> Res): KClientRequest<Any?, Res> {
return KClientRequestImpl(
client,
RequestCaptureProxy(clazz, key).capture(block as suspend SVC.() -> Any?).toRequest(),
)
as KClientRequest<Any?, Res>
}
}

/**
* Kotlin-idiomatic request for invoking Restate services from an ingress client.
*
* Example usage:
* ```kotlin
* client.toService<CounterKt>()
* .request { add(1) }
* .options { idempotencyKey = "123" }
* .call()
* ```
*
* @param Req the request type
* @param Res the response type
*/
@org.jetbrains.annotations.ApiStatus.Experimental
interface KClientRequest<Req, Res> : Request<Req, Res> {

/**
* Configure invocation options using a DSL.
*
* @param block builder block for options
* @return a new request with the configured options
*/
fun options(block: InvocationOptions.Builder.() -> Unit): KClientRequest<Req, Res>

/**
* Call the target handler and wait for the response.
*
* @return the response
*/
suspend fun call(): Response<Res>

/**
* Send the request without waiting for the response.
*
* @param delay optional delay before the invocation is executed
* @return the send response with invocation handle
*/
suspend fun send(delay: Duration? = null): SendResponse<Res>
}

/**
* Create a builder for invoking a Restate service.
*
* Example usage:
* ```kotlin
* val response = client.toService<Greeter>()
* .request { greet("Alice") }
* .call()
* ```
*
* @param SVC the service class annotated with @Service
* @return a builder for creating typed requests
*/
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> Client.toService(): KClientRequestBuilder<SVC> {
ReflectionUtils.mustHaveServiceAnnotation(SVC::class.java)
require(ReflectionUtils.isKotlinClass(SVC::class.java)) {
"Using Java classes with Kotlin's API is not supported"
}
return KClientRequestBuilder(this, SVC::class.java, null)
}

/**
* Create a builder for invoking a Restate virtual object.
*
* Example usage:
* ```kotlin
* val response = client.toVirtualObject<Counter>("my-counter")
* .request { add(1) }
* .call()
* ```
*
* @param SVC the virtual object class annotated with @VirtualObject
* @param key the key identifying the specific virtual object instance
* @return a builder for creating typed requests
*/
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> Client.toVirtualObject(key: String): KClientRequestBuilder<SVC> {
ReflectionUtils.mustHaveVirtualObjectAnnotation(SVC::class.java)
require(ReflectionUtils.isKotlinClass(SVC::class.java)) {
"Using Java classes with Kotlin's API is not supported"
}
return KClientRequestBuilder(this, SVC::class.java, key)
}

/**
* Create a builder for invoking a Restate workflow.
*
* Example usage:
* ```kotlin
* val response = client.toWorkflow<MyWorkflow>("workflow-123")
* .request { run("input") }
* .call()
* ```
*
* @param SVC the workflow class annotated with @Workflow
* @param key the key identifying the specific workflow instance
* @return a builder for creating typed requests
*/
@org.jetbrains.annotations.ApiStatus.Experimental
inline fun <reified SVC : Any> Client.toWorkflow(key: String): KClientRequestBuilder<SVC> {
ReflectionUtils.mustHaveWorkflowAnnotation(SVC::class.java)
require(ReflectionUtils.isKotlinClass(SVC::class.java)) {
"Using Java classes with Kotlin's API is not supported"
}
return KClientRequestBuilder(this, SVC::class.java, key)
}

/** Implementation of [KClientRequest] for ingress client. */
private class KClientRequestImpl<Req, Res>(
private val client: Client,
private val request: Request<Req, Res>,
) : KClientRequest<Req, Res>, Request<Req, Res> by request {

override fun options(block: InvocationOptions.Builder.() -> Unit): KClientRequest<Req, Res> {
val builder = InvocationOptions.builder()
builder.block()
return KClientRequestImpl(
client,
this.toBuilder().headers(builder.headers).idempotencyKey(builder.idempotencyKey).build(),
)
}

override suspend fun call(): Response<Res> {
return client.callSuspend(request)
}

override suspend fun send(delay: Duration?): SendResponse<Res> {
return client.sendSuspend(request, delay)
}
}
Loading