-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Open
Labels
Description
Search before asking
- I have searched the issues and found no similar issues.
What happened
A Flink program can successfully parse the Avro-formatted Kafka data, but when the same data is read through SeaTunnel, deserialization fails. Is there an issue with the schema configuration?
SeaTunnel Version
2.3.12
SeaTunnel Config
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Kafka {
topic = "t_event"
bootstrap.servers = "xxx:9092"
consumer.group = "test_avro_kafka"
start_mode = "timestamp"
start_mode.timestamp = "1769504419000"
start_mode.end_timestamp = "1769504479000"
format = "avro"
format_error_handle_way = "skip"
plugin_output = "kafka_table"
kafka.config = {
enable.auto.commit = "true"
max.poll.records = "10000"
}
schema = {
fields {
dev_id = string
dev_ip = string
event_id = string
timestamp = bigint
s_ipv4 = string
s_ipv6 = string
s_port = int
d_ipv4 = string
d_ipv6 = string
d_port = int
sid = int
severity = int
alarm_name = string
alarm_type = string
classification = string
protocol = string
domain_name = string
request_method = string
protocol_version = string
request_uri = string
request_url = string
referer = string
user_agent = string
request_header = string
request_body = string
response_code = string
response_header = string
response_body = string
XFF = string
extra_info = string
s_country = string
s_province = string
s_city = string
s_operator = string
d_country = string
d_province = string
d_city = string
d_operator = string
payload = string
attack_result = int
attack_chain = int
description = string
solution = string
original_info = string
pcap_info = string
}
}
}
}
sink {
Console {
plugin_input = "kafka_table"
}
LocalFile {
plugin_input = "kafka_table"
path = "/data/seatunnel/file"
file_format_type = "json"
custom_filename = true
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
file_name_expression = "${transactionId}_${now}"
}
}
Running Command
seatunnel.sh --config $SEATUNNEL_HOME/job/saslkafka2file.conf -m local -DJvmOption="-Xms2G -Xmx2G"
Error Exception
2026-01-28 10:14:32,418 WARN [a.s.c.s.k.s.KafkaRecordEmitter] [BlockingWorker-TaskGroupLocation{jobId=1068715656217624577, pipelineId=1, taskGroupId=3}] - Deserialize m
Zeta or Flink or Spark Version
zeta
Java or Scala Version
1.8
Screenshots
avro schema:
{
"type": "record",
"name": "AvroEvent",
"namespace": "safe.serialize",
"fields": [
{
"name": "dev_id",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "dev_ip",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "event_id",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "timestamp",
"type": [
"long",
"null"
]
},
{
"name": "s_ipv4",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "s_ipv6",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "s_port",
"type": [
"int",
"null"
]
},
{
"name": "d_ipv4",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "d_ipv6",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "d_port",
"type": [
"int",
"null"
]
},
{
"name": "sid",
"type": [
"int",
"null"
]
},
{
"name": "severity",
"type": [
"int",
"null"
]
},
{
"name": "alarm_name",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "alarm_type",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "classification",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "protocol",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "domain_name",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "request_method",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "protocol_version",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "request_uri",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "request_url",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "referer",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "user_agent",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "request_header",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "request_body",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "response_code",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "response_header",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "response_body",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "XFF",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "extra_info",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "s_country",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "s_province",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "s_city",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "s_operator",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "d_country",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "d_province",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "d_city",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "d_operator",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "payload",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "attack_result",
"type": [
"int",
"null"
]
},
{
"name": "attack_chain",
"type": [
"int",
"null"
]
},
{
"name": "description",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "solution",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "original_info",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
},
{
"name": "pcap_info",
"type": [
{
"type": "string",
"avro.java.string": "String"
},
"null"
]
}
]
}
Flink code:
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Reactions are currently unavailable