Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 104 additions & 26 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@
# limitations under the License.


import tempfile
import shutil
from datetime import datetime
import os
from collections import OrderedDict
from typing import Dict, Union
from typing import List
import grpc
import pandas as pd
from google.cloud import storage
from pandavro import to_avro
from feast.exceptions import format_grpc_exception
from feast.core.CoreService_pb2 import (
GetFeastCoreVersionRequest,
Expand All @@ -35,9 +40,18 @@
GetBatchFeaturesRequest,
GetFeastServingInfoRequest,
GetOnlineFeaturesResponse,
DatasetSource,
DataFormat,
FeatureSetRequest,
FeastServingType,
)
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
from urllib.parse import urlparse
import uuid
import numpy as np

from feast.utils.loaders import export_dataframe_to_staging_location

GRPC_CONNECTION_TIMEOUT_DEFAULT = 3 # type: int
GRPC_CONNECTION_TIMEOUT_APPLY = 300 # type: int
Expand Down Expand Up @@ -275,49 +289,113 @@ def list_entities(self) -> Dict[str, Entity]:
return entities_dict

def get_batch_features(
    self, feature_ids: List[str], entity_rows: pd.DataFrame
) -> Job:
    """
    Retrieves historical features from a Feast Serving deployment.

    Args:
        feature_ids: List of feature ids that will be returned for each entity.
            Each feature id should have the following format
            "feature_set_name:version:feature_name".

        entity_rows: Pandas dataframe containing entities and a 'datetime'
            column. Each entity in a feature set must be present as a column
            in this dataframe. The datetime column must contain timestamps
            in epoch form and must have the type np.int64

    Returns:
        Feast batch retrieval job: feast.job.Job. If the gRPC call fails with
        a grpc.RpcError, the formatted error is printed and None is returned.

    Raises:
        ValueError: If entity_rows fails validation (missing or mistyped
            'datetime' column, unknown feature set, missing entity column).
        Exception: If the connected serving deployment is not a batch store.

    Example usage:
    ============================================================
    >>> import time
    >>> import numpy as np
    >>> import pandas as pd
    >>> from feast import Client
    >>>
    >>> feast_client = Client(core_url="localhost:6565", serving_url="localhost:6566")
    >>> feature_ids = ["customer:1:bookings_7d"]
    >>> entity_rows = pd.DataFrame(
    ...     {
    ...         "datetime": [np.int64(time.time_ns()) for _ in range(3)],
    ...         "customer": [1001, 1002, 1003],
    ...     }
    ... )
    >>> feature_retrieval_job = feast_client.get_batch_features(feature_ids, entity_rows)
    >>> df = feature_retrieval_job.to_dataframe()
    >>> print(df)
    """

    self._connect_serving()

    try:
        fs_request = _build_feature_set_request(feature_ids)

        # Validate entity rows based on entities in Feast Core
        self._validate_entity_rows_for_batch_retrieval(entity_rows, fs_request)

        # Retrieve serving information to determine store type and staging
        # location. A gRPC unary call requires a request message, so an empty
        # GetFeastServingInfoRequest is passed explicitly.
        serving_info = self._serving_service_stub.GetFeastServingInfo(
            GetFeastServingInfoRequest()
        )  # type: GetFeastServingInfoResponse

        if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH:
            raise Exception(
                f'You are connected to a store "{self._serving_url}" which does not support batch retrieval'
            )

        # Export and upload entity row dataframe to staging location provided by Feast
        staged_file = export_dataframe_to_staging_location(
            entity_rows, serving_info.job_staging_location
        )  # type: str

        request = GetBatchFeaturesRequest(
            feature_sets=fs_request,
            dataset_source=DatasetSource(
                file_source=DatasetSource.FileSource(
                    file_uris=[staged_file], data_format=DataFormat.DATA_FORMAT_AVRO
                )
            ),
        )

        # Retrieve Feast Job object to manage life cycle of retrieval
        response = self._serving_service_stub.GetBatchFeatures(request)
        return Job(response.job, self._serving_service_stub)

    except grpc.RpcError as e:
        # Best-effort reporting: the error is printed, not re-raised.
        print(format_grpc_exception("GetBatchFeatures", e.code(), e.details()))

def _validate_entity_rows_for_batch_retrieval(
self, entity_rows, feature_sets_request
):
"""
Validate whether an entity_row dataframe contains the correct information for batch retrieval
:param entity_rows: Pandas dataframe containing entities and datetime column. Each entity in a feature set
must be present as a column in this dataframe. The datetime column must contain Unix Epoch values down to
nanoseconds
:param feature_sets_request: Feature sets that will
"""
# Ensure datetime column exists
if "datetime" not in entity_rows.columns:
raise ValueError(
f'Entity rows does not contain "datetime" column in columns {entity_rows.columns}'
)
if entity_rows["datetime"].dtype != np.int64:
raise ValueError(
f"\"datetime\" column in entity rows is not of type np.int64, but {entity_rows['datetime'].dtype}"
)

return Job(response.job, self._serving_service_stub)
# Validate dataframe columns based on feature set entities
for feature_set in feature_sets_request:
fs = self.get_feature_set(
name=feature_set.name, version=feature_set.version
)
if fs is None:
raise ValueError(
f'Feature set "{feature_set.name}:{feature_set.version}" could not be found'
)
for entity_type in fs.entities:
if entity_type.name not in entity_rows.columns:
raise ValueError(
f'Dataframe does not contain entity "{entity_type.name}" column in columns "{entity_rows.columns}"'
)

def get_online_features(
self,
Expand All @@ -338,12 +416,12 @@ def get_online_features(
:return: Returns a list of maps where each item in the list contains
the latest feature values for the provided entities
"""
self._connect_serving(skip_if_connected=True)
self._connect_serving()

try:
response = self._serving_service_stub.GetOnlineFeatures(
GetOnlineFeaturesRequest(
feature_sets=_build_online_feature_request(feature_ids),
feature_sets=_build_feature_set_request(feature_ids),
entity_rows=entity_rows,
)
) # type: GetOnlineFeaturesResponse
Expand All @@ -353,11 +431,11 @@ def get_online_features(
return response


def _build_online_feature_request(feature_ids: List[str]) -> List[FeatureSet]:
def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest]:
"""
Builds a list of FeatureSet objects from feature set ids in order to retrieve feature data from Feast Serving
"""
feature_set_request = dict() # type: Dict[str, GetOnlineFeaturesRequest.FeatureSet]
feature_set_request = dict() # type: Dict[str, FeatureSetRequest]
for feature_id in feature_ids:
fid_parts = feature_id.split(":")
if len(fid_parts) == 3:
Expand All @@ -368,7 +446,7 @@ def _build_online_feature_request(feature_ids: List[str]) -> List[FeatureSet]:
)

if feature_set not in feature_set_request:
feature_set_request[feature_set] = GetOnlineFeaturesRequest.FeatureSet(
feature_set_request[feature_set] = FeatureSetRequest(
name=feature_set, version=int(version)
)
feature_set_request[feature_set].feature_names.append(feature)
Expand Down
34 changes: 19 additions & 15 deletions sdk/python/feast/job.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import tempfile
import time
from datetime import datetime, timedelta
from typing import List
from urllib.parse import urlparse

import fastavro
import pandas as pd
from fastavro import reader as fastavro_reader
from google.cloud import storage
Expand All @@ -14,10 +17,6 @@
)
from feast.serving.ServingService_pb2_grpc import ServingServiceStub

# TODO: Need to profile and check the performance and memory consumption of
# the current approach to read files into pandas DataFrame or iterate the
# data row by row.

# Maximum no of seconds to wait until the jobs status is DONE in Feast
# Currently set to the maximum query execution time limit in BigQuery
DEFAULT_TIMEOUT_SEC: int = 21600
Expand All @@ -40,7 +39,7 @@ def __init__(self, job_proto: JobProto, serving_stub: ServingServiceStub):
"""
self.job_proto = job_proto
self.serving_stub = serving_stub
self.storage_client = storage.Client()
self.storage_client = storage.Client(project=None)

@property
def id(self):
Expand Down Expand Up @@ -91,18 +90,23 @@ def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC):
"your Feast Serving deployment."
)

for file_uri in self.job_proto.file_uris:
if not file_uri.startswith("gs://"):
uris = [urlparse(uri) for uri in self.job_proto.file_uris]
for file_uri in uris:
if file_uri.scheme == "gs":
file_obj = tempfile.TemporaryFile()
self.storage_client.download_blob_to_file(file_uri, file_obj)
elif file_uri.scheme == "file":
file_obj = open(file_uri.path, "rb")
else:
raise Exception(
"Feast only supports reading from Google Cloud "
"Storage for now. Please check your Feast Serving deployment."
f"Could not identify file URI {file_uri}. Only gs:// and file:// supported"
)
with tempfile.TemporaryFile() as file_obj:
self.storage_client.download_blob_to_file(file_uri, file_obj)
file_obj.seek(0)
avro_reader = fastavro_reader(file_obj)
for record in avro_reader:
yield record

file_obj.seek(0)
avro_reader = fastavro.reader(file_obj)

for record in avro_reader:
yield record

def to_dataframe(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC):
"""
Expand Down
82 changes: 80 additions & 2 deletions sdk/python/feast/utils/loaders.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
import shutil
import tempfile
from typing import Optional
from urllib.parse import urlparse
import uuid
import yaml
import pandas as pd
from datetime import datetime
from google.cloud import storage
from pandavro import to_avro


def yaml_loader(yml, load_single=False):

"""
Loads one or more Feast resources from a YAML path or string. Multiple resources
can be divided by three hyphens '---'
:param yml: A path ending in .yaml or .yml, or a YAML string
:param load_single: Expect only a single YAML resource, fail otherwise
:return: Either a single YAML dictionary or a list of YAML dictionaries
"""
if (
isinstance(yml, str)
and yml.count("\n") == 0
Expand All @@ -15,7 +30,7 @@ def yaml_loader(yml, load_single=False):
yml_content = yml
else:
raise Exception(
f"Invalid YAML provided. Please provide either a file path, YAML string, or dictionary: ${yml}"
f"Invalid YAML provided. Please provide either a file path or YAML string: ${yml}"
)

yaml_strings = yml_content.strip("---").split("---")
Expand All @@ -40,3 +55,66 @@ def yaml_to_dict(yaml_string):
if not isinstance(yaml_dict, dict) or not "kind" in yaml_dict:
raise Exception(f"Could not detect YAML kind from resource: ${yaml_string}")
return yaml_dict


def export_dataframe_to_staging_location(
    df: pd.DataFrame, staging_location_uri: str
) -> str:
    """
    Uploads a dataframe to a remote staging location
    :param df: Pandas dataframe
    :param staging_location_uri: Remote staging location where dataframe should be written
        Examples: gs://bucket/path/
                  file:///data/subfolder/
        A missing trailing "/" is tolerated.
    :return: Returns the full path to the file in the remote staging location
    :raises Exception: If the URI scheme is neither gs:// nor file://
    """
    # Validate staging location before doing any export work
    uri = urlparse(staging_location_uri)
    if uri.scheme not in ("gs", "file"):
        raise Exception(
            f"Staging location {staging_location_uri} does not have a valid URI. Only gs:// and file:// are supported"
        )

    # Guarantee a separator between the staging location and the file name so
    # the returned URI matches where the file is actually written.
    if staging_location_uri.endswith("/"):
        base_uri = staging_location_uri
    else:
        base_uri = staging_location_uri + "/"

    if uri.scheme == "gs":
        dir_path, file_name, source_path = export_dataframe_to_local(df)
        # Sanity check before the recursive delete below
        if len(str(dir_path)) < 5:
            raise Exception(f"Export location {dir_path} dangerous. Stopping.")
        try:
            remote_dir = str(uri.path).lstrip("/")
            if remote_dir and not remote_dir.endswith("/"):
                remote_dir += "/"
            upload_file_to_gcs(source_path, uri.hostname, remote_dir + file_name)
        finally:
            # Always remove the temporary export directory, even if the upload fails
            shutil.rmtree(dir_path)
    else:
        _, file_name, _ = export_dataframe_to_local(df, uri.path)

    return base_uri + file_name


def upload_file_to_gcs(local_path: str, bucket: str, remote_path: str):
    """
    Upload a file from the local file system to Google Cloud Storage (GCS)
    :param local_path: Local filesystem path of file to upload
    :param bucket: GCS bucket to upload to
    :param remote_path: Path within GCS bucket to upload file to, includes file name
    """
    storage_client = storage.Client(project=None)
    gcs_bucket = storage_client.get_bucket(bucket)
    # The blob is named by its destination path inside the bucket; the upload
    # reads its contents from the local file.
    blob = gcs_bucket.blob(remote_path)
    blob.upload_from_filename(local_path)


def export_dataframe_to_local(df: pd.DataFrame, dir_path: Optional[str] = None):
    """
    Writes a pandas dataframe as an Avro file on the local filesystem
    :param df: Pandas dataframe to save
    :param dir_path: (optional) Absolute directory path '/data/project/subfolder/'.
        A fresh temporary directory is created when omitted.
    :return: Tuple of (directory path, generated file name, full destination path)
    """
    # Fall back to a freshly created temporary directory
    target_dir = tempfile.mkdtemp() if dir_path is None else dir_path

    # File name = local timestamp + short random suffix
    timestamp = datetime.now().strftime("%d-%m-%Y_%I-%M-%S_%p")
    unique_suffix = str(uuid.uuid4())[:8]
    file_name = f"{timestamp}_{unique_suffix}.avro"
    dest_path = f"{target_dir}/{file_name}"

    # Serialise the dataframe to the destination file
    to_avro(df=df, file_path_or_buffer=dest_path)

    return target_dir, file_name, dest_path
1 change: 1 addition & 0 deletions sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"google-cloud-bigquery-storage==0.*",
"grpcio==1.*",
"pandas==0.*",
"pandavro==1.5.1",
"protobuf==3.*",
"PyYAML==5.1.2",
"fastavro==0.*",
Expand Down
Loading