diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 78596ee3eba..4e5d63e2d86 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -12,14 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging import os import time from collections import OrderedDict from typing import Dict, Union from typing import List +from urllib.parse import urlparse +import fastavro import grpc import pandas as pd import pyarrow as pa @@ -38,7 +40,7 @@ from feast.feature_set import FeatureSet, Entity from feast.job import Job from feast.loaders.abstract_producer import get_producer -from feast.loaders.file import export_dataframe_to_staging_location +from feast.loaders.file import export_source_to_staging_location from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT from feast.loaders.ingest import get_feature_row_chunks from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse @@ -322,22 +324,28 @@ def list_entities(self) -> Dict[str, Entity]: return entities_dict def get_batch_features( - self, feature_ids: List[str], entity_rows: pd.DataFrame + self, feature_ids: List[str], entity_rows: Union[pd.DataFrame, str] ) -> Job: """ Retrieves historical features from a Feast Serving deployment. Args: - feature_ids: List of feature ids that will be returned for each - entity. Each feature id should have the following format + feature_ids (List[str]): + List of feature ids that will be returned for each entity. + Each feature id should have the following format "feature_set_name:version:feature_name". - entity_rows: Pandas dataframe containing entities and a 'datetime' - column. Each entity in a feature set must be present as a column - in this dataframe. The datetime column must + + entity_rows (Union[pd.DataFrame, str]): + Pandas dataframe containing entities and a 'datetime' column. 
+ Each entity in a feature set must be present as a column in this + dataframe. The datetime column must contain timestamps in + datetime64 format. Returns: - Returns a job object that can be used to monitor retrieval progress - asynchronously, and can be used to materialize the results + feast.job.Job: + Returns a job object that can be used to monitor retrieval + progress asynchronously, and can be used to materialize the + results. Examples: >>> from feast import Client @@ -360,21 +368,11 @@ def get_batch_features( fs_request = _build_feature_set_request(feature_ids) - # Validate entity rows based on entities in Feast Core - self._validate_entity_rows_for_batch_retrieval(entity_rows, fs_request) - - # Remove timezone from datetime column - if isinstance( - entity_rows["datetime"].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype - ): - entity_rows["datetime"] = pd.DatetimeIndex( - entity_rows["datetime"] - ).tz_localize(None) - # Retrieve serving information to determine store type and # staging location serving_info = self._serving_service_stub.GetFeastServingInfo( - GetFeastServingInfoRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT + GetFeastServingInfoRequest(), + timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT ) # type: GetFeastServingInfoResponse if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH: @@ -383,17 +381,50 @@ def get_batch_features( f"does not support batch retrieval " ) - # Export and upload entity row dataframe to staging location + if isinstance(entity_rows, pd.DataFrame): + # Pandas DataFrame detected + # Validate entity rows to based on entities in Feast Core + self._validate_dataframe_for_batch_retrieval( + entity_rows=entity_rows, + feature_sets_request=fs_request + ) + + # Remove timezone from datetime column + if isinstance( + entity_rows["datetime"].dtype, + pd.core.dtypes.dtypes.DatetimeTZDtype + ): + entity_rows["datetime"] = pd.DatetimeIndex( + entity_rows["datetime"] + ).tz_localize(None) + elif isinstance(entity_rows, str): + # 
String based source + if entity_rows.endswith((".avro", "*")): + # Validate Avro entity rows to based on entities in Feast Core + self._validate_avro_for_batch_retrieval( + source=entity_rows, + feature_sets_request=fs_request + ) + else: + raise Exception( + f"Only .avro and wildcard paths are accepted as entity_rows" + ) + else: + raise Exception(f"Only pandas.DataFrame and str types are allowed" + f" as entity_rows, but got {type(entity_rows)}.") + + # Export and upload entity row DataFrame to staging location # provided by Feast - staged_file = export_dataframe_to_staging_location( + staged_files = export_source_to_staging_location( entity_rows, serving_info.job_staging_location - ) # type: str + ) # type: List[str] request = GetBatchFeaturesRequest( feature_sets=fs_request, dataset_source=DatasetSource( file_source=DatasetSource.FileSource( - file_uris=[staged_file], data_format=DataFormat.DATA_FORMAT_AVRO + file_uris=staged_files, + data_format=DataFormat.DATA_FORMAT_AVRO ) ), ) @@ -402,28 +433,107 @@ def get_batch_features( response = self._serving_service_stub.GetBatchFeatures(request) return Job(response.job, self._serving_service_stub) - def _validate_entity_rows_for_batch_retrieval( - self, entity_rows, feature_sets_request + def _validate_dataframe_for_batch_retrieval( + self, entity_rows: pd.DataFrame, feature_sets_request ): """ - Validate whether an entity_row dataframe contains the correct - information for batch retrieval + Validate whether an the entity rows in a DataFrame contains the correct + information for batch retrieval. + + Datetime column must be present in the DataFrame. Args: - entity_rows: Pandas dataframe containing entities and datetime - column. Each entity in a feature set must be present as a - column in this dataframe. - feature_sets_request: Feature sets that will be requested + entity_rows (pd.DataFrame): + Pandas DataFrame containing entities and datetime column. 
Each + entity in a feature set must be present as a column in this + DataFrame. + + feature_sets_request: + Feature sets that will be requested. """ + self._validate_columns( + columns=entity_rows.columns, + feature_sets_request=feature_sets_request, + datetime_field="datetime" + ) + + def _validate_avro_for_batch_retrieval( + self, source: str, feature_sets_request + ): + """ + Validate whether the entity rows in an Avro source file contains the + correct information for batch retrieval. + + Only gs:// and local files (file://) uri schemes are allowed. + + Avro file must have a column named "event_timestamp". + + No checks will be done if a GCS path is provided. + + Args: + source (str): + File path to Avro. + + feature_sets_request: + Feature sets that will be requested. + """ + p = urlparse(source) + + if p.scheme == "gs": + # GCS path provided (Risk is delegated to user) + # No validation if GCS path is provided + return + elif p.scheme == "file" or not p.scheme: + # Local file (file://) provided + file_path = os.path.abspath(os.path.join(p.netloc, p.path)) + else: + raise Exception(f"Unsupported uri scheme provided {p.scheme}, only " + f"local files (file://), and gs:// schemes are " + f"allowed") + + with open(file_path, "rb") as f: + reader = fastavro.reader(f) + schema = json.loads(reader.metadata["avro.schema"]) + columns = [x["name"] for x in schema["fields"]] + self._validate_columns( + columns=columns, + feature_sets_request=feature_sets_request, + datetime_field="event_timestamp" + ) + + def _validate_columns( + self, columns: List[str], + feature_sets_request, + datetime_field: str + ) -> None: + """ + Check if the required column contains the correct values for batch + retrieval. + + Args: + columns (List[str]): + List of columns to validate against feature_sets_request. + + feature_sets_request (): + Feature sets that will be requested. 
+ + datetime_field (str): + Name of the datetime field that must be enforced and present as + a column in the data source. + + Returns: + None: + None + """ # Ensure datetime column exists - if "datetime" not in entity_rows.columns: + if datetime_field not in columns: raise ValueError( - f'Entity rows does not contain "datetime" column in columns ' - f"{entity_rows.columns}" + f'Entity rows does not contain "{datetime_field}" column in ' + f'columns {columns}' ) - # Validate dataframe columns based on feature set entities + # Validate Avro columns based on feature set entities for feature_set in feature_sets_request: fs = self.get_feature_set( name=feature_set.name, version=feature_set.version @@ -434,10 +544,10 @@ def _validate_entity_rows_for_batch_retrieval( f"could not be found" ) for entity_type in fs.entities: - if entity_type.name not in entity_rows.columns: + if entity_type.name not in columns: raise ValueError( - f'Dataframe does not contain entity "{entity_type.name}"' - f' column in columns "{entity_rows.columns}"' + f'Input does not contain entity' + f' "{entity_type.name}" column in columns "{columns}"' ) def get_online_features( @@ -610,7 +720,9 @@ def ingest( return None -def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest]: +def _build_feature_set_request( + feature_ids: List[str] +) -> List[FeatureSetRequest]: """ Builds a list of FeatureSet objects from feature set ids in order to retrieve feature data from Feast Serving @@ -643,7 +755,7 @@ def _read_table_from_source( max_workers: int ) -> str: """ - Infers a data source type (path or Pandas Dataframe) and reads it in as + Infers a data source type (path or Pandas DataFrame) and reads it in as a PyArrow Table. 
The PyArrow Table that is read will be written to a parquet file with row @@ -688,7 +800,8 @@ def _read_table_from_source( else: table = pq.read_table(file_path) else: - raise ValueError(f"Unknown data source provided for ingestion: {source}") + raise ValueError( + f"Unknown data source provided for ingestion: {source}") # Ensure that PyArrow table is initialised assert isinstance(table, pa.lib.Table) diff --git a/sdk/python/feast/job.py b/sdk/python/feast/job.py index 4273f86ea84..26f6181ee2d 100644 --- a/sdk/python/feast/job.py +++ b/sdk/python/feast/job.py @@ -1,12 +1,11 @@ import tempfile import time from datetime import datetime, timedelta -from typing import List +from typing import Iterable from urllib.parse import urlparse import fastavro import pandas as pd -from fastavro import reader as fastavro_reader from google.cloud import storage from feast.serving.ServingService_pb2 import GetJobRequest @@ -62,15 +61,18 @@ def reload(self): """ self.job_proto = self.serving_stub.GetJob(GetJobRequest(job=self.job_proto)).job - def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): + def get_avro_files(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): """ - Wait until job is done to get an iterable rows of result. - The row can only represent an Avro row in Feast 0.3. + Wait until job is done to get the file uri to Avro result files on + Google Cloud Storage. Args: - timeout_sec: max no of seconds to wait until job is done. If "timeout_sec" is exceeded, an exception will be raised. + timeout_sec (int): + Max no of seconds to wait until job is done. If "timeout_sec" + is exceeded, an exception will be raised. - Returns: Iterable of Avro rows + Returns: + str: Google Cloud Storage file uris of the returned Avro files. 
""" max_wait_datetime = datetime.now() + timedelta(seconds=timeout_sec) wait_duration_sec = 2 @@ -78,11 +80,13 @@ def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): while self.status != JOB_STATUS_DONE: if datetime.now() > max_wait_datetime: raise Exception( - "Timeout exceeded while waiting for result. Please retry this method or use a longer timeout value." + "Timeout exceeded while waiting for result. Please retry " + "this method or use a longer timeout value." ) self.reload() time.sleep(wait_duration_sec) + # Backoff the wait duration exponentially up till MAX_WAIT_INTERVAL_SEC wait_duration_sec = min(wait_duration_sec * 2, MAX_WAIT_INTERVAL_SEC) @@ -95,7 +99,22 @@ def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): "your Feast Serving deployment." ) - uris = [urlparse(uri) for uri in self.job_proto.file_uris] + return [urlparse(uri) for uri in self.job_proto.file_uris] + + def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): + """ + Wait until job is done to get an iterable rows of result. The row can + only represent an Avro row in Feast 0.3. + + Args: + timeout_sec (int): + Max no of seconds to wait until job is done. If "timeout_sec" + is exceeded, an exception will be raised. + + Returns: + Iterable of Avro rows. + """ + uris = self.get_avro_files(timeout_sec) for file_uri in uris: if file_uri.scheme == "gs": file_obj = tempfile.TemporaryFile() @@ -113,16 +132,64 @@ def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): for record in avro_reader: yield record - def to_dataframe(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): + def to_dataframe( + self, + timeout_sec: int = DEFAULT_TIMEOUT_SEC + ) -> pd.DataFrame: """ - Wait until job is done to get an interable rows of result + Wait until a job is done to get an iterable rows of result. This method + will split the response into chunked DataFrame of a specified size to + to be yielded to the instance calling it. Args: - timeout_sec: max no of seconds to wait until job is done. 
If "timeout_sec" is exceeded, an exception will be raised. - Returns: pandas Dataframe of the feature values + max_chunk_size (int): + Maximum number of rows that the DataFrame should contain. + + timeout_sec (int): + Max no of seconds to wait until job is done. If "timeout_sec" + is exceeded, an exception will be raised. + + Returns: + pd.DataFrame: + Pandas DataFrame of the feature values. """ records = [r for r in self.result(timeout_sec=timeout_sec)] return pd.DataFrame.from_records(records) + def to_chunked_dataframe( + self, + max_chunk_size: int = -1, + timeout_sec: int = DEFAULT_TIMEOUT_SEC + ) -> Iterable[pd.DataFrame]: + """ + Wait until a job is done to get an iterable rows of result. This method + will split the response into chunked DataFrame of a specified size to + be yielded to the instance calling it. + + Args: + max_chunk_size (int): + Maximum number of rows that the DataFrame should contain. + + timeout_sec (int): + Max no of seconds to wait until job is done. If "timeout_sec" + is exceeded, an exception will be raised. + + Returns: + Iterable[pd.DataFrame]: + Pandas DataFrames of the feature values, one per chunk. + """ + # Max chunk size defined by user + records = [] + for result in self.result(timeout_sec=timeout_sec): + records.append(result) + if len(records) == max_chunk_size: + df = pd.DataFrame.from_records(records) + records.clear() # Empty records array + yield df + + # Handle for last chunk that is < max_chunk_size + if records: + yield pd.DataFrame.from_records(records) + def __iter__(self): return iter(self.result()) diff --git a/sdk/python/feast/loaders/file.py b/sdk/python/feast/loaders/file.py index 8dd6b503a74..108f2790dd8 100644 --- a/sdk/python/feast/loaders/file.py +++ b/sdk/python/feast/loaders/file.py @@ -1,70 +1,159 @@ +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re import shutil import tempfile -from typing import Optional -from urllib.parse import urlparse import uuid -import pandas as pd from datetime import datetime +from typing import List, Optional, Tuple, Union +from urllib.parse import urlparse, ParseResult + +import pandas as pd from google.cloud import storage from pandavro import to_avro -def export_dataframe_to_staging_location( - df: pd.DataFrame, staging_location_uri: str -) -> str: +def export_source_to_staging_location( + source: Union[pd.DataFrame, str], staging_location_uri: str +) -> List[str]: """ - Uploads a dataframe to a remote staging location + Uploads a DataFrame as an Avro file to a remote staging location. + + The local staging location specified in this function is used for E2E + tests, please do not use it. Args: - df: Pandas dataframe - staging_location_uri: Remote staging location where dataframe should be written + source (Union[pd.DataFrame, str]: + Source of data to be staged. Can be a pandas DataFrame or a file + path. + + Only three types of source are allowed: + * Pandas DataFrame + * Local Avro file + * GCS Avro file + + + staging_location_uri (str): + Remote staging location where DataFrame should be written. Examples: - gs://bucket/path/ - file:///data/subfolder/ + * gs://bucket/path/ + * file:///data/subfolder/ Returns: - Returns the full path to the file in the remote staging location + List[str]: + Returns a list containing the full path to the file(s) in the + remote staging location. 
""" - # Validate staging location uri = urlparse(staging_location_uri) + + # Prepare Avro file to be exported to staging location + if isinstance(source, pd.DataFrame): + # DataFrame provided as a source + if uri.scheme == "file": + uri_path = uri.path + else: + uri_path = None + + # Remote gs staging location provided by serving + dir_path, file_name, source_path = export_dataframe_to_local( + source, + uri_path + ) + elif urlparse(source).scheme in ["", "file"]: + # Local file provided as a source + dir_path = None + file_name = os.path.basename(source) + source_path = os.path.abspath(os.path.join( + urlparse(source).netloc, urlparse(source).path)) + elif urlparse(source).scheme == "gs": + # Google Cloud Storage path provided + input_source_uri = urlparse(source) + if "*" in source: + # Wildcard path + return _get_files( + bucket=input_source_uri.hostname, + uri=input_source_uri + ) + else: + return [source] + else: + raise Exception(f"Only string and DataFrame types are allowed as a " + f"source, {type(source)} was provided.") + + # Push data to required staging location if uri.scheme == "gs": - dir_path, file_name, source_path = export_dataframe_to_local(df) + # Staging location is a Google Cloud Storage path upload_file_to_gcs( - source_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name + source_path, + uri.hostname, + str(uri.path).strip("/") + "/" + file_name ) - if len(str(dir_path)) < 5: - raise Exception(f"Export location {dir_path} dangerous. Stopping.") - shutil.rmtree(dir_path) elif uri.scheme == "file": - dir_path, file_name, source_path = export_dataframe_to_local(df, uri.path) + # Staging location is a file path + # Used for end-to-end test + pass else: raise Exception( - f"Staging location {staging_location_uri} does not have a valid URI. Only gs:// and file:// are supported" + f"Staging location {staging_location_uri} does not have a " + f"valid URI. Only gs:// and file:// uri scheme are supported." 
) - return staging_location_uri.rstrip("/") + "/" + file_name + # Clean up the temporary local export; a file:// staging dir is the destination itself + if uri.scheme == "gs" and isinstance(source, pd.DataFrame) and len(str(dir_path)) > 4: + shutil.rmtree(dir_path) + + return [staging_location_uri.rstrip("/") + "/" + file_name] -def export_dataframe_to_local(df: pd.DataFrame, dir_path: Optional[str] = None): +def export_dataframe_to_local( + df: pd.DataFrame, + dir_path: Optional[str] = None +) -> Tuple[str, str, str]: """ - Exports a pandas dataframe to the local filesystem + Exports a pandas DataFrame to the local filesystem. Args: - df: Pandas dataframe to save - dir_path: (optional) Absolute directory path '/data/project/subfolder/' + df (pd.DataFrame): + Pandas DataFrame to save. + + dir_path (Optional[str]): + Absolute directory path '/data/project/subfolder/'. + + Returns: + Tuple[str, str, str]: + Tuple of directory path, file name and destination path. The + destination path can be obtained by concatenating the directory + path and file name. """ # Create local staging location if not provided if dir_path is None: dir_path = tempfile.mkdtemp() - file_name = f'{datetime.now().strftime("%d-%m-%Y_%I-%M-%S_%p")}_{str(uuid.uuid4())[:8]}.avro' + file_name = _get_file_name() dest_path = f"{dir_path}/{file_name}" # Temporarily rename datetime column to event_timestamp. Ideally we would # force the schema with our avro writer instead. 
- df.columns = ["event_timestamp" if col == "datetime" else col for col in df.columns] + df.columns = [ + "event_timestamp" + if col == "datetime" else col + for col in df.columns + ] try: # Export dataset to file in local path @@ -74,23 +163,84 @@ def export_dataframe_to_local(df: pd.DataFrame, dir_path: Optional[str] = None): finally: # Revert event_timestamp column to datetime df.columns = [ - "datetime" if col == "event_timestamp" else col for col in df.columns + "datetime" + if col == "event_timestamp" else col + for col in df.columns ] return dir_path, file_name, dest_path -def upload_file_to_gcs(local_path: str, bucket: str, remote_path: str): +def upload_file_to_gcs(local_path: str, bucket: str, remote_path: str) -> None: """ - Upload a file from the local file system to Google Cloud Storage (GCS) + Upload a file from the local file system to Google Cloud Storage (GCS). Args: - local_path: Local filesystem path of file to upload - bucket: GCS bucket to upload to - remote_path: Path within GCS bucket to upload file to, includes file name + local_path (str): + Local filesystem path of file to upload. + + bucket (str): + GCS bucket destination to upload to. + + remote_path (str): + Path within GCS bucket to upload file to, includes file name. + + Returns: + None: + None """ storage_client = storage.Client(project=None) bucket = storage_client.get_bucket(bucket) blob = bucket.blob(remote_path) blob.upload_from_filename(local_path) + + +def _get_files(bucket: str, uri: ParseResult) -> List[str]: + """ + List all available files within a Google storage bucket that matches a wild + card path. + + Args: + bucket (str): + Google Storage bucket to reference. + + uri (urllib.parse.ParseResult): + Wild card uri path containing the "*" character. + Example: + * gs://feast/staging_location/* + * gs://feast/staging_location/file_*.avro + + Returns: + List[str]: + List of all available files matching the wildcard path. 
+ """ + + storage_client = storage.Client(project=None) + bucket = storage_client.get_bucket(bucket) + path = uri.path + + if "*" in path: + regex = re.compile(path.replace("*", ".*?").strip("/")) + blob_list = bucket.list_blobs( + prefix=path.strip("/").split("*")[0], + delimiter="/" + ) + # File path should not be in path (file path must be longer than path) + return [f"{uri.scheme}://{uri.hostname}/{file}" + for file in [x.name for x in blob_list] + if re.match(regex, file) and file not in path] + else: + raise Exception(f"{path} is not a wildcard path") + + +def _get_file_name() -> str: + """ + Create a random file name. + + Returns: + str: + Randomised file name. + """ + + return f'{datetime.now().strftime("%d-%m-%Y_%I-%M-%S_%p")}_{str(uuid.uuid4())[:8]}.avro' diff --git a/tests/e2e/bq-batch-retrieval.py b/tests/e2e/bq-batch-retrieval.py index 639ca9f5595..3458eb4740b 100644 --- a/tests/e2e/bq-batch-retrieval.py +++ b/tests/e2e/bq-batch-retrieval.py @@ -2,6 +2,7 @@ import time from datetime import datetime from datetime import timedelta +from urllib.parse import urlparse import numpy as np import pandas as pd @@ -12,7 +13,9 @@ from feast.feature import Feature from feast.feature_set import FeatureSet from feast.type_map import ValueType +from google.cloud import storage from google.protobuf.duration_pb2 import Duration +from pandavro import to_avro @pytest.fixture(scope="module") @@ -30,6 +33,11 @@ def allow_dirty(pytestconfig): return True if pytestconfig.getoption("allow_dirty").lower() == "true" else False +@pytest.fixture(scope="module") +def gcs_path(pytestconfig): + return pytestconfig.getoption("gcs_path") + + @pytest.fixture(scope="module") def client(core_url, serving_url, allow_dirty): # Get client for core and serving @@ -44,6 +52,94 @@ def client(core_url, serving_url, allow_dirty): return client +def test_get_batch_features_with_file(client): + file_fs1 = FeatureSet( + "file_feature_set", + features=[Feature("feature_value", ValueType.STRING)], + 
entities=[Entity("entity_id", ValueType.INT64)], + max_age=Duration(seconds=100), + ) + + client.apply(file_fs1) + file_fs1 = client.get_feature_set(name="file_feature_set", version=1) + + N_ROWS = 10 + time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) + features_1_df = pd.DataFrame( + { + "datetime": [time_offset] * N_ROWS, + "entity_id": [i for i in range(N_ROWS)], + "feature_value": [f"{i}" for i in range(N_ROWS)], + } + ) + client.ingest(file_fs1, features_1_df) + + # Rename column (datetime -> event_timestamp) + features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"}) + + to_avro(df=features_1_df, file_path_or_buffer="file_feature_set.avro") + + feature_retrieval_job = client.get_batch_features( + entity_rows="file://file_feature_set.avro", feature_ids=["file_feature_set:1:feature_value"] + ) + + output = feature_retrieval_job.to_dataframe() + print(output.head()) + + assert output["entity_id"].to_list() == [int(i) for i in output["file_feature_set_v1_feature_value"].to_list()] + + +def test_get_batch_features_with_gs_path(client, gcs_path): + gcs_fs1 = FeatureSet( + "gcs_feature_set", + features=[Feature("feature_value", ValueType.STRING)], + entities=[Entity("entity_id", ValueType.INT64)], + max_age=Duration(seconds=100), + ) + + client.apply(gcs_fs1) + gcs_fs1 = client.get_feature_set(name="gcs_feature_set", version=1) + + N_ROWS = 10 + time_offset = datetime.utcnow().replace(tzinfo=pytz.utc) + features_1_df = pd.DataFrame( + { + "datetime": [time_offset] * N_ROWS, + "entity_id": [i for i in range(N_ROWS)], + "feature_value": [f"{i}" for i in range(N_ROWS)], + } + ) + client.ingest(gcs_fs1, features_1_df) + + # Rename column (datetime -> event_timestamp) + features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"}) + + # Output file to local + file_name = "gcs_feature_set.avro" + to_avro(df=features_1_df, file_path_or_buffer=file_name) + + uri = urlparse(gcs_path) + bucket = uri.hostname + ts = int(time.time()) 
+ remote_path = str(uri.path).strip("/") + f"{ts}/{file_name}" + + # Upload file to gcs + storage_client = storage.Client(project=None) + bucket = storage_client.get_bucket(bucket) + blob = bucket.blob(remote_path) + blob.upload_from_filename(file_name) + + feature_retrieval_job = client.get_batch_features( + entity_rows=f"{gcs_path}{ts}/*", + feature_ids=["gcs_feature_set:1:feature_value"] + ) + + output = feature_retrieval_job.to_dataframe() + print(output.head()) + + assert output["entity_id"].to_list() == [int(i) for i in output["gcs_feature_set_v1_feature_value"].to_list()] + + def test_order_by_creation_time(client): proc_time_fs = FeatureSet( "processing_time", diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index b37770a83f9..8ea472b6620 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -2,3 +2,4 @@ def pytest_addoption(parser): parser.addoption("--core_url", action="store", default="localhost:6565") parser.addoption("--serving_url", action="store", default="localhost:6566") parser.addoption("--allow_dirty", action="store", default="False") + parser.addoption("--gcs_path", action="store", default="gs://feast-templocation-kf-feast/") diff --git a/tests/e2e/requirements.txt b/tests/e2e/requirements.txt index 6b999421c04..0ba345a000f 100644 --- a/tests/e2e/requirements.txt +++ b/tests/e2e/requirements.txt @@ -1,6 +1,7 @@ mock==2.0.0 numpy==1.16.4 pandas==0.24.2 +pandavro==1.5.* pytest==5.2.1 pytest-benchmark==3.2.2 pytest-mock==1.10.4