Skip to content

[Bug] [connector-kafka] Unable to parse Avro-formatted Kafka data #10405

@moody1117

Description

@moody1117

Search before asking

  • I have searched the existing issues and found no similar issues.

What happened

A Flink program can successfully parse the Avro-formatted Kafka data, but when the same data is read through SeaTunnel, deserialization fails. Is there an issue with the schema configuration?

SeaTunnel Version

2.3.12

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    topic = "t_event"
    bootstrap.servers = "xxx:9092"
    consumer.group = "test_avro_kafka"
    start_mode = "timestamp"
    start_mode.timestamp = "1769504419000"
    start_mode.end_timestamp = "1769504479000"
    format = "avro"
    format_error_handle_way = "skip"  
    plugin_output = "kafka_table"

    kafka.config = {
      enable.auto.commit = "true"
      max.poll.records = "10000"
    }

    schema = {
      fields {
        dev_id        = string
        dev_ip        = string
        event_id      = string
        timestamp     = bigint
        s_ipv4        = string
        s_ipv6        = string
        s_port        = int
        d_ipv4        = string
        d_ipv6        = string
        d_port        = int
        sid           = int
        severity      = int
        alarm_name    = string
        alarm_type    = string
        classification = string
        protocol      = string
        domain_name   = string
        request_method = string
        protocol_version = string
        request_uri   = string
        request_url   = string
        referer       = string
        user_agent    = string
        request_header = string
        request_body  = string
        response_code = string
        response_header = string
        response_body = string
        XFF           = string
        extra_info    = string
        s_country     = string
        s_province    = string
        s_city        = string
        s_operator    = string
        d_country     = string
        d_province    = string
        d_city        = string
        d_operator    = string
        payload       = string
        attack_result = int
        attack_chain  = int
        description   = string
        solution      = string
        original_info = string
        pcap_info     = string
      }
    }
  }
}

sink {
  Console {
    plugin_input = "kafka_table"
  }
  LocalFile {
    plugin_input = "kafka_table"
    path = "/data/seatunnel/file"
    file_format_type = "json"
    custom_filename = true
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
    file_name_expression = "${transactionId}_${now}"
   }
}

Running Command

seatunnel.sh --config $SEATUNNEL_HOME/job/saslkafka2file.conf  -m local  -DJvmOption="-Xms2G -Xmx2G"

Error Exception

2026-01-28 10:14:32,418 WARN  [a.s.c.s.k.s.KafkaRecordEmitter] [BlockingWorker-TaskGroupLocation{jobId=1068715656217624577, pipelineId=1, taskGroupId=3}] - Deserialize m

Zeta or Flink or Spark Version

zeta

Java or Scala Version

1.8

Screenshots

Image

avro schema:

{
  "type": "record",
  "name": "AvroEvent",
  "namespace": "safe.serialize",
  "fields": [
    {
      "name": "dev_id",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "dev_ip",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "event_id",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "timestamp",
      "type": [
        "long",
        "null"
      ]
    },
    {
      "name": "s_ipv4",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "s_ipv6",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "s_port",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "d_ipv4",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "d_ipv6",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "d_port",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "sid",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "severity",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "alarm_name",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "alarm_type",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "classification",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "protocol",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "domain_name",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "request_method",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "protocol_version",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "request_uri",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "request_url",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "referer",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "user_agent",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "request_header",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "request_body",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "response_code",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "response_header",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "response_body",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "XFF",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "extra_info",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "s_country",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "s_province",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "s_city",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "s_operator",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "d_country",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "d_province",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "d_city",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "d_operator",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "payload",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "attack_result",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "attack_chain",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "description",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "solution",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "original_info",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    },
    {
      "name": "pcap_info",
      "type": [
        {
          "type": "string",
          "avro.java.string": "String"
        },
        "null"
      ]
    }
  ]
}

Flink code:

Image

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions