When trying to start Kinesis Data Analytics with my flink application, I'm getting the following error org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot serialize operator object class org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory. caused by java.io.NotSerializableException: Non-serializable lambda. I'm using exactly the same code provided in the examples for creating the kinesis data streams sink. I also use the same lines in the build.sbt. What could be the inconvenient?
Seems like it's the lambda from the setPartitionKeyGenerator method.
What could be a workaround?
def createSink: KinesisStreamsSink[String] = {
val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
val outputProperties = applicationProperties.get("ProducerConfigProperties")
KinesisStreamsSink.builder[String]
.setKinesisClientProperties(outputProperties)
.setSerializationSchema(new SimpleStringSchema)
.setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName))
.setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode))
.build
}
When trying to start Kinesis Data Analytics with my flink application, I'm getting the following error
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot serialize operator object class org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory.caused byjava.io.NotSerializableException: Non-serializable lambda. I'm using exactly the same code provided in the examples for creating the kinesis data streams sink. I also use the same lines in thebuild.sbt. What could be the inconvenient?Seems like it's the lambda from the
setPartitionKeyGeneratormethod.What could be a workaround?