
AttributeError: 'NoneType' object has no attribute 'invalid_metadata' from beginning_offsets() #2734

@tvainika


It seems beginning_offsets() can raise an AttributeError in 2.3.0.

I was not able to reproduce this reliably, but it occurred immediately after creating a topic with many partitions and then fetching offsets, i.e. before Kafka was fully ready with the topic.

I tried to follow the logic of _fetch_offsets_by_times, and it seems that future.exception.invalid_metadata is accessed even when the future succeeded, i.e. when future.exception is None. Here is the full stack trace from one of my internal test runs.

timestamps = {TopicPartition(topic='d94afc56', partition=0): -2}
timeout_ms = 305000

    def _fetch_offsets_by_times(self, timestamps, timeout_ms=None):
        if not timestamps:
            return {}
    
        timer = Timer(timeout_ms, "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
        timestamps = copy.copy(timestamps)
        fetched_offsets = dict()
        while True:
            if not timestamps:
                return {}
    
            future = self._send_list_offsets_requests(timestamps)
            self._client.poll(future=future, timeout_ms=timer.timeout_ms)
    
            # Timeout w/o future completion
            if not future.is_done:
                break
    
            if future.succeeded():
                fetched_offsets.update(future.value[0])
                if not future.value[1]:
                    return fetched_offsets
    
                timestamps = {tp: timestamps[tp] for tp in future.value[1]}
    
            elif not future.retriable():
                raise future.exception  # pylint: disable-msg=raising-bad-type
    
>           if future.exception.invalid_metadata or self._client.cluster.need_update:
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E           AttributeError: 'NoneType' object has no attribute 'invalid_metadata'

fetched_offsets = {}
future     = <kafka.future.Future object at 0x7f7264fa72f0>
refresh_future = <kafka.future.Future object at 0x7f72650776b0>
self       = <kafka.consumer.fetcher.Fetcher object at 0x7f7264f375f0>
timeout_ms = 305000
timer      = <kafka.util.Timer object at 0x7f72654012c0>
timestamps = {TopicPartition(topic='d94afc56', partition=0): -2}
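The suspected path can be shown in isolation. Judging from the trace, a future that succeeded but still lists partitions to retry (a non-empty future.value[1]) falls through to the invalid_metadata check with future.exception set to None. The sketch below is only an illustration under that assumption: FakeFuture and FakeRetriableError are hypothetical stand-ins, not kafka-python classes, and the guarded helper is one possible way to tolerate a None exception, not necessarily the fix the library should adopt.

```python
class FakeFuture:
    """Hypothetical stand-in exposing only the attributes used in the trace."""

    def __init__(self):
        self.is_done = False
        self.value = None
        self.exception = None

    def success(self, value):
        self.is_done = True
        self.value = value
        return self

    def failure(self, exc):
        self.is_done = True
        self.exception = exc
        return self

    def succeeded(self):
        return self.is_done and self.exception is None


class FakeRetriableError(Exception):
    """Hypothetical error type carrying the invalid_metadata flag."""
    invalid_metadata = True


def needs_refresh_unguarded(future):
    # Mirrors the failing line: exception is None when the future succeeded.
    return future.exception.invalid_metadata


def needs_refresh_guarded(future):
    # getattr with a default tolerates future.exception being None.
    return bool(getattr(future.exception, "invalid_metadata", False))


# A successful future whose second element still lists partitions to retry.
partial = FakeFuture().success(({}, ["tp-0"]))

try:
    needs_refresh_unguarded(partial)
    raised = False
except AttributeError:
    raised = True

print("unguarded check raised AttributeError:", raised)
print("guarded check on success:", needs_refresh_guarded(partial))
print("guarded check on failure:",
      needs_refresh_guarded(FakeFuture().failure(FakeRetriableError())))
```

With the guard, a partially successful result simply skips the exception-based refresh decision and still leaves the cluster.need_update condition from the original line available to trigger a metadata update.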
