def get_batch_features(
    self, feature_ids: List[str], entity_rows: pd.DataFrame
) -> Job:
    """
    Retrieves historical features from a Feast Serving deployment.

    Args:
        feature_ids: List of feature ids that will be returned for each entity.
            Each feature id should have the following format
            "feature_set_name:version:feature_name".

        entity_rows: Pandas dataframe containing entities and a 'datetime'
            column. Each entity in a feature set must be present as a column
            in this dataframe. The datetime column must contain timestamps in
            epoch form and must have the type np.int64

    Returns:
        Feast batch retrieval job: feast.job.Job. When a gRPC error occurs the
        formatted error is printed and None is returned implicitly.

    Raises:
        ValueError: If entity_rows has no int64 'datetime' column, lacks a
            column for an entity of a requested feature set, or a requested
            feature set cannot be found.
        Exception: If the connected Feast Serving deployment is not of batch
            type.

    Example usage:
    ============================================================
    >>> import time
    >>> import numpy as np
    >>> import pandas as pd
    >>> from feast import Client
    >>>
    >>> feast_client = Client(core_url="localhost:6565", serving_url="localhost:6566")
    >>> feature_ids = ["customer:1:bookings_7d"]
    >>> entity_rows = pd.DataFrame(
    >>>         {
    >>>            "datetime": [np.int64(time.time_ns()) for _ in range(3)],
    >>>            "customer": [1001, 1002, 1003],
    >>>         }
    >>>     )
    >>> feature_retrieval_job = feast_client.get_batch_features(feature_ids, entity_rows)
    >>> df = feature_retrieval_job.to_dataframe()
    >>> print(df)
    """

    self._connect_serving()

    try:
        fs_request = _build_feature_set_request(feature_ids)

        # Validate entity rows based on entities in Feast Core
        self._validate_entity_rows_for_batch_retrieval(entity_rows, fs_request)

        # Retrieve serving information to determine store type and staging
        # location. A gRPC stub call needs an explicit request message, even
        # when that message carries no fields.
        serving_info = self._serving_service_stub.GetFeastServingInfo(
            GetFeastServingInfoRequest()
        )  # type: GetFeastServingInfoResponse

        if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH:
            raise Exception(
                f'You are connected to a store "{self._serving_url}" which does not support batch retrieval'
            )

        # Export and upload entity row dataframe to staging location provided by Feast
        staged_file = export_dataframe_to_staging_location(
            entity_rows, serving_info.job_staging_location
        )  # type: str

        request = GetBatchFeaturesRequest(
            feature_sets=fs_request,
            dataset_source=DatasetSource(
                file_source=DatasetSource.FileSource(
                    file_uris=[staged_file], data_format=DataFormat.DATA_FORMAT_AVRO
                )
            ),
        )

        # Retrieve Feast Job object to manage life cycle of retrieval
        response = self._serving_service_stub.GetBatchFeatures(request)
        return Job(response.job, self._serving_service_stub)

    except grpc.RpcError as e:
        # Only gRPC failures are reported this way; validation errors raised
        # above propagate to the caller unchanged.
        print(format_grpc_exception("GetBatchFeatures", e.code(), e.details()))


def _validate_entity_rows_for_batch_retrieval(
    self, entity_rows, feature_sets_request
):
    """
    Validate whether an entity_row dataframe contains the correct information for batch retrieval

    :param entity_rows: Pandas dataframe containing entities and datetime column. Each entity in a feature set
        must be present as a column in this dataframe. The datetime column must contain Unix Epoch values down to
        nanoseconds
    :param feature_sets_request: Feature sets that will be requested. Each item is read for its ``name`` and
        ``version``, which are used to look the feature set up through ``self.get_feature_set``
    :raises ValueError: If the "datetime" column is missing or is not np.int64, if a requested feature set
        cannot be found, or if an entity of a requested feature set has no matching dataframe column
    """
    # Ensure datetime column exists
    if "datetime" not in entity_rows.columns:
        raise ValueError(
            f'Entity rows does not contain "datetime" column in columns {entity_rows.columns}'
        )
    if entity_rows["datetime"].dtype != np.int64:
        raise ValueError(
            f"\"datetime\" column in entity rows is not of type np.int64, but {entity_rows['datetime'].dtype}"
        )

    # Validate dataframe columns based on feature set entities
    for feature_set in feature_sets_request:
        fs = self.get_feature_set(
            name=feature_set.name, version=feature_set.version
        )
        if fs is None:
            raise ValueError(
                f'Feature set "{feature_set.name}:{feature_set.version}" could not be found'
            )
        for entity_type in fs.entities:
            if entity_type.name not in entity_rows.columns:
                raise ValueError(
                    f'Dataframe does not contain entity "{entity_type.name}" column in columns "{entity_rows.columns}"'
                )
Dict[str, GetOnlineFeaturesRequest.FeatureSet] + feature_set_request = dict() # type: Dict[str, FeatureSetRequest] for feature_id in feature_ids: fid_parts = feature_id.split(":") if len(fid_parts) == 3: @@ -368,7 +446,7 @@ def _build_online_feature_request(feature_ids: List[str]) -> List[FeatureSet]: ) if feature_set not in feature_set_request: - feature_set_request[feature_set] = GetOnlineFeaturesRequest.FeatureSet( + feature_set_request[feature_set] = FeatureSetRequest( name=feature_set, version=int(version) ) feature_set_request[feature_set].feature_names.append(feature) diff --git a/sdk/python/feast/job.py b/sdk/python/feast/job.py index aa0fd1a8457..2d86f2aaefc 100644 --- a/sdk/python/feast/job.py +++ b/sdk/python/feast/job.py @@ -1,7 +1,10 @@ import tempfile import time from datetime import datetime, timedelta +from typing import List +from urllib.parse import urlparse +import fastavro import pandas as pd from fastavro import reader as fastavro_reader from google.cloud import storage @@ -14,10 +17,6 @@ ) from feast.serving.ServingService_pb2_grpc import ServingServiceStub -# TODO: Need to profile and check the performance and memory consumption of -# the current approach to read files into pandas DataFrame or iterate the -# data row by row. - # Maximum no of seconds to wait until the jobs status is DONE in Feast # Currently set to the maximum query execution time limit in BigQuery DEFAULT_TIMEOUT_SEC: int = 21600 @@ -40,7 +39,7 @@ def __init__(self, job_proto: JobProto, serving_stub: ServingServiceStub): """ self.job_proto = job_proto self.serving_stub = serving_stub - self.storage_client = storage.Client() + self.storage_client = storage.Client(project=None) @property def id(self): @@ -91,18 +90,23 @@ def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): "your Feast Serving deployment." 
) - for file_uri in self.job_proto.file_uris: - if not file_uri.startswith("gs://"): + uris = [urlparse(uri) for uri in self.job_proto.file_uris] + for file_uri in uris: + if file_uri.scheme == "gs": + file_obj = tempfile.TemporaryFile() + self.storage_client.download_blob_to_file(file_uri, file_obj) + elif file_uri.scheme == "file": + file_obj = open(file_uri.path, "rb") + else: raise Exception( - "Feast only supports reading from Google Cloud " - "Storage for now. Please check your Feast Serving deployment." + f"Could not identify file URI {file_uri}. Only gs:// and file:// supported" ) - with tempfile.TemporaryFile() as file_obj: - self.storage_client.download_blob_to_file(file_uri, file_obj) - file_obj.seek(0) - avro_reader = fastavro_reader(file_obj) - for record in avro_reader: - yield record + + file_obj.seek(0) + avro_reader = fastavro.reader(file_obj) + + for record in avro_reader: + yield record def to_dataframe(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC): """ diff --git a/sdk/python/feast/utils/loaders.py b/sdk/python/feast/utils/loaders.py index 7706e1ba362..0cbb8f12d1d 100644 --- a/sdk/python/feast/utils/loaders.py +++ b/sdk/python/feast/utils/loaders.py @@ -1,8 +1,23 @@ +import shutil +import tempfile +from typing import Optional +from urllib.parse import urlparse +import uuid import yaml +import pandas as pd +from datetime import datetime +from google.cloud import storage +from pandavro import to_avro def yaml_loader(yml, load_single=False): - + """ + Loads one or more Feast resources from a YAML path or string. 
def export_dataframe_to_staging_location(
    df: pd.DataFrame, staging_location_uri: str
) -> str:
    """
    Uploads a dataframe to a remote staging location

    :param df: Pandas dataframe
    :param staging_location_uri: Remote staging location where dataframe should be written
        Examples: gs://bucket/path/
                  file:///data/subfolder/
        The location is always treated as a directory; a missing trailing
        slash is added before the file name is appended.
    :return: Returns the full path to the file in the remote staging location
    :raises Exception: If the URI scheme is neither gs:// nor file://
    """
    # Normalise to a directory-style URI so that both the returned URI and
    # the GCS object name get a separator in front of the file name.
    base_uri = staging_location_uri
    if not base_uri.endswith("/"):
        base_uri += "/"

    uri = urlparse(base_uri)
    if uri.scheme == "gs":
        # Export into a throwaway directory that is removed even when the
        # upload raises.
        with tempfile.TemporaryDirectory() as local_dir:
            _, file_name, source_path = export_dataframe_to_local(df, local_dir)
            upload_file_to_gcs(
                source_path, uri.hostname, str(uri.path).lstrip("/") + file_name
            )
    elif uri.scheme == "file":
        _, file_name, _ = export_dataframe_to_local(df, uri.path)
    else:
        raise Exception(
            f"Staging location {staging_location_uri} does not have a valid URI. Only gs:// and file:// are supported"
        )
    return base_uri + file_name


def upload_file_to_gcs(local_path: str, bucket: str, remote_path: str):
    """
    Upload a file from the local file system to Google Cloud Storage (GCS)

    :param local_path: Local filesystem path of file to upload
    :param bucket: GCS bucket to upload to
    :param remote_path: Path within GCS bucket to upload file to, includes file name
    """
    storage_client = storage.Client(project=None)
    gcs_bucket = storage_client.get_bucket(bucket)
    # The blob is named by its path inside the bucket; the local file only
    # supplies the content.
    blob = gcs_bucket.blob(remote_path)
    blob.upload_from_filename(local_path)


def export_dataframe_to_local(df: pd.DataFrame, dir_path: Optional[str] = None):
    """
    Exports a pandas dataframe to the local filesystem as an Avro file

    :param df: Pandas dataframe to save
    :param dir_path: (optional) Absolute directory path '/data/project/subfolder/'.
        A new temporary directory is created when it is not provided.
    :return: Tuple of (directory path, file name, full path of the written file)
    """
    # Create local staging location if not provided
    if dir_path is None:
        dir_path = tempfile.mkdtemp()

    # Timestamp plus a short random suffix keeps file names unique per export
    timestamp = datetime.now().strftime("%d-%m-%Y_%I-%M-%S_%p")
    file_name = f"{timestamp}_{str(uuid.uuid4())[:8]}.avro"
    dest_path = f"{dir_path}/{file_name}"

    # Export dataset to file in local path
    to_avro(df=df, file_path_or_buffer=dest_path)

    return dir_path, file_name, dest_path
def test_get_batch_features(self, mock_client, mocker):
    """
    Run get_batch_features against patched Core and Serving stubs and check
    that the job's dataframe matches the Avro file the patched job points to.
    """
    mock_client._serving_service_stub = Serving.ServingServiceStub(
        grpc.insecure_channel("")
    )
    mock_client._core_service_stub = Core.CoreServiceStub(grpc.insecure_channel(""))

    feature_ids = [
        "customer_fs:1:customer_feature_1",
        "customer_fs:1:customer_feature_2",
    ]

    mocker.patch.object(
        mock_client._core_service_stub,
        "GetFeatureSets",
        return_value=GetFeatureSetsResponse(
            feature_sets=[
                FeatureSetSpec(
                    name="customer_fs",
                    version=1,
                    entities=[
                        EntitySpec(
                            name="customer", value_type=ValueProto.ValueType.INT64
                        ),
                        EntitySpec(
                            name="transaction",
                            value_type=ValueProto.ValueType.INT64,
                        ),
                    ],
                    features=[
                        FeatureSpec(
                            name="customer_feature_1",
                            value_type=ValueProto.ValueType.FLOAT,
                        ),
                        FeatureSpec(
                            name="customer_feature_2",
                            value_type=ValueProto.ValueType.STRING,
                        ),
                    ],
                )
            ]
        ),
    )

    expected_dataframe = pd.DataFrame(
        {
            "datetime": [np.int64(time.time_ns()) for _ in range(3)],
            "customer": [1001, 1002, 1003],
            "transaction": [1001, 1002, 1003],
            "customer_fs:1:customer_feature_1": [1001, 1002, 1003],
            "customer_fs:1:customer_feature_2": [1001, 1002, 1003],
        }
    )

    # Reserve the result path atomically instead of using the race-prone
    # tempfile.mktemp(); the handle is closed before the file is rewritten.
    with tempfile.NamedTemporaryFile(suffix=".avro", delete=False) as results_file:
        final_results = results_file.name
    to_avro(file_path_or_buffer=final_results, df=expected_dataframe)

    # The same finished job is returned by both GetBatchFeatures and GetJob
    finished_job = BatchFeaturesJob(
        id="123",
        type=JobType.JOB_TYPE_DOWNLOAD,
        status=JobStatus.JOB_STATUS_DONE,
        file_uris=[f"file://{final_results}"],
        data_format=DataFormat.DATA_FORMAT_AVRO,
    )

    mocker.patch.object(
        mock_client._serving_service_stub,
        "GetBatchFeatures",
        return_value=GetBatchFeaturesResponse(job=finished_job),
    )

    mocker.patch.object(
        mock_client._serving_service_stub,
        "GetJob",
        return_value=GetJobResponse(job=finished_job),
    )

    mocker.patch.object(
        mock_client._serving_service_stub,
        "GetFeastServingInfo",
        return_value=GetFeastServingInfoResponse(
            job_staging_location=f"file://{tempfile.mkdtemp()}",
            type=FeastServingType.FEAST_SERVING_TYPE_BATCH,
        ),
    )

    response = mock_client.get_batch_features(
        entity_rows=pd.DataFrame(
            {
                "datetime": [np.int64(time.time_ns()) for _ in range(3)],
                "customer": [1001, 1002, 1003],
                "transaction": [1001, 1002, 1003],
            }
        ),
        feature_ids=feature_ids,
    )  # type: Job

    assert response.id == "123" and response.status == JobStatus.JOB_STATUS_DONE

    actual_dataframe = response.to_dataframe()

    assert actual_dataframe[feature_ids].equals(expected_dataframe[feature_ids])