Skip to content

Memory leak in query_data_frame #548

@karponi

Description

@karponi

Specifications

  • Client Version: 1.38
  • InfluxDB Version: 2.1.1
  • Platform: Linux

Code sample to reproduce problem

def query_influx(
    query, input_data, config, client
) -> pd.DataFrame:
    with InfluxHelper(influx_config=config.influx) as influx:
        logger.info(
            "%s: fetching values from influx from %s to %s",
            input_data.machine,
            input_data.dateFrom,
            input_data.dateTo,
        )

        return client.query_api.query_data_frame(
            query=query,
            params=dict(
                _bucket=influx.influx_config.bucket,
                machine=input_data.machine,
                start_date=input_data.dateFrom,
                stop_date=input_data.dateTo,
            ),
            org=influx.influx_config.org,
        )

def _retrieve_influx_data(input_data) -> pd.DataFrame:

    influx_get_query = """
                   from(bucket: _bucket)
                   |> range(start: start_date, stop: stop_date)
                   """ + (
        "|> filter(fn: (r) => r._measurement == machine and r._field =~/^("
        + "|".join(config.features)
        + ")$/)"
    )

    data_frame = query_influx(
        query=influx_get_query, input_data=input_data, config=config
    )

    logger.info("%s: processing influx results", input_data.machine)
    return process_influx_results(data_frame)

Expected behavior

The memory should not increase at every call of query_influx

Actual behavior

The memory is being increased at every call, leading to crashes

Additional info

here's the output of the memory_profiler:

Line     Mem usage    Increment  Occurrences   Line Contents
=============================================================
    24    185.8 MiB    185.8 MiB           1   @profile
    25                                         def query_influx(
    26                                             query, input_data, config, client
    27                                         ) -> pd.DataFrame:
    28                                             """
    29                                             Queries influx "common" bucket for data.
    30                                             Args:
    31                                                 input_data: the configuration sent by the request
    32                                                 config: the configuration file of the app
    33                                         
    34                                             Returns: the result of the query as a dataframe and the number of rows of the result
    35                                         
    36                                             """
    37                                         
    38    291.5 MiB      0.0 MiB           2       with InfluxHelper(influx_config=config.influx) as influx:
    39    185.8 MiB      0.0 MiB           2           logger.info(
    40    185.8 MiB      0.0 MiB           1               "%s: fetching values from influx from %s to %s",
    41    185.8 MiB      0.0 MiB           1               input_data.machine,
    42    185.8 MiB      0.0 MiB           1               input_data.dateFrom,
    43    185.8 MiB      0.0 MiB           1               input_data.dateTo,
    44                                                 )
    45                                         
    46    291.5 MiB    105.7 MiB           2           return client.query_api.query_data_frame(
    47    185.8 MiB      0.0 MiB           1               query=query,
    48    185.8 MiB      0.0 MiB           2               params=dict(
    49    185.8 MiB      0.0 MiB           1                   _bucket=influx.influx_config.bucket,
    50    185.8 MiB      0.0 MiB           1                   machine=input_data.machine,
    51    185.8 MiB      0.0 MiB           1                   start_date=input_data.dateFrom,
    52    185.8 MiB      0.0 MiB           1                   stop_date=input_data.dateTo,
    53                                                     ),
    54    185.8 MiB      0.0 MiB           1               org=influx.influx_config.org,
    55                                                 )


Line     Mem usage    Increment  Occurrences   Line Contents
=============================================================
    93    184.2 MiB    184.2 MiB           1   @profile
    94                                         def _retrieve_influx_data(input_data) -> pd.DataFrame:
    95                                         
    96    184.2 MiB      0.0 MiB           2       influx_get_query = """
    97                                                            from(bucket: _bucket)
    98                                                            |> range(start: start_date, stop: stop_date)
   100    184.2 MiB      0.0 MiB          3         """ + ( "|> filter(fn: (r) => r._measurement == machine and r._field =~/^("                     
   101    184.2 MiB      0.0 MiB           1           + "|".join(config.features)
   102    184.2 MiB      0.0 MiB           1           + ")$/)"
   103                                             )
   104                                         
   **105    291.5 MiB    107.3 MiB           2       data_frame = query_influx(
   106    184.2 MiB      0.0 MiB           1           query=influx_get_query, input_data=input_data, config=config
   107                                             )
   108                                         
   109    291.5 MiB      0.0 MiB           1       logger.info("%s: processing influx results", input_data.machine)
   110**                                                                             
   111    324.9 MiB     33.4 MiB           1       return process_influx_results(data_frame)

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions