-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Open
Description
It seems beginnig_offsets() can cause AttributeError in 2.3.0.
I was not able to repeat this robustly, but it came immediately after creating topic with many partitions, and then fetching offsets ie. Kafka was not fully ready with the topic.
I tried to follow the logic of _fetch_offsets_by_times and it seems that future.exception.invalid_metadata is accessed even with successful futures ie. whole exception is None here. Here is a full stack trace from one of my internal test run.
timestamps = {TopicPartition(topic='d94afc56', partition=0): -2}
timeout_ms = 305000
def _fetch_offsets_by_times(self, timestamps, timeout_ms=None):
if not timestamps:
return {}
timer = Timer(timeout_ms, "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
timestamps = copy.copy(timestamps)
fetched_offsets = dict()
while True:
if not timestamps:
return {}
future = self._send_list_offsets_requests(timestamps)
self._client.poll(future=future, timeout_ms=timer.timeout_ms)
# Timeout w/o future completion
if not future.is_done:
break
if future.succeeded():
fetched_offsets.update(future.value[0])
if not future.value[1]:
return fetched_offsets
timestamps = {tp: timestamps[tp] for tp in future.value[1]}
elif not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type
> if future.exception.invalid_metadata or self._client.cluster.need_update:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E AttributeError: 'NoneType' object has no attribute 'invalid_metadata'
fetched_offsets = {}
future = <kafka.future.Future object at 0x7f7264fa72f0>
refresh_future = <kafka.future.Future object at 0x7f72650776b0>
self = <kafka.consumer.fetcher.Fetcher object at 0x7f7264f375f0>
timeout_ms = 305000
timer = <kafka.util.Timer object at 0x7f72654012c0>
timestamps = {TopicPartition(topic='d94afc56', partition=0): -2}
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels