field : fields.entrySet()) {
+ featureRow.addFields(
+ Field.newBuilder().setName(field.getKey()).setValue(field.getValue()).build());
+ }
+ return featureRow.build();
+ } else {
+ String missingFields =
+ requiredFields.stream()
+ .filter(f -> !fields.keySet().contains(f))
+ .collect(Collectors.joining(","));
+ throw new IllegalArgumentException(
+ "FeatureRow is missing some fields defined in FeatureSetSpec: " + missingFields);
+ }
+ }
+
/**
* Create a Feature Row with random value according to the FeatureSetSpec
*
@@ -352,15 +476,16 @@ public static Field field(String name, Object value, ValueType.Enum valueType) {
/**
* This blocking method waits until an ImportJob pipeline has written all elements to the store.
- *
- * The pipeline must be in the RUNNING state before calling this method.
*
- * @param pipelineResult result of running the Pipeline
+ *
The pipeline must be in the RUNNING state before calling this method.
+ *
+ * @param pipelineResult result of running the Pipeline
* @param maxWaitDuration wait until this max amount of duration
* @throws InterruptedException if the thread is interruped while waiting
*/
- public static void waitUntilAllElementsAreWrittenToStore(PipelineResult pipelineResult,
- Duration maxWaitDuration, Duration checkInterval) throws InterruptedException {
+ public static void waitUntilAllElementsAreWrittenToStore(
+ PipelineResult pipelineResult, Duration maxWaitDuration, Duration checkInterval)
+ throws InterruptedException {
if (pipelineResult.getState().isTerminal()) {
return;
}
@@ -409,4 +534,12 @@ public static void waitUntilAllElementsAreWrittenToStore(PipelineResult pipeline
}
}
}
+
+ public static Value intValue(int val) {
+ return Value.newBuilder().setInt64Val(val).build();
+ }
+
+ public static Value strValue(String val) {
+ return Value.newBuilder().setStringVal(val).build();
+ }
}
diff --git a/pom.xml b/pom.xml
index edf2a0244e9..98586740678 100644
--- a/pom.xml
+++ b/pom.xml
@@ -192,7 +192,7 @@
com.google.guava
guava
- 26.0-jre
+ 25.0-jre
com.google.protobuf
diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto
index e1b8c581a38..9e1dc33143e 100644
--- a/protos/feast/core/Store.proto
+++ b/protos/feast/core/Store.proto
@@ -17,6 +17,8 @@
syntax = "proto3";
package feast.core;
+import "google/protobuf/duration.proto";
+
option java_package = "feast.core";
option java_outer_classname = "StoreProto";
option go_package = "github.com/gojek/feast/sdk/go/protos/feast/core";
@@ -103,7 +105,20 @@ message Store {
//
BIGQUERY = 2;
- // Unsupported in Feast 0.3
+ // Cassandra stores entities as a string partition key, feature as clustering column.
+ // NOTE: This store currently uses max_age defined in FeatureSet for ttl
+ //
+ // Columns:
+ // - entities: concatenated string of feature set name and all entities' keys and values
+ // entities concatenated format - [feature_set]:[entity_name1=entity_value1]|[entity_name2=entity_value2]
+ // TODO: string representation of float or double types may have different value in different runtime or platform
+ // - feature: clustering column where each feature is a column
+ // - value: byte array of Value (refer to feast.types.Value)
+ //
+ // Internal columns:
+ // - writeTime: timestamp of the written record. This is used to ensure that new records are not replaced
+ // by older ones
+ // - ttl: expiration time the record. Currently using max_age from feature set spec as ttl
CASSANDRA = 3;
}
@@ -118,8 +133,22 @@ message Store {
}
message CassandraConfig {
- string host = 1;
+ // - bootstrapHosts: [comma delimited value of hosts]
+ string bootstrap_hosts = 1;
int32 port = 2;
+ string keyspace = 3;
+
+ // Please note that table name must be "feature_store" as is specified in the @Table annotation of the
+ // datastax object mapper
+ string table_name = 4;
+
+ // This specifies the replication strategy to use. Please refer to docs for more details:
+ // https://docs.datastax.com/en/dse/6.7/cql/cql/cql_reference/cql_commands/cqlCreateKeyspace.html#cqlCreateKeyspace__cqlCreateKeyspacereplicationmap-Pr3yUQ7t
+ map replication_options = 5;
+
+ // Default expiration in seconds to use when FeatureSetSpec does not have max_age defined.
+ // Specify 0 for no default expiration
+ google.protobuf.Duration default_ttl = 6;
}
message Subscription {
diff --git a/serving/pom.xml b/serving/pom.xml
index ab9efaff26e..3d41c18b8dc 100644
--- a/serving/pom.xml
+++ b/serving/pom.xml
@@ -141,7 +141,19 @@
redis.clients
jedis
-
+
+
+ com.datastax.cassandra
+ cassandra-driver-core
+ 3.4.0
+
+
+ io.netty
+ *
+
+
+
+
com.google.guava
guava
@@ -227,6 +239,13 @@
spring-boot-starter-test
test
+
+
+ org.cassandraunit
+ cassandra-unit-shaded
+ 3.11.2.0
+ test
+
diff --git a/serving/sample_cassandra_config.yml b/serving/sample_cassandra_config.yml
new file mode 100644
index 00000000000..ca3d4cbbcca
--- /dev/null
+++ b/serving/sample_cassandra_config.yml
@@ -0,0 +1,13 @@
+name: serving
+type: CASSANDRA
+cassandra_config:
+ bootstrap_hosts: localhost
+ port: 9042
+ keyspace: feast
+ table_name: feature_store
+ replication_options:
+ class: SimpleStrategy
+ replication_factor: 1
+subscriptions:
+ - name: "*"
+ version: ">0"
diff --git a/serving/src/main/java/feast/serving/FeastProperties.java b/serving/src/main/java/feast/serving/FeastProperties.java
index e511835b0aa..356f808127b 100644
--- a/serving/src/main/java/feast/serving/FeastProperties.java
+++ b/serving/src/main/java/feast/serving/FeastProperties.java
@@ -85,6 +85,15 @@ public static class StoreProperties {
private String configPath;
private int redisPoolMaxSize;
private int redisPoolMaxIdle;
+ private int cassandraPoolCoreLocalConnections;
+ private int cassandraPoolMaxLocalConnections;
+ private int cassandraPoolCoreRemoteConnections;
+ private int cassandraPoolMaxRemoteConnections;
+ private int cassandraPoolMaxRequestsLocalConnection;
+ private int cassandraPoolMaxRequestsRemoteConnection;
+ private int cassandraPoolNewLocalConnectionThreshold;
+ private int cassandraPoolNewRemoteConnectionThreshold;
+ private int cassandraPoolTimeoutMillis;
public String getConfigPath() {
return this.configPath;
@@ -98,6 +107,42 @@ public int getRedisPoolMaxIdle() {
return this.redisPoolMaxIdle;
}
+ public int getCassandraPoolCoreLocalConnections() {
+ return this.cassandraPoolCoreLocalConnections;
+ }
+
+ public int getCassandraPoolMaxLocalConnections() {
+ return this.cassandraPoolMaxLocalConnections;
+ }
+
+ public int getCassandraPoolCoreRemoteConnections() {
+ return this.cassandraPoolCoreRemoteConnections;
+ }
+
+ public int getCassandraPoolMaxRemoteConnections() {
+ return this.cassandraPoolMaxRemoteConnections;
+ }
+
+ public int getCassandraPoolMaxRequestsLocalConnection() {
+ return this.cassandraPoolMaxRequestsLocalConnection;
+ }
+
+ public int getCassandraPoolMaxRequestsRemoteConnection() {
+ return this.cassandraPoolMaxRequestsRemoteConnection;
+ }
+
+ public int getCassandraPoolNewLocalConnectionThreshold() {
+ return this.cassandraPoolNewLocalConnectionThreshold;
+ }
+
+ public int getCassandraPoolNewRemoteConnectionThreshold() {
+ return this.cassandraPoolNewRemoteConnectionThreshold;
+ }
+
+ public int getCassandraPoolTimeoutMillis() {
+ return this.cassandraPoolTimeoutMillis;
+ }
+
public void setConfigPath(String configPath) {
this.configPath = configPath;
}
@@ -109,6 +154,46 @@ public void setRedisPoolMaxSize(int redisPoolMaxSize) {
public void setRedisPoolMaxIdle(int redisPoolMaxIdle) {
this.redisPoolMaxIdle = redisPoolMaxIdle;
}
+
+ public void setCassandraPoolCoreLocalConnections(int cassandraPoolCoreLocalConnections) {
+ this.cassandraPoolCoreLocalConnections = cassandraPoolCoreLocalConnections;
+ }
+
+ public void setCassandraPoolMaxLocalConnections(int cassandraPoolMaxLocalConnections) {
+ this.cassandraPoolMaxLocalConnections = cassandraPoolMaxLocalConnections;
+ }
+
+ public void setCassandraPoolCoreRemoteConnections(int cassandraPoolCoreRemoteConnections) {
+ this.cassandraPoolCoreRemoteConnections = cassandraPoolCoreRemoteConnections;
+ }
+
+ public void setCassandraPoolMaxRemoteConnections(int cassandraPoolMaxRemoteConnections) {
+ this.cassandraPoolMaxRemoteConnections = cassandraPoolMaxRemoteConnections;
+ }
+
+ public void setCassandraPoolMaxRequestsLocalConnection(
+ int cassandraPoolMaxRequestsLocalConnection) {
+ this.cassandraPoolMaxRequestsLocalConnection = cassandraPoolMaxRequestsLocalConnection;
+ }
+
+ public void setCassandraPoolMaxRequestsRemoteConnection(
+ int cassandraPoolMaxRequestsRemoteConnection) {
+ this.cassandraPoolMaxRequestsRemoteConnection = cassandraPoolMaxRequestsRemoteConnection;
+ }
+
+ public void setCassandraPoolNewLocalConnectionThreshold(
+ int cassandraPoolNewLocalConnectionThreshold) {
+ this.cassandraPoolNewLocalConnectionThreshold = cassandraPoolNewLocalConnectionThreshold;
+ }
+
+ public void setCassandraPoolNewRemoteConnectionThreshold(
+ int cassandraPoolNewRemoteConnectionThreshold) {
+ this.cassandraPoolNewRemoteConnectionThreshold = cassandraPoolNewRemoteConnectionThreshold;
+ }
+
+ public void setCassandraPoolTimeoutMillis(int cassandraPoolTimeoutMillis) {
+ this.cassandraPoolTimeoutMillis = cassandraPoolTimeoutMillis;
+ }
}
public static class JobProperties {
diff --git a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java
index 08b9655e3e1..81994941d52 100644
--- a/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java
+++ b/serving/src/main/java/feast/serving/configuration/ServingServiceConfig.java
@@ -16,6 +16,10 @@
*/
package feast.serving.configuration;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.Session;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.storage.Storage;
@@ -23,19 +27,26 @@
import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.BigQueryConfig;
import feast.core.StoreProto.Store.Builder;
+import feast.core.StoreProto.Store.CassandraConfig;
import feast.core.StoreProto.Store.RedisConfig;
import feast.core.StoreProto.Store.StoreType;
import feast.core.StoreProto.Store.Subscription;
import feast.serving.FeastProperties;
import feast.serving.FeastProperties.JobProperties;
+import feast.serving.FeastProperties.StoreProperties;
import feast.serving.service.BigQueryServingService;
import feast.serving.service.CachedSpecService;
+import feast.serving.service.CassandraServingService;
import feast.serving.service.JobService;
import feast.serving.service.NoopJobService;
import feast.serving.service.RedisServingService;
import feast.serving.service.ServingService;
import io.opentracing.Tracer;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -76,6 +87,13 @@ private Store setStoreConfig(Store.Builder builder, Map options)
.build();
return builder.setBigqueryConfig(bqConfig).build();
case CASSANDRA:
+ CassandraConfig cassandraConfig =
+ CassandraConfig.newBuilder()
+ .setBootstrapHosts(options.get("host"))
+ .setPort(Integer.parseInt(options.get("port")))
+ .setKeyspace(options.get("keyspace"))
+ .build();
+ return builder.setCassandraConfig(cassandraConfig).build();
default:
throw new IllegalArgumentException(
String.format(
@@ -135,6 +153,47 @@ public ServingService servingService(
storage);
break;
case CASSANDRA:
+ StoreProperties storeProperties = feastProperties.getStore();
+ PoolingOptions poolingOptions = new PoolingOptions();
+ poolingOptions.setCoreConnectionsPerHost(
+ HostDistance.LOCAL, storeProperties.getCassandraPoolCoreLocalConnections());
+ poolingOptions.setCoreConnectionsPerHost(
+ HostDistance.REMOTE, storeProperties.getCassandraPoolCoreRemoteConnections());
+ poolingOptions.setMaxConnectionsPerHost(
+ HostDistance.LOCAL, storeProperties.getCassandraPoolMaxLocalConnections());
+ poolingOptions.setMaxConnectionsPerHost(
+ HostDistance.REMOTE, storeProperties.getCassandraPoolMaxRemoteConnections());
+ poolingOptions.setMaxRequestsPerConnection(
+ HostDistance.LOCAL, storeProperties.getCassandraPoolMaxRequestsLocalConnection());
+ poolingOptions.setMaxRequestsPerConnection(
+ HostDistance.REMOTE, storeProperties.getCassandraPoolMaxRequestsRemoteConnection());
+ poolingOptions.setNewConnectionThreshold(
+ HostDistance.LOCAL, storeProperties.getCassandraPoolNewLocalConnectionThreshold());
+ poolingOptions.setNewConnectionThreshold(
+ HostDistance.REMOTE, storeProperties.getCassandraPoolNewRemoteConnectionThreshold());
+ poolingOptions.setPoolTimeoutMillis(storeProperties.getCassandraPoolTimeoutMillis());
+ CassandraConfig cassandraConfig = store.getCassandraConfig();
+ List contactPoints =
+ Arrays.stream(cassandraConfig.getBootstrapHosts().split(","))
+ .map(h -> new InetSocketAddress(h, cassandraConfig.getPort()))
+ .collect(Collectors.toList());
+ Cluster cluster =
+ Cluster.builder()
+ .addContactPointsWithPorts(contactPoints)
+ .withPoolingOptions(poolingOptions)
+ .build();
+ // Session in Cassandra is thread-safe and maintains connections to cluster nodes internally
+ // Recommended to use one session per keyspace instead of open and close connection for each
+ // request
+ Session session = cluster.connect();
+ servingService =
+ new CassandraServingService(
+ session,
+ cassandraConfig.getKeyspace(),
+ cassandraConfig.getTableName(),
+ specService,
+ tracer);
+ break;
case UNRECOGNIZED:
case INVALID:
throw new IllegalArgumentException(
diff --git a/serving/src/main/java/feast/serving/service/CassandraServingService.java b/serving/src/main/java/feast/serving/service/CassandraServingService.java
new file mode 100644
index 00000000000..e9a7dff8ac3
--- /dev/null
+++ b/serving/src/main/java/feast/serving/service/CassandraServingService.java
@@ -0,0 +1,153 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.service;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Timestamp;
+import feast.serving.ServingAPIProto.FeatureSetRequest;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
+import feast.serving.util.ValueUtil;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FieldProto.Field;
+import feast.types.ValueProto.Value;
+import io.opentracing.Scope;
+import io.opentracing.Tracer;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class CassandraServingService extends OnlineServingService {
+
+ private final Session session;
+ private final String keyspace;
+ private final String tableName;
+ private final Tracer tracer;
+
+ public CassandraServingService(
+ Session session,
+ String keyspace,
+ String tableName,
+ CachedSpecService specService,
+ Tracer tracer) {
+ super(specService, tracer);
+ this.session = session;
+ this.keyspace = keyspace;
+ this.tableName = tableName;
+ this.tracer = tracer;
+ }
+
+ @Override
+ List createLookupKeys(
+ List featureSetEntityNames,
+ List entityRows,
+ FeatureSetRequest featureSetRequest) {
+ try (Scope scope = tracer.buildSpan("Cassandra-makeCassandraKeys").startActive(true)) {
+ String featureSetId =
+ String.format("%s:%s", featureSetRequest.getName(), featureSetRequest.getVersion());
+ return entityRows.stream()
+ .map(row -> createCassandraKey(featureSetId, featureSetEntityNames, row))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ protected boolean isEmpty(ResultSet response) {
+ return response.isExhausted();
+ }
+
+ /**
+ * Send a list of get request as an mget
+ *
+ * @param keys list of string keys
+ * @return list of {@link FeatureRow} in primitive byte representation for each key
+ */
+ @Override
+ protected List getAll(List keys) {
+ List results = new ArrayList<>();
+ for (String key : keys) {
+ results.add(
+ session.execute(
+ QueryBuilder.select()
+ .column("entities")
+ .column("feature")
+ .column("value")
+ .writeTime("value")
+ .as("writetime")
+ .from(keyspace, tableName)
+ .where(QueryBuilder.eq("entities", key))));
+ }
+ return results;
+ }
+
+ @Override
+ FeatureRow parseResponse(ResultSet resultSet) {
+ List fields = new ArrayList<>();
+ Instant instant = Instant.now();
+ while (!resultSet.isExhausted()) {
+ Row row = resultSet.one();
+ long microSeconds = row.getLong("writetime");
+ instant =
+ Instant.ofEpochSecond(
+ TimeUnit.MICROSECONDS.toSeconds(microSeconds),
+ TimeUnit.MICROSECONDS.toNanos(
+ Math.floorMod(microSeconds, TimeUnit.SECONDS.toMicros(1))));
+ try {
+ fields.add(
+ Field.newBuilder()
+ .setName(row.getString("feature"))
+ .setValue(Value.parseFrom(ByteBuffer.wrap(row.getBytes("value").array())))
+ .build());
+ } catch (InvalidProtocolBufferException e) {
+ e.printStackTrace();
+ }
+ }
+ return FeatureRow.newBuilder()
+ .addAllFields(fields)
+ .setEventTimestamp(
+ Timestamp.newBuilder()
+ .setSeconds(instant.getEpochSecond())
+ .setNanos(instant.getNano())
+ .build())
+ .build();
+ }
+
+ /**
+ * Create cassandra keys
+ *
+ * @param featureSet featureSet reference of the feature. E.g. feature_set_1:1
+ * @param featureSetEntityNames entity names that belong to the featureSet
+ * @param entityRow entityRow to build the key from
+ * @return String
+ */
+ private static String createCassandraKey(
+ String featureSet, List featureSetEntityNames, EntityRow entityRow) {
+ Map fieldsMap = entityRow.getFieldsMap();
+ List res = new ArrayList<>();
+ for (String entityName : featureSetEntityNames) {
+ res.add(entityName + "=" + ValueUtil.toString(fieldsMap.get(entityName)));
+ }
+ return featureSet + ":" + String.join("|", res);
+ }
+}
diff --git a/serving/src/main/java/feast/serving/service/OnlineServingService.java b/serving/src/main/java/feast/serving/service/OnlineServingService.java
new file mode 100644
index 00000000000..699d48c1214
--- /dev/null
+++ b/serving/src/main/java/feast/serving/service/OnlineServingService.java
@@ -0,0 +1,236 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.service;
+
+import static feast.serving.util.Metrics.missingKeyCount;
+import static feast.serving.util.Metrics.requestCount;
+import static feast.serving.util.Metrics.requestLatency;
+import static feast.serving.util.Metrics.staleKeyCount;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.Duration;
+import com.google.protobuf.InvalidProtocolBufferException;
+import feast.core.FeatureSetProto.EntitySpec;
+import feast.core.FeatureSetProto.FeatureSetSpec;
+import feast.serving.ServingAPIProto.FeastServingType;
+import feast.serving.ServingAPIProto.FeatureSetRequest;
+import feast.serving.ServingAPIProto.GetBatchFeaturesRequest;
+import feast.serving.ServingAPIProto.GetBatchFeaturesResponse;
+import feast.serving.ServingAPIProto.GetFeastServingInfoRequest;
+import feast.serving.ServingAPIProto.GetFeastServingInfoResponse;
+import feast.serving.ServingAPIProto.GetJobRequest;
+import feast.serving.ServingAPIProto.GetJobResponse;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.ValueProto.Value;
+import io.grpc.Status;
+import io.opentracing.Scope;
+import io.opentracing.Tracer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+abstract class OnlineServingService implements ServingService {
+
+ private final CachedSpecService specService;
+ private final Tracer tracer;
+
+ OnlineServingService(CachedSpecService specService, Tracer tracer) {
+ this.specService = specService;
+ this.tracer = tracer;
+ }
+
+ @Override
+ public GetFeastServingInfoResponse getFeastServingInfo(
+ GetFeastServingInfoRequest getFeastServingInfoRequest) {
+ return GetFeastServingInfoResponse.newBuilder()
+ .setType(FeastServingType.FEAST_SERVING_TYPE_ONLINE)
+ .build();
+ }
+
+ @Override
+ public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest) {
+ throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
+ }
+
+ @Override
+ public GetJobResponse getJob(GetJobRequest getJobRequest) {
+ throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request) {
+ try (Scope scope =
+ tracer.buildSpan("OnlineServingService-getOnlineFeatures").startActive(true)) {
+ long startTime = System.currentTimeMillis();
+ GetOnlineFeaturesResponse.Builder getOnlineFeaturesResponseBuilder =
+ GetOnlineFeaturesResponse.newBuilder();
+
+ List entityRows = request.getEntityRowsList();
+ Map> featureValuesMap =
+ entityRows.stream()
+ .collect(Collectors.toMap(er -> er, er -> Maps.newHashMap(er.getFieldsMap())));
+
+ List featureSetRequests = request.getFeatureSetsList();
+ for (FeatureSetRequest featureSetRequest : featureSetRequests) {
+
+ FeatureSetSpec featureSetSpec =
+ specService.getFeatureSet(featureSetRequest.getName(), featureSetRequest.getVersion());
+
+ List featureSetEntityNames =
+ featureSetSpec.getEntitiesList().stream()
+ .map(EntitySpec::getName)
+ .collect(Collectors.toList());
+
+ Duration defaultMaxAge = featureSetSpec.getMaxAge();
+ if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) {
+ featureSetRequest = featureSetRequest.toBuilder().setMaxAge(defaultMaxAge).build();
+ }
+
+ sendAndProcessMultiGet(
+ createLookupKeys(featureSetEntityNames, entityRows, featureSetRequest),
+ entityRows,
+ featureValuesMap,
+ featureSetRequest);
+ }
+ List fieldValues =
+ featureValuesMap.values().stream()
+ .map(m -> FieldValues.newBuilder().putAllFields(m).build())
+ .collect(Collectors.toList());
+ requestLatency.labels("getOnlineFeatures").observe(System.currentTimeMillis() - startTime);
+ return getOnlineFeaturesResponseBuilder.addAllFieldValues(fieldValues).build();
+ }
+ }
+
+ /**
+ * Create lookup keys for corresponding data stores
+ *
+ * @param featureSetEntityNames list of entity names
+ * @param entityRows list of {@link EntityRow}
+ * @param featureSetRequest {@link FeatureSetRequest}
+ * @return list of {@link LookupKeyType}
+ */
+ abstract List createLookupKeys(
+ List featureSetEntityNames,
+ List entityRows,
+ FeatureSetRequest featureSetRequest);
+
+ /**
+ * Checks whether the response is empty, i.e. feature does not exist in the store
+ *
+ * @param response {@link ResponseType}
+ * @return boolean
+ */
+ protected abstract boolean isEmpty(ResponseType response);
+
+ /**
+ * Send a list of get requests
+ *
+ * @param keys list of {@link LookupKeyType}
+ * @return list of {@link ResponseType}
+ */
+ protected abstract List getAll(List keys);
+
+ /**
+ * Parse response from data store to FeatureRow
+ *
+ * @param response {@link ResponseType}
+ * @return {@link FeatureRow}
+ */
+ abstract FeatureRow parseResponse(ResponseType response) throws InvalidProtocolBufferException;
+
+ private List getResponses(List keys) {
+ try (Scope scope = tracer.buildSpan("OnlineServingService-sendMultiGet").startActive(true)) {
+ long startTime = System.currentTimeMillis();
+ try {
+ return getAll(keys);
+ } catch (Exception e) {
+ throw Status.NOT_FOUND
+ .withDescription("Unable to retrieve feature from online store")
+ .withCause(e)
+ .asRuntimeException();
+ } finally {
+ requestLatency.labels("sendMultiGet").observe(System.currentTimeMillis() - startTime);
+ }
+ }
+ }
+
+ private void sendAndProcessMultiGet(
+ List keys,
+ List entityRows,
+ Map> featureValuesMap,
+ FeatureSetRequest featureSetRequest) {
+ List responses = getResponses(keys);
+
+ long startTime = System.currentTimeMillis();
+ try (Scope scope = tracer.buildSpan("OnlineServingService-processResponse").startActive(true)) {
+ String featureSetId =
+ String.format("%s:%d", featureSetRequest.getName(), featureSetRequest.getVersion());
+ Map nullValues =
+ featureSetRequest.getFeatureNamesList().stream()
+ .collect(
+ Collectors.toMap(
+ name -> featureSetId + ":" + name, name -> Value.newBuilder().build()));
+ for (int i = 0; i < responses.size(); i++) {
+ EntityRow entityRow = entityRows.get(i);
+ Map featureValues = featureValuesMap.get(entityRow);
+ try {
+ ResponseType response = responses.get(i);
+ if (isEmpty(response)) {
+ missingKeyCount.labels(featureSetRequest.getName()).inc();
+ featureValues.putAll(nullValues);
+ continue;
+ }
+
+ FeatureRow featureRow = parseResponse(response);
+ boolean stale = isStale(featureSetRequest, entityRow, featureRow);
+ if (stale) {
+ staleKeyCount.labels(featureSetRequest.getName()).inc();
+ featureValues.putAll(nullValues);
+ continue;
+ }
+
+ requestCount.labels(featureSetRequest.getName()).inc();
+ featureRow.getFieldsList().stream()
+ .filter(f -> featureSetRequest.getFeatureNamesList().contains(f.getName()))
+ .forEach(f -> featureValues.put(featureSetId + ":" + f.getName(), f.getValue()));
+ } catch (InvalidProtocolBufferException e) {
+ e.printStackTrace();
+ }
+ }
+ } finally {
+ requestLatency.labels("processResponse").observe(System.currentTimeMillis() - startTime);
+ }
+ }
+
+ private static boolean isStale(
+ FeatureSetRequest featureSetRequest, EntityRow entityRow, FeatureRow featureRow) {
+ if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) {
+ return false;
+ }
+ long givenTimestamp = entityRow.getEntityTimestamp().getSeconds();
+ if (givenTimestamp == 0) {
+ givenTimestamp = System.currentTimeMillis() / 1000;
+ }
+ long timeDifference = givenTimestamp - featureRow.getEventTimestamp().getSeconds();
+ return timeDifference > featureSetRequest.getMaxAge().getSeconds();
+ }
+}
diff --git a/serving/src/main/java/feast/serving/service/RedisServingService.java b/serving/src/main/java/feast/serving/service/RedisServingService.java
index 9eaeb17dea3..df97c25820f 100644
--- a/serving/src/main/java/feast/serving/service/RedisServingService.java
+++ b/serving/src/main/java/feast/serving/service/RedisServingService.java
@@ -16,29 +16,10 @@
*/
package feast.serving.service;
-import static feast.serving.util.Metrics.missingKeyCount;
-import static feast.serving.util.Metrics.requestCount;
-import static feast.serving.util.Metrics.requestLatency;
-import static feast.serving.util.Metrics.staleKeyCount;
-
-import com.google.common.collect.Maps;
import com.google.protobuf.AbstractMessageLite;
-import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
-import feast.core.FeatureSetProto.EntitySpec;
-import feast.core.FeatureSetProto.FeatureSetSpec;
-import feast.serving.ServingAPIProto.FeastServingType;
import feast.serving.ServingAPIProto.FeatureSetRequest;
-import feast.serving.ServingAPIProto.GetBatchFeaturesRequest;
-import feast.serving.ServingAPIProto.GetBatchFeaturesResponse;
-import feast.serving.ServingAPIProto.GetFeastServingInfoRequest;
-import feast.serving.ServingAPIProto.GetFeastServingInfoResponse;
-import feast.serving.ServingAPIProto.GetJobRequest;
-import feast.serving.ServingAPIProto.GetJobResponse;
-import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest;
import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
-import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
-import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues;
import feast.storage.RedisProto.RedisKey;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto.Field;
@@ -53,88 +34,18 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
-public class RedisServingService implements ServingService {
+public class RedisServingService extends OnlineServingService {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedisServingService.class);
private final JedisPool jedisPool;
- private final CachedSpecService specService;
private final Tracer tracer;
public RedisServingService(JedisPool jedisPool, CachedSpecService specService, Tracer tracer) {
+ super(specService, tracer);
this.jedisPool = jedisPool;
- this.specService = specService;
this.tracer = tracer;
}
- /** {@inheritDoc} */
- @Override
- public GetFeastServingInfoResponse getFeastServingInfo(
- GetFeastServingInfoRequest getFeastServingInfoRequest) {
- return GetFeastServingInfoResponse.newBuilder()
- .setType(FeastServingType.FEAST_SERVING_TYPE_ONLINE)
- .build();
- }
-
- /** {@inheritDoc} */
- @Override
- public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest request) {
- try (Scope scope = tracer.buildSpan("Redis-getOnlineFeatures").startActive(true)) {
- long startTime = System.currentTimeMillis();
- GetOnlineFeaturesResponse.Builder getOnlineFeaturesResponseBuilder =
- GetOnlineFeaturesResponse.newBuilder();
-
- List entityRows = request.getEntityRowsList();
- Map> featureValuesMap =
- entityRows.stream()
- .collect(Collectors.toMap(er -> er, er -> Maps.newHashMap(er.getFieldsMap())));
-
- List featureSetRequests = request.getFeatureSetsList();
- for (FeatureSetRequest featureSetRequest : featureSetRequests) {
-
- FeatureSetSpec featureSetSpec =
- specService.getFeatureSet(featureSetRequest.getName(), featureSetRequest.getVersion());
-
- List featureSetEntityNames =
- featureSetSpec.getEntitiesList().stream()
- .map(EntitySpec::getName)
- .collect(Collectors.toList());
-
- Duration defaultMaxAge = featureSetSpec.getMaxAge();
- if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) {
- featureSetRequest = featureSetRequest.toBuilder().setMaxAge(defaultMaxAge).build();
- }
-
- List redisKeys =
- getRedisKeys(featureSetEntityNames, entityRows, featureSetRequest);
-
- try {
- sendAndProcessMultiGet(redisKeys, entityRows, featureValuesMap, featureSetRequest);
- } catch (InvalidProtocolBufferException e) {
- throw Status.INTERNAL
- .withDescription("Unable to parse protobuf while retrieving feature")
- .withCause(e)
- .asRuntimeException();
- }
- }
- List fieldValues =
- featureValuesMap.values().stream()
- .map(m -> FieldValues.newBuilder().putAllFields(m).build())
- .collect(Collectors.toList());
- requestLatency.labels("getOnlineFeatures").observe(System.currentTimeMillis() - startTime);
- return getOnlineFeaturesResponseBuilder.addAllFieldValues(fieldValues).build();
- }
- }
-
- @Override
- public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeaturesRequest) {
- throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
- }
-
- @Override
- public GetJobResponse getJob(GetJobRequest getJobRequest) {
- throw Status.UNIMPLEMENTED.withDescription("Method not implemented").asRuntimeException();
- }
-
/**
* Build the redis keys for retrieval from the store.
*
@@ -143,7 +54,8 @@ public GetJobResponse getJob(GetJobRequest getJobRequest) {
* @param featureSetRequest details of the requested featureSet
* @return list of RedisKeys
*/
- private List getRedisKeys(
+ @Override
+ List createLookupKeys(
List featureSetEntityNames,
List entityRows,
FeatureSetRequest featureSetRequest) {
@@ -158,6 +70,33 @@ private List getRedisKeys(
}
}
+ @Override
+ protected boolean isEmpty(byte[] response) {
+ return response == null;
+ }
+
+ /**
+ * Send a list of get request as an mget
+ *
+ * @param keys list of {@link RedisKey}
+ * @return list of {@link FeatureRow} in primitive byte representation for each {@link RedisKey}
+ */
+ @Override
+ protected List getAll(List keys) {
+ Jedis jedis = jedisPool.getResource();
+ byte[][] binaryKeys =
+ keys.stream()
+ .map(AbstractMessageLite::toByteArray)
+ .collect(Collectors.toList())
+ .toArray(new byte[0][0]);
+ return jedis.mget(binaryKeys);
+ }
+
+ @Override
+ FeatureRow parseResponse(byte[] response) throws InvalidProtocolBufferException {
+ return FeatureRow.parseFrom(response);
+ }
+
/**
* Create {@link RedisKey}
*
@@ -188,93 +127,4 @@ private RedisKey makeRedisKey(
}
return builder.build();
}
-
- private void sendAndProcessMultiGet(
- List redisKeys,
- List entityRows,
- Map> featureValuesMap,
- FeatureSetRequest featureSetRequest)
- throws InvalidProtocolBufferException {
-
- List jedisResps = sendMultiGet(redisKeys);
- long startTime = System.currentTimeMillis();
- try (Scope scope = tracer.buildSpan("Redis-processResponse").startActive(true)) {
- String featureSetId =
- String.format("%s:%d", featureSetRequest.getName(), featureSetRequest.getVersion());
-
- Map nullValues =
- featureSetRequest.getFeatureNamesList().stream()
- .collect(
- Collectors.toMap(
- name -> featureSetId + ":" + name, name -> Value.newBuilder().build()));
-
- for (int i = 0; i < jedisResps.size(); i++) {
- EntityRow entityRow = entityRows.get(i);
- Map featureValues = featureValuesMap.get(entityRow);
-
- byte[] jedisResponse = jedisResps.get(i);
- if (jedisResponse == null) {
- missingKeyCount.labels(featureSetRequest.getName()).inc();
- featureValues.putAll(nullValues);
- continue;
- }
-
- FeatureRow featureRow = FeatureRow.parseFrom(jedisResponse);
-
- boolean stale = isStale(featureSetRequest, entityRow, featureRow);
- if (stale) {
- staleKeyCount.labels(featureSetRequest.getName()).inc();
- featureValues.putAll(nullValues);
- continue;
- }
-
- requestCount.labels(featureSetRequest.getName()).inc();
- featureRow.getFieldsList().stream()
- .filter(f -> featureSetRequest.getFeatureNamesList().contains(f.getName()))
- .forEach(f -> featureValues.put(featureSetId + ":" + f.getName(), f.getValue()));
- }
- } finally {
- requestLatency.labels("processResponse").observe(System.currentTimeMillis() - startTime);
- }
- }
-
- private boolean isStale(
- FeatureSetRequest featureSetRequest, EntityRow entityRow, FeatureRow featureRow) {
- if (featureSetRequest.getMaxAge().equals(Duration.getDefaultInstance())) {
- return false;
- }
- long givenTimestamp = entityRow.getEntityTimestamp().getSeconds();
- if (givenTimestamp == 0) {
- givenTimestamp = System.currentTimeMillis() / 1000;
- }
- long timeDifference = givenTimestamp - featureRow.getEventTimestamp().getSeconds();
- return timeDifference > featureSetRequest.getMaxAge().getSeconds();
- }
-
- /**
- * Send a list of get request as an mget
- *
- * @param keys list of {@link RedisKey}
- * @return list of {@link FeatureRow} in primitive byte representation for each {@link RedisKey}
- */
- private List sendMultiGet(List keys) {
- try (Scope scope = tracer.buildSpan("Redis-sendMultiGet").startActive(true)) {
- long startTime = System.currentTimeMillis();
- try (Jedis jedis = jedisPool.getResource()) {
- byte[][] binaryKeys =
- keys.stream()
- .map(AbstractMessageLite::toByteArray)
- .collect(Collectors.toList())
- .toArray(new byte[0][0]);
- return jedis.mget(binaryKeys);
- } catch (Exception e) {
- throw Status.NOT_FOUND
- .withDescription("Unable to retrieve feature from Redis")
- .withCause(e)
- .asRuntimeException();
- } finally {
- requestLatency.labels("sendMultiGet").observe(System.currentTimeMillis() - startTime);
- }
- }
- }
}
diff --git a/serving/src/main/java/feast/serving/util/ValueUtil.java b/serving/src/main/java/feast/serving/util/ValueUtil.java
new file mode 100644
index 00000000000..e3ede6af984
--- /dev/null
+++ b/serving/src/main/java/feast/serving/util/ValueUtil.java
@@ -0,0 +1,53 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.util;
+
+import feast.types.ValueProto.Value;
+
+public class ValueUtil {
+
+ public static String toString(Value value) {
+ String strValue;
+ switch (value.getValCase()) {
+ case BYTES_VAL:
+ strValue = value.getBytesVal().toString();
+ break;
+ case STRING_VAL:
+ strValue = value.getStringVal();
+ break;
+ case INT32_VAL:
+ strValue = String.valueOf(value.getInt32Val());
+ break;
+ case INT64_VAL:
+ strValue = String.valueOf(value.getInt64Val());
+ break;
+ case DOUBLE_VAL:
+ strValue = String.valueOf(value.getDoubleVal());
+ break;
+ case FLOAT_VAL:
+ strValue = String.valueOf(value.getFloatVal());
+ break;
+ case BOOL_VAL:
+ strValue = String.valueOf(value.getBoolVal());
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("toString method not supported for type %s", value.getValCase()));
+ }
+ return strValue;
+ }
+}
diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml
index 2daa83fbfb2..e18f49207b0 100644
--- a/serving/src/main/resources/application.yml
+++ b/serving/src/main/resources/application.yml
@@ -1,7 +1,7 @@
feast:
# This value is retrieved from project.version properties in pom.xml
# https://docs.spring.io/spring-boot/docs/current/reference/html/
- version: @project.version@
+# version: @project.version@
# GRPC service address for Feast Core
# Feast Serving requires connection to Feast Core to retrieve and reload Feast metadata (e.g. FeatureSpecs, Store information)
core-host: ${FEAST_CORE_HOST:localhost}
@@ -24,6 +24,24 @@ feast:
redis-pool-max-size: ${FEAST_REDIS_POOL_MAX_SIZE:128}
# If serving redis, the redis pool max idle conns
redis-pool-max-idle: ${FEAST_REDIS_POOL_MAX_IDLE:16}
+ # If serving cassandra, minimum connection for local host (one in same data center)
+ cassandra-pool-core-local-connections: ${FEAST_CASSANDRA_CORE_LOCAL_CONNECTIONS:1}
+ # If serving cassandra, maximum connection for local host (one in same data center)
+ cassandra-pool-max-local-connections: ${FEAST_CASSANDRA_MAX_LOCAL_CONNECTIONS:1}
+ # If serving cassandra, minimum connection for remote host (one in remote data center)
+ cassandra-pool-core-remote-connections: ${FEAST_CASSANDRA_CORE_REMOTE_CONNECTIONS:1}
+ # If serving cassandra, maximum connection for remote host (one in same data center)
+ cassandra-pool-max-remote-connections: ${FEAST_CASSANDRA_MAX_REMOTE_CONNECTIONS:1}
+ # If serving cassandra, maximum number of concurrent requests per local connection (one in same data center)
+ cassandra-pool-max-requests-local-connection: ${FEAST_CASSANDRA_MAX_REQUESTS_LOCAL_CONNECTION:32768}
+ # If serving cassandra, maximum number of concurrent requests per remote connection (one in remote data center)
+ cassandra-pool-max-requests-remote-connection: ${FEAST_CASSANDRA_MAX_REQUESTS_REMOTE_CONNECTION:2048}
+ # If serving cassandra, number of requests which trigger opening of new local connection (if it is available)
+ cassandra-pool-new-local-connection-threshold: ${FEAST_CASSANDRA_NEW_LOCAL_CONNECTION_THRESHOLD:30000}
+ # If serving cassandra, number of requests which trigger opening of new remote connection (if it is available)
+ cassandra-pool-new-remote-connection-threshold: ${FEAST_CASSANDRA_NEW_REMOTE_CONNECTION_THRESHOLD:400}
+ # If serving cassandra, number of milliseconds to wait to acquire connection (after that go to next available host in query plan)
+ cassandra-pool-timeout-millis: ${FEAST_CASSANDRA_POOL_TIMEOUT_MILLIS:0}
jobs:
# job-staging-location specifies the URI to store intermediate files for batch serving.
diff --git a/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java b/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java
new file mode 100644
index 00000000000..a1778251a3d
--- /dev/null
+++ b/serving/src/test/java/feast/serving/service/CassandraServingServiceITTest.java
@@ -0,0 +1,244 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.service;
+
+import static feast.serving.test.TestUtil.intValue;
+import static feast.serving.test.TestUtil.responseToMapList;
+import static feast.serving.test.TestUtil.strValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.utils.Bytes;
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Timestamp;
+import feast.core.FeatureSetProto.EntitySpec;
+import feast.core.FeatureSetProto.FeatureSetSpec;
+import feast.serving.ServingAPIProto.FeatureSetRequest;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues;
+import feast.serving.test.TestUtil.LocalCassandra;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.ValueProto.Value;
+import io.opentracing.Tracer;
+import io.opentracing.Tracer.SpanBuilder;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+public class CassandraServingServiceITTest {
+
+ @Mock CachedSpecService specService;
+
+ @Mock Tracer tracer;
+
+ private CassandraServingService cassandraServingService;
+ private Session session;
+
+ @BeforeClass
+ public static void startServer() throws InterruptedException, IOException, TTransportException {
+ LocalCassandra.start();
+ LocalCassandra.createKeyspaceAndTable();
+ }
+
+ @Before
+ public void setup() {
+ initMocks(this);
+ FeatureSetSpec featureSetSpec =
+ FeatureSetSpec.newBuilder()
+ .addEntities(EntitySpec.newBuilder().setName("entity1"))
+ .addEntities(EntitySpec.newBuilder().setName("entity2"))
+ .build();
+
+ when(specService.getFeatureSet("featureSet", 1)).thenReturn(featureSetSpec);
+ when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class));
+
+ session =
+ new Cluster.Builder()
+ .addContactPoints(LocalCassandra.getHost())
+ .withPort(LocalCassandra.getPort())
+ .build()
+ .connect();
+
+ populateTable(session);
+
+ cassandraServingService =
+ new CassandraServingService(session, "test", "feature_store", specService, tracer);
+ }
+
+ private void populateTable(Session session) {
+ session.execute(
+ insertQuery(
+ "test", "feature_store", "featureSet:1:entity1=1|entity2=a", "feature1", intValue(1)));
+ session.execute(
+ insertQuery(
+ "test", "feature_store", "featureSet:1:entity1=1|entity2=a", "feature2", intValue(1)));
+ session.execute(
+ insertQuery(
+ "test", "feature_store", "featureSet:1:entity1=2|entity2=b", "feature1", intValue(1)));
+ session.execute(
+ insertQuery(
+ "test", "feature_store", "featureSet:1:entity1=2|entity2=b", "feature2", intValue(1)));
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ LocalCassandra.stop();
+ }
+
+ @Test
+ public void shouldReturnResponseWithValuesIfKeysPresent() {
+ GetOnlineFeaturesRequest request =
+ GetOnlineFeaturesRequest.newBuilder()
+ .addFeatureSets(
+ FeatureSetRequest.newBuilder()
+ .setName("featureSet")
+ .setVersion(1)
+ .addAllFeatureNames(Lists.newArrayList("feature1", "feature2"))
+ .build())
+ .addEntityRows(
+ EntityRow.newBuilder()
+ .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100))
+ .putFields("entity1", intValue(1))
+ .putFields("entity2", strValue("a")))
+ .addEntityRows(
+ EntityRow.newBuilder()
+ .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100))
+ .putFields("entity1", intValue(2))
+ .putFields("entity2", strValue("b")))
+ .build();
+
+ GetOnlineFeaturesResponse expected =
+ GetOnlineFeaturesResponse.newBuilder()
+ .addFieldValues(
+ FieldValues.newBuilder()
+ .putFields("entity1", intValue(1))
+ .putFields("entity2", strValue("a"))
+ .putFields("featureSet:1:feature1", intValue(1))
+ .putFields("featureSet:1:feature2", intValue(1)))
+ .addFieldValues(
+ FieldValues.newBuilder()
+ .putFields("entity1", intValue(2))
+ .putFields("entity2", strValue("b"))
+ .putFields("featureSet:1:feature1", intValue(1))
+ .putFields("featureSet:1:feature2", intValue(1)))
+ .build();
+ GetOnlineFeaturesResponse actual = cassandraServingService.getOnlineFeatures(request);
+
+ assertThat(
+ responseToMapList(actual), containsInAnyOrder(responseToMapList(expected).toArray()));
+ }
+
+ @Test
+ public void shouldReturnResponseWithUnsetValuesIfKeysNotPresent() {
+ GetOnlineFeaturesRequest request =
+ GetOnlineFeaturesRequest.newBuilder()
+ .addFeatureSets(
+ FeatureSetRequest.newBuilder()
+ .setName("featureSet")
+ .setVersion(1)
+ .addAllFeatureNames(Lists.newArrayList("feature1", "feature2"))
+ .build())
+ .addEntityRows(
+ EntityRow.newBuilder()
+ .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100))
+ .putFields("entity1", intValue(1))
+ .putFields("entity2", strValue("a")))
+ // Non-existing entity keys
+ .addEntityRows(
+ EntityRow.newBuilder()
+ .setEntityTimestamp(Timestamp.newBuilder().setSeconds(100))
+ .putFields("entity1", intValue(55))
+ .putFields("entity2", strValue("ff")))
+ .build();
+
+ GetOnlineFeaturesResponse expected =
+ GetOnlineFeaturesResponse.newBuilder()
+ .addFieldValues(
+ FieldValues.newBuilder()
+ .putFields("entity1", intValue(1))
+ .putFields("entity2", strValue("a"))
+ .putFields("featureSet:1:feature1", intValue(1))
+ .putFields("featureSet:1:feature2", intValue(1)))
+ // Missing keys will return empty values
+ .addFieldValues(
+ FieldValues.newBuilder()
+ .putFields("entity1", intValue(55))
+ .putFields("entity2", strValue("ff"))
+ .putFields("featureSet:1:feature1", Value.newBuilder().build())
+ .putFields("featureSet:1:feature2", Value.newBuilder().build()))
+ .build();
+ GetOnlineFeaturesResponse actual = cassandraServingService.getOnlineFeatures(request);
+
+ assertThat(
+ responseToMapList(actual), containsInAnyOrder(responseToMapList(expected).toArray()));
+ }
+
+ // This test should fail if cassandra no longer stores write time as microseconds or if we change
+ // the way we parse microseconds to com.google.protobuf.Timestamp
+ @Test
+ public void shouldInsertAndParseWriteTimestampInMicroSeconds()
+ throws InvalidProtocolBufferException {
+ session.execute(
+ "INSERT INTO test.feature_store (entities, feature, value)\n"
+ + " VALUES ('ENT1', 'FEAT1',"
+ + Bytes.toHexString(Value.newBuilder().build().toByteArray())
+ + ")\n"
+ + " USING TIMESTAMP 1574318287123456;");
+
+ ResultSet resultSet =
+ session.execute(
+ QueryBuilder.select()
+ .column("entities")
+ .column("feature")
+ .column("value")
+ .writeTime("value")
+ .as("writetime")
+ .from("test", "feature_store")
+ .where(QueryBuilder.eq("entities", "ENT1")));
+ FeatureRow featureRow = cassandraServingService.parseResponse(resultSet);
+
+ Assert.assertEquals(
+ Timestamp.newBuilder().setSeconds(1574318287).setNanos(123456000).build(),
+ featureRow.getEventTimestamp());
+ }
+
+ private Insert insertQuery(
+ String database, String table, String key, String featureName, Value value) {
+ return QueryBuilder.insertInto(database, table)
+ .value("entities", key)
+ .value("feature", featureName)
+ .value("value", ByteBuffer.wrap(value.toByteArray()));
+ }
+}
diff --git a/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java b/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java
new file mode 100644
index 00000000000..f965b14a640
--- /dev/null
+++ b/serving/src/test/java/feast/serving/service/CassandraServingServiceTest.java
@@ -0,0 +1,117 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.serving.service;
+
+import static feast.serving.test.TestUtil.intValue;
+import static feast.serving.test.TestUtil.strValue;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.datastax.driver.core.Session;
+import feast.serving.ServingAPIProto.FeatureSetRequest;
+import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow;
+import io.opentracing.Tracer;
+import io.opentracing.Tracer.SpanBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+public class CassandraServingServiceTest {
+
+ @Mock Session session;
+
+ @Mock CachedSpecService specService;
+
+ @Mock Tracer tracer;
+
+ private CassandraServingService cassandraServingService;
+
+ @Before
+ public void setUp() {
+ initMocks(this);
+
+ when(tracer.buildSpan(ArgumentMatchers.any())).thenReturn(Mockito.mock(SpanBuilder.class));
+
+ cassandraServingService =
+ new CassandraServingService(session, "test", "feature_store", specService, tracer);
+ }
+
+ @Test
+ public void shouldConstructCassandraKeyCorrectly() {
+ List cassandraKeys =
+ cassandraServingService.createLookupKeys(
+ new ArrayList() {
+ {
+ add("entity1");
+ add("entity2");
+ }
+ },
+ new ArrayList() {
+ {
+ add(
+ EntityRow.newBuilder()
+ .putFields("entity1", intValue(1))
+ .putFields("entity2", strValue("a"))
+ .build());
+ add(
+ EntityRow.newBuilder()
+ .putFields("entity1", intValue(2))
+ .putFields("entity2", strValue("b"))
+ .build());
+ }
+ },
+ FeatureSetRequest.newBuilder().setName("featureSet").setVersion(1).build());
+
+ List expectedKeys =
+ new ArrayList() {
+ {
+ add("featureSet:1:entity1=1|entity2=a");
+ add("featureSet:1:entity1=2|entity2=b");
+ }
+ };
+
+ Assert.assertEquals(expectedKeys, cassandraKeys);
+ }
+
+ @Test(expected = Exception.class)
+ public void shouldThrowExceptionWhenCannotConstructCassandraKey() {
+ List cassandraKeys =
+ cassandraServingService.createLookupKeys(
+ new ArrayList() {
+ {
+ add("entity1");
+ add("entity2");
+ }
+ },
+ new ArrayList() {
+ {
+ add(EntityRow.newBuilder().putFields("entity1", intValue(1)).build());
+ add(
+ EntityRow.newBuilder()
+ .putFields("entity1", intValue(2))
+ .putFields("entity2", strValue("b"))
+ .build());
+ }
+ },
+ FeatureSetRequest.newBuilder().setName("featureSet").setVersion(1).build());
+ }
+}
diff --git a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java
index 890699db6d1..dd448fdf2b2 100644
--- a/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java
+++ b/serving/src/test/java/feast/serving/service/RedisServingServiceTest.java
@@ -16,6 +16,9 @@
*/
package feast.serving.service;
+import static feast.serving.test.TestUtil.intValue;
+import static feast.serving.test.TestUtil.responseToMapList;
+import static feast.serving.test.TestUtil.strValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.mockito.Mockito.when;
@@ -39,7 +42,6 @@
import io.opentracing.Tracer;
import io.opentracing.Tracer.SpanBuilder;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
@@ -525,20 +527,6 @@ public void shouldFilterOutUndesiredRows() {
responseToMapList(actual), containsInAnyOrder(responseToMapList(expected).toArray()));
}
- private List