java.io.NotSerializableException: Non-serializable lambda #50

@GregoryKueski

When I try to start Kinesis Data Analytics with my Flink application, I get the following error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot serialize operator object class org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory.

This is caused by java.io.NotSerializableException: Non-serializable lambda.

I'm using exactly the same code provided in the examples for creating the Kinesis Data Streams sink, and the same lines in build.sbt. What could be the problem?

It seems to be the lambda passed to the setPartitionKeyGenerator method.

What could be a workaround?

def createSink: KinesisStreamsSink[String] = {
  val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
  val outputProperties = applicationProperties.get("ProducerConfigProperties")

  KinesisStreamsSink.builder[String]
    .setKinesisClientProperties(outputProperties)
    .setSerializationSchema(new SimpleStringSchema)
    .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName))
    .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode))
    .build
}
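
One workaround that might be worth trying is to replace the Scala lambda with a named top-level class, so the sink no longer holds a compiler-generated lambda object. This is only a sketch and has not been confirmed as the fix for this issue. It assumes that PartitionKeyGenerator comes from the connector's org.apache.flink.connector.kinesis.sink package and extends both java.util.function.Function and java.io.Serializable. The class name HashCodePartitionKeyGenerator is made up for illustration.

```scala
import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator

// Hypothetical named replacement for the lambda in createSink.
// Assumption: PartitionKeyGenerator extends java.io.Serializable, so a
// top-level class implementing it (no outer reference, no captured state)
// can be serialized together with the sink operator.
final class HashCodePartitionKeyGenerator extends PartitionKeyGenerator[String] {
  // Same key derivation as the original lambda: the element's hash code as a string.
  override def apply(element: String): String = String.valueOf(element.hashCode)
}
```

In createSink, the lambda line would then become .setPartitionKeyGenerator(new HashCodePartitionKeyGenerator). Keeping the class at the top level, rather than as an anonymous class inside a method, avoids accidentally capturing an enclosing instance that is itself not serializable.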
