Skip to content
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

from feast.client import Client
from feast.config import Config
from feast.core.IngestionJob_pb2 import IngestionJobStatus
from feast.feature_set import FeatureSet, FeatureSetRef
from feast.loaders.yaml import yaml_loader
from feast.core.IngestionJob_pb2 import IngestionJobStatus

_logger = logging.getLogger(__name__)

Expand Down
38 changes: 23 additions & 15 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@
GetFeatureSetResponse,
ListFeatureSetsRequest,
ListFeatureSetsResponse,
ListIngestionJobsRequest,
ListProjectsRequest,
ListProjectsResponse,
ListIngestionJobsRequest,
RestartIngestionJobRequest,
StopIngestionJobRequest,
)
from feast.core.CoreService_pb2_grpc import CoreServiceStub
from feast.core.FeatureSet_pb2 import FeatureSetStatus
from feast.feature_set import Entity, FeatureSet, FeatureSetRef
from feast.job import RetrievalJob, IngestJob
from feast.job import IngestJob, RetrievalJob
from feast.loaders.abstract_producer import get_producer
from feast.loaders.file import export_source_to_staging_location
from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT, get_feature_row_chunks
Expand Down Expand Up @@ -566,8 +566,8 @@ def get_batch_features(

if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH:
raise Exception(
f'You are connected to a store "{self._serving_url}" which '
f"does not support batch retrieval "
f'You are connected to a store "{self.serving_url}" which '
f"does not support batch retrieval"
)

if isinstance(entity_rows, pd.DataFrame):
Expand Down Expand Up @@ -597,7 +597,6 @@ def get_batch_features(
staged_files = export_source_to_staging_location(
entity_rows, serving_info.job_staging_location
) # type: List[str]

request = GetBatchFeaturesRequest(
features=feature_references,
dataset_source=DatasetSource(
Expand All @@ -608,7 +607,11 @@ def get_batch_features(
)

# Retrieve Feast Job object to manage life cycle of retrieval
response = self._serving_service_stub.GetBatchFeatures(request)
try:
response = self._serving_service_stub.GetBatchFeatures(request)
except grpc.RpcError as e:
raise grpc.RpcError(e.details())

return RetrievalJob(response.job, self._serving_service_stub)

def get_online_features(
Expand Down Expand Up @@ -639,17 +642,22 @@ def get_online_features(
"""
self._connect_serving()

return self._serving_service_stub.GetOnlineFeatures(
GetOnlineFeaturesRequest(
features=_build_feature_references(
feature_refs=feature_refs,
default_project=(
default_project if not self.project else self.project
try:
response = self._serving_service_stub.GetOnlineFeatures(
GetOnlineFeaturesRequest(
features=_build_feature_references(
feature_refs=feature_refs,
default_project=(
default_project if not self.project else self.project
),
),
),
entity_rows=entity_rows,
entity_rows=entity_rows,
)
)
)
except grpc.RpcError as e:
raise grpc.RpcError(e.details())

return response

def list_ingest_jobs(
self,
Expand Down
5 changes: 2 additions & 3 deletions sdk/python/feast/feature_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
# limitations under the License.
import warnings
from collections import OrderedDict
from typing import Dict
from typing import List, Optional
from typing import Dict, List, Optional

import pandas as pd
import pyarrow as pa
Expand All @@ -24,7 +23,6 @@
from google.protobuf.message import Message
from pandas.api.types import is_datetime64_ns_dtype
from pyarrow.lib import TimestampType
from tensorflow_metadata.proto.v0 import schema_pb2

from feast.core.FeatureSet_pb2 import FeatureSet as FeatureSetProto
from feast.core.FeatureSet_pb2 import FeatureSetMeta as FeatureSetMetaProto
Expand All @@ -41,6 +39,7 @@
pa_to_feast_value_type,
python_type_to_feast_value_type,
)
from tensorflow_metadata.proto.v0 import schema_pb2


class FeatureSet:
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/feast/job.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import tempfile
import time
from datetime import datetime, timedelta
from urllib.parse import urlparse
from typing import List
from urllib.parse import urlparse

import fastavro
import pandas as pd
from google.cloud import storage
from google.protobuf.json_format import MessageToJson

from feast.core.CoreService_pb2 import ListIngestionJobsRequest
from feast.core.CoreService_pb2_grpc import CoreServiceStub
from feast.core.IngestionJob_pb2 import IngestionJob as IngestJobProto
from feast.core.IngestionJob_pb2 import IngestionJobStatus
from feast.core.Store_pb2 import Store
from feast.feature_set import FeatureSet
from feast.source import Source
from feast.serving.ServingService_pb2 import (
DATA_FORMAT_AVRO,
JOB_STATUS_DONE,
GetJobRequest,
)
from feast.serving.ServingService_pb2 import Job as JobProto
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.core.Store_pb2 import Store
from feast.core.IngestionJob_pb2 import IngestionJob as IngestJobProto
from feast.core.IngestionJob_pb2 import IngestionJobStatus
from feast.core.CoreService_pb2_grpc import CoreServiceStub
from feast.core.CoreService_pb2 import ListIngestionJobsRequest
from feast.source import Source

# Maximum no of seconds to wait until the retrieval jobs status is DONE in Feast
# Currently set to the maximum query execution time limit in BigQuery
Expand Down
8 changes: 3 additions & 5 deletions sdk/python/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,16 @@
GetFeatureSetResponse,
ListIngestionJobsResponse,
)
from feast.core.Store_pb2 import Store
from feast.core.IngestionJob_pb2 import (
IngestionJob as IngestJobProto,
IngestionJobStatus,
)
from feast.core.FeatureSet_pb2 import EntitySpec as EntitySpecProto
from feast.core.FeatureSet_pb2 import FeatureSet as FeatureSetProto
from feast.core.FeatureSet_pb2 import FeatureSetMeta as FeatureSetMetaProto
from feast.core.FeatureSet_pb2 import FeatureSetSpec as FeatureSetSpecProto
from feast.core.FeatureSet_pb2 import FeatureSetStatus as FeatureSetStatusProto
from feast.core.FeatureSet_pb2 import FeatureSpec as FeatureSpecProto
from feast.core.IngestionJob_pb2 import IngestionJob as IngestJobProto
from feast.core.IngestionJob_pb2 import IngestionJobStatus
from feast.core.Source_pb2 import KafkaSourceConfig, Source, SourceType
from feast.core.Store_pb2 import Store
from feast.entity import Entity
from feast.feature_set import Feature, FeatureSet, FeatureSetRef
from feast.job import IngestJob
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/test_feature_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import pytest
import pytz
from google.protobuf import json_format
from tensorflow_metadata.proto.v0 import schema_pb2

import dataframes
import feast.core.CoreService_pb2_grpc as Core
Expand All @@ -34,6 +33,7 @@
)
from feast.value_type import ValueType
from feast_core_server import CoreServicer
from tensorflow_metadata.proto.v0 import schema_pb2

CORE_URL = "core.feast.local"
SERVING_URL = "serving.feast.local"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest;
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
import feast.serving.ServingServiceGrpc.ServingServiceImplBase;
import feast.serving.exception.SpecRetrievalException;
import feast.serving.interceptors.GrpcMonitoringInterceptor;
import feast.serving.service.ServingService;
import feast.serving.util.RequestHelper;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentracing.Scope;
import io.opentracing.Span;
Expand Down Expand Up @@ -74,6 +76,10 @@ public void getOnlineFeatures(
GetOnlineFeaturesResponse onlineFeatures = servingService.getOnlineFeatures(request);
responseObserver.onNext(onlineFeatures);
responseObserver.onCompleted();
} catch (SpecRetrievalException e) {
log.error("Failed to retrieve specs in SpecService", e);
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
} catch (Exception e) {
log.warn("Failed to get Online Features", e);
responseObserver.onError(e);
Expand All @@ -89,6 +95,10 @@ public void getBatchFeatures(
GetBatchFeaturesResponse batchFeatures = servingService.getBatchFeatures(request);
responseObserver.onNext(batchFeatures);
responseObserver.onCompleted();
} catch (SpecRetrievalException e) {
log.error("Failed to retrieve specs in SpecService", e);
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
} catch (Exception e) {
log.warn("Failed to get Batch Features", e);
responseObserver.onError(e);
Expand Down
14 changes: 9 additions & 5 deletions serving/src/main/java/feast/serving/specs/CachedSpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,15 @@ public List<FeatureSetRequest> getFeatureSets(List<FeatureReference> featureRefe
.map(
featureReference -> {
String featureSet =
featureToFeatureSetMapping.getOrDefault(
generateFeatureStringRef(featureReference), "");
if (featureSet.isEmpty()) {
featureToFeatureSetMapping.get(generateFeatureStringRef(featureReference));
if (featureSet == null) {
throw new SpecRetrievalException(
String.format("Unable to retrieve feature %s", featureReference));
String.format(
"Unable to find feature set for feature ref: "
+ "(project: %s, name: %s, version: %d)",
featureReference.getProject(),
featureReference.getName(),
featureReference.getVersion()));
}
return Pair.of(featureSet, featureReference);
})
Expand All @@ -141,7 +145,7 @@ public List<FeatureSetRequest> getFeatureSets(List<FeatureReference> featureRefe
featureSetRequests.add(featureSetRequest);
} catch (ExecutionException e) {
throw new SpecRetrievalException(
String.format("Unable to retrieve featureSet with id %s", fsName), e);
String.format("Unable to find featureSet with name: %s", fsName), e);
}
});
return featureSetRequests;
Expand Down