Versions
Sarama Version: Sarama af0513c
Kafka Version: 0.10.0.1
Go Version: 1.7
Configuration
I've got a example snippet over here
For Kafka I'm using the following configuration:
INFO KafkaConfig values:
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
log.flush.interval.messages = 9223372036854775807
auto.create.topics.enable = true
controller.socket.timeout.ms = 30000
log.flush.interval.ms = null
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
replica.socket.receive.buffer.bytes = 65536
min.insync.replicas = 1
replica.fetch.wait.max.ms = 500
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
sasl.mechanism.inter.broker.protocol = GSSAPI
default.replication.factor = 1
ssl.truststore.password = null
log.preallocate = false
sasl.kerberos.principal.to.local.rules = [DEFAULT]
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = null
replica.socket.timeout.ms = 30000
message.max.bytes = 1000012
num.io.threads = 8
offsets.commit.required.acks = -1
log.flush.offset.checkpoint.interval.ms = 60000
delete.topic.enable = false
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
quota.window.num = 11
zookeeper.connect = localhost:2181
authorizer.class.name =
num.replica.fetchers = 1
log.retention.ms = null
log.roll.jitter.hours = 0
log.cleaner.enable = true
offsets.load.buffer.size = 5242880
log.cleaner.delete.retention.ms = 86400000
ssl.client.auth = none
controlled.shutdown.max.retries = 3
queued.max.requests = 500
offsets.topic.replication.factor = 3
log.cleaner.threads = 1
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
socket.request.max.bytes = 104857600
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 6000
log.retention.bytes = -1
log.message.timestamp.type = CreateTime
sasl.kerberos.min.time.before.relogin = 60000
zookeeper.set.acl = false
connections.max.idle.ms = 600000
offsets.retention.minutes = 1440
replica.fetch.backoff.ms = 1000
inter.broker.protocol.version = 0.10.0-IV1
log.retention.hours = 168
num.partitions = 1
broker.id.generation.enable = true
listeners = null
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
log.roll.ms = null
log.flush.scheduler.interval.ms = 9223372036854775807
ssl.cipher.suites = null
log.index.size.max.bytes = 10485760
ssl.keymanager.algorithm = SunX509
security.inter.broker.protocol = PLAINTEXT
replica.fetch.max.bytes = 1048576
advertised.port = null
log.cleaner.dedupe.buffer.size = 134217728
replica.high.watermark.checkpoint.interval.ms = 5000
log.cleaner.io.buffer.size = 524288
sasl.kerberos.ticket.renew.window.factor = 0.8
zookeeper.connection.timeout.ms = 6000
controlled.shutdown.retry.backoff.ms = 5000
log.roll.hours = 168
log.cleanup.policy = delete
host.name =
log.roll.jitter.ms = null
max.connections.per.ip = 2147483647
offsets.topic.segment.bytes = 104857600
background.threads = 10
quota.consumer.default = 9223372036854775807
request.timeout.ms = 30000
log.message.format.version = 0.10.0-IV1
log.index.interval.bytes = 4096
log.dir = /tmp/kafka-logs
log.segment.bytes = 1073741824
log.cleaner.backoff.ms = 15000
offset.metadata.max.bytes = 4096
ssl.truststore.location = null
group.max.session.timeout.ms = 300000
ssl.keystore.password = null
zookeeper.sync.time.ms = 2000
port = 9092
log.retention.minutes = null
log.segment.delete.delay.ms = 60000
log.dirs = /tmp/kafka-logs
controlled.shutdown.enable = true
compression.type = producer
max.connections.per.ip.overrides =
log.message.timestamp.difference.max.ms = 9223372036854775807
sasl.kerberos.kinit.cmd = /usr/bin/kinit
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.cleaner.min.cleanable.ratio = 0.5
replica.lag.time.max.ms = 10000
num.network.threads = 3
ssl.key.password = null
reserved.broker.max.id = 1000
metrics.num.samples = 2
socket.send.buffer.bytes = 102400
ssl.protocol = TLS
socket.receive.buffer.bytes = 102400
ssl.keystore.location = null
replica.fetch.min.bytes = 1
broker.rack = null
unclean.leader.election.enable = true
sasl.enabled.mechanisms = [GSSAPI]
group.min.session.timeout.ms = 6000
log.cleaner.io.buffer.load.factor = 0.9
offsets.retention.check.interval.ms = 600000
producer.purgatory.purge.interval.requests = 1000
metrics.sample.window.ms = 30000
broker.id = 0
offsets.topic.compression.codec = 0
log.retention.check.interval.ms = 300000
advertised.listeners = null
leader.imbalance.per.broker.percentage = 10
Logs
[sarama] 2016/10/04 16:11:36 Initializing new client
[sarama] 2016/10/04 16:11:36 client/metadata fetching metadata for all topics from broker localhost:9092
[sarama] 2016/10/04 16:11:36 Connected to broker at localhost:9092 (unregistered)
[sarama] 2016/10/04 16:11:36 client/brokers registered new broker #0 at 192.168.8.101:9092
[sarama] 2016/10/04 16:11:36 Successfully initialized new client
[sarama] 2016/10/04 16:11:36 Producer shutting down.
[sarama] 2016/10/04 16:11:36 producer/broker/0 starting up
[sarama] 2016/10/04 16:11:36 producer/broker/0 state change to [open] on bla/0
[sarama] 2016/10/04 16:11:36 Connected to broker at 192.168.8.101:9092 (registered as #0)
[sarama] 2016/10/04 16:11:36 Closing Client
Kafka logging
[2016-10-04 15:50:52,568] ERROR [Replica Manager on Broker 0]: Error processing append operation on partition test-1 (kafka.server.ReplicaManager)
java.lang.IllegalStateException: Compressed message has magic value 0 but inner message has magic value 1
at kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:143)
at kafka.message.ByteBufferMessageSet$$anon$1.liftedTree2$1(ByteBufferMessageSet.scala:111)
at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:109)
at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
at kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:367)
at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:380)
at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:335)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:438)
at kafka.log.Log.liftedTree1$1(Log.scala:339)
at kafka.log.Log.append(Log.scala:338)
at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:441)
at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:427)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:427)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:409)
at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:395)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:395)
at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:331)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
Problem Description
When I try to produce a message with Timestamp set and compression enabled for the producer it will give an error! When I try producing without compression enabled everything goes well and the right timestamp is applied to the kafka message
Versions
Sarama Version: Sarama af0513c
Kafka Version: 0.10.0.1
Go Version: 1.7
Configuration
I've got a example snippet over here
For Kafka I'm using the following configuration:
Logs
Kafka logging
Problem Description
When I try to produce a message with Timestamp set and compression enabled for the producer it will give an error! When I try producing without compression enabled everything goes well and the right timestamp is applied to the kafka message