Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 62 additions & 24 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.SourceProto.Source;
import feast.core.StoreProto.Store;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ValidateFeatureRows;
import feast.ingestion.transform.WriteFailedElementToBigQuery;
import feast.ingestion.transform.WriteToStore;
import feast.ingestion.transform.metrics.WriteMetricsTransform;
Expand All @@ -14,20 +16,26 @@
import feast.ingestion.values.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;

public class ImportJob {

// Tag for main output containing Feature Row that has been successfully processed.
private static final TupleTag<FeatureRow> FEATURE_ROW_OUT = new TupleTag<FeatureRow>() {};
private static final TupleTag<FeatureRow> FEATURE_ROW_OUT = new TupleTag<FeatureRow>() {
};

// Tag for deadletter output containing elements and error messages from invalid input/transform.
private static final TupleTag<FailedElement> DEADLETTER_OUT = new TupleTag<FailedElement>() {};
private static final TupleTag<FailedElement> DEADLETTER_OUT = new TupleTag<FailedElement>() {
};
private static final Logger log = org.slf4j.LoggerFactory.getLogger(ImportJob.class);

/**
Expand All @@ -46,14 +54,17 @@ public static PipelineResult runPipeline(ImportOptions options)
/*
* Steps:
* 1. Read messages from Feast Source as FeatureRow
* 2. Write FeatureRow to the corresponding Store
* 3. Write elements that failed to be processed to a dead letter queue.
* 4. Write metrics to a metrics sink
* 2. Validate the feature rows to ensure the schema matches what is registered to the system
* 3. Write FeatureRow to the corresponding Store
* 4. Write elements that failed to be processed to a dead letter queue.
* 5. Write metrics to a metrics sink
*/

PipelineOptionsValidator.validate(ImportOptions.class, options);
Pipeline pipeline = Pipeline.create(options);

log.info("Starting import job with settings: \n{}", options.toString());

List<FeatureSetSpec> featureSetSpecs =
SpecUtil.parseFeatureSetSpecJsonList(options.getFeatureSetSpecJson());
List<Store> stores = SpecUtil.parseStoreJsonList(options.getStoreJson());
Expand All @@ -62,44 +73,71 @@ public static PipelineResult runPipeline(ImportOptions options)
List<FeatureSetSpec> subscribedFeatureSets =
SpecUtil.getSubscribedFeatureSets(store.getSubscriptionsList(), featureSetSpecs);

// Generate tags by key
Map<String, TupleTag<FeatureRow>> featureSetTagsByKey = subscribedFeatureSets.stream()
.map(fs -> {
String id = String.format("%s:%s", fs.getName(), fs.getVersion());
return Pair.of(id, new TupleTag<FeatureRow>(id) {
});
})
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));

// TODO: make the source part of the job initialisation options
Source source = subscribedFeatureSets.get(0).getSource();

// Step 1. Read messages from Feast Source as FeatureRow.
PCollectionTuple convertedFeatureRows =
pipeline.apply(
"ReadFeatureRowFromSource",
ReadFromSource.newBuilder()
.setSource(source)
.setFeatureSetTagByKey(featureSetTagsByKey)
.setFailureTag(DEADLETTER_OUT)
.build());

for (FeatureSetSpec featureSet : subscribedFeatureSets) {
// Ensure Store has valid configuration and Feast can access it.
StoreUtil.setupStore(store, featureSet);
String id = String.format("%s:%s", featureSet.getName(), featureSet.getVersion());

// Step 2. Validate incoming FeatureRows
PCollectionTuple validatedRows = convertedFeatureRows
.get(featureSetTagsByKey.get(id))
.apply(ValidateFeatureRows.newBuilder()
.setFeatureSetSpec(featureSet)
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.build());

// Step 1. Read messages from Feast Source as FeatureRow.
PCollectionTuple convertedFeatureRows =
pipeline.apply(
"ReadFeatureRowFromSource",
ReadFromSource.newBuilder()
.setSource(featureSet.getSource())
.setFieldByName(SpecUtil.getFieldByName(featureSet))
.setFeatureSetName(featureSet.getName())
.setFeatureSetVersion(featureSet.getVersion())
.setSuccessTag(FEATURE_ROW_OUT)
.setFailureTag(DEADLETTER_OUT)
.build());

// Step 2. Write FeatureRow to the corresponding Store.
convertedFeatureRows
// Step 3. Write FeatureRow to the corresponding Store.
validatedRows
.get(FEATURE_ROW_OUT)
.apply(
"WriteFeatureRowToStore",
WriteToStore.newBuilder().setFeatureSetSpec(featureSet).setStore(store).build());

// Step 3. Write FailedElements to a dead letter table in BigQuery.
// Step 4. Write FailedElements to a dead letter table in BigQuery.
if (options.getDeadLetterTableSpec() != null) {
convertedFeatureRows
.get(DEADLETTER_OUT)
.apply(
"WriteFailedElements",
"WriteFailedElements_ReadFromSource",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
Comment thread
zhilingc marked this conversation as resolved.

validatedRows
.get(DEADLETTER_OUT)
.apply("WriteFailedElements_ValidateRows",
WriteFailedElementToBigQuery.newBuilder()
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
.setTableSpec(options.getDeadLetterTableSpec())
.build());
}

// Step 4. Write metrics to a metrics sink.
convertedFeatureRows
// Step 5. Write metrics to a metrics sink.
validatedRows
.apply("WriteMetrics", WriteMetricsTransform.newBuilder()
.setFeatureSetSpec(featureSet)
.setStoreName(store.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,29 @@

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.common.collect.Lists;
import feast.core.SourceProto.Source;
import feast.core.SourceProto.SourceType;
import feast.ingestion.transform.fn.KafkaRecordToFeatureRowDoFn;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FieldProto;
import feast.types.ValueProto.Value.ValCase;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.exception.ExceptionUtils;

@AutoValue
public abstract class ReadFromSource extends PTransform<PBegin, PCollectionTuple> {

public abstract Source getSource();

public abstract Map<String, Field> getFieldByName();

public abstract String getFeatureSetName();

public abstract int getFeatureSetVersion();

public abstract TupleTag<FeatureRow> getSuccessTag();
public abstract Map<String, TupleTag<FeatureRow>> getFeatureSetTagByKey();

public abstract TupleTag<FailedElement> getFailureTag();

Expand All @@ -49,13 +37,8 @@ public abstract static class Builder {

public abstract Builder setSource(Source source);

public abstract Builder setFeatureSetName(String featureSetName);

public abstract Builder setFeatureSetVersion(int featureSetVersion);

public abstract Builder setFieldByName(Map<String, Field> fieldByName);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
public abstract Builder setFeatureSetTagByKey(
Map<String, TupleTag<FeatureRow>> featureSetTagByKey);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

Expand Down Expand Up @@ -93,13 +76,13 @@ public PCollectionTuple expand(PBegin input) {
.commitOffsetsInFinalize())
.apply(
"KafkaRecordToFeatureRow", ParDo.of(KafkaRecordToFeatureRowDoFn.newBuilder()
.setFeatureSetName(getFeatureSetName())
.setFeatureSetVersion(getFeatureSetVersion())
.setFieldByName(getFieldByName())
.setSuccessTag(getSuccessTag())
.setFeatureSetTagByKey(getFeatureSetTagByKey())
.setFailureTag(getFailureTag())
.build())
.withOutputTags(getSuccessTag(), TupleTagList.of(getFailureTag())));
.withOutputTags(new TupleTag<FeatureRow>("placeholder") {},
TupleTagList.of(Lists
.newArrayList(getFeatureSetTagByKey().values()))
.and(getFailureTag())));
}

private String generateConsumerGroupId(String jobName) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package feast.ingestion.transform;

import com.google.auto.value.AutoValue;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.ingestion.transform.fn.ValidateFeatureRowDoFn;
import feast.ingestion.utils.SpecUtil;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Map;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

/**
 * Beam transform that applies {@link ValidateFeatureRowDoFn} to a collection of feature rows.
 *
 * <p>The DoFn is configured from the supplied {@link FeatureSetSpec}: its name, its version and
 * the field lookup returned by {@link SpecUtil#getFieldByName}. The resulting
 * {@link PCollectionTuple} is declared with the success tag as the main output and the failure
 * tag as the single additional output.
 */
@AutoValue
public abstract class ValidateFeatureRows
    extends PTransform<PCollection<FeatureRow>, PCollectionTuple> {

  /** Feature set specification handed to the validation DoFn. */
  public abstract FeatureSetSpec getFeatureSetSpec();

  /** Tag used as the main output of the validation step. */
  public abstract TupleTag<FeatureRow> getSuccessTag();

  /** Tag used as the additional output carrying {@link FailedElement}s. */
  public abstract TupleTag<FailedElement> getFailureTag();

  /** Returns a new builder for this transform. */
  public static Builder newBuilder() {
    return new AutoValue_ValidateFeatureRows.Builder();
  }

  /** Builder for {@link ValidateFeatureRows}. */
  @AutoValue.Builder
  public abstract static class Builder {

    public abstract Builder setFeatureSetSpec(FeatureSetSpec featureSetSpec);

    public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);

    public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

    public abstract ValidateFeatureRows build();
  }

  /**
   * Wires the validation DoFn onto {@code input}.
   *
   * @param input feature rows to run through the validation DoFn
   * @return tuple declared with the success tag as main output and the failure tag as side output
   */
  @Override
  public PCollectionTuple expand(PCollection<FeatureRow> input) {
    FeatureSetSpec spec = getFeatureSetSpec();
    Map<String, Field> fieldLookup = SpecUtil.getFieldByName(spec);
    TupleTag<FeatureRow> successTag = getSuccessTag();
    TupleTag<FailedElement> failureTag = getFailureTag();

    return input.apply(
        "ValidateFeatureRows",
        ParDo.of(
                ValidateFeatureRowDoFn.newBuilder()
                    .setFeatureSetName(spec.getName())
                    .setFeatureSetVersion(spec.getVersion())
                    .setFieldByName(fieldLookup)
                    .setSuccessTag(successTag)
                    .setFailureTag(failureTag)
                    .build())
            .withOutputTags(successTag, TupleTagList.of(failureTag)));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.google.auto.value.AutoValue;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.ReadFromSource.Builder;
import feast.ingestion.values.FailedElement;
import feast.ingestion.values.Field;
import feast.types.FeatureRowProto.FeatureRow;
Expand All @@ -17,14 +19,7 @@
@AutoValue
public abstract class KafkaRecordToFeatureRowDoFn extends
DoFn<KafkaRecord<byte[], byte[]>, FeatureRow> {

public abstract String getFeatureSetName();

public abstract int getFeatureSetVersion();

public abstract Map<String, Field> getFieldByName();

public abstract TupleTag<FeatureRow> getSuccessTag();
public abstract Map<String, TupleTag<FeatureRow>> getFeatureSetTagByKey();

public abstract TupleTag<FailedElement> getFailureTag();

Expand All @@ -35,13 +30,7 @@ public static KafkaRecordToFeatureRowDoFn.Builder newBuilder() {
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setFeatureSetName(String featureSetName);

public abstract Builder setFeatureSetVersion(int featureSetVersion);

public abstract Builder setFieldByName(Map<String, Field> fieldByName);

public abstract Builder setSuccessTag(TupleTag<FeatureRow> successTag);
public abstract Builder setFeatureSetTagByKey(Map<String, TupleTag<FeatureRow>> featureSetTagByKey);

public abstract Builder setFailureTag(TupleTag<FailedElement> failureTag);

Expand All @@ -67,55 +56,19 @@ public void processElement(ProcessContext context) {
.build());
return;
}

// If FeatureRow contains field names that do not exist as EntitySpec
// or FeatureSpec in FeatureSetSpec, mark the FeatureRow as FailedElement.
String error = null;
String featureSetId = String.format("%s:%d", getFeatureSetName(), getFeatureSetVersion());
if (featureRow.getFeatureSet().equals(featureSetId)) {

for (FieldProto.Field field : featureRow.getFieldsList()) {
if (!getFieldByName().containsKey(field.getName())) {
error =
String.format(
"FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.",
field.getName(), getFeatureSetName(), getFeatureSetVersion());
break;
}
// If value is set in the FeatureRow, make sure the value type matches
// that defined in FeatureSetSpec
if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) {
int expectedTypeFieldNumber =
getFieldByName().get(field.getName()).getType().getNumber();
int actualTypeFieldNumber = field.getValue().getValCase().getNumber();
if (expectedTypeFieldNumber != actualTypeFieldNumber) {
error =
String.format(
"FeatureRow contains field '%s' with invalid type '%s'. Feast expects the field type to match that in FeatureSet '%s'. Please check the FeatureRow data.",
field.getName(),
field.getValue().getValCase(),
getFieldByName().get(field.getName()).getType());
break;
}
}
}
} else {
error = String.format(
"FeatureRow contains invalid feature set id %s. Please check that the feature rows are being published to the correct topic on the feature stream.",
featureSetId);
}

if (error != null) {
TupleTag<FeatureRow> tag = getFeatureSetTagByKey()
.getOrDefault(featureRow.getFeatureSet(), null);
if (tag == null) {
context.output(
getFailureTag(),
FailedElement.newBuilder()
.setTransformName("KafkaRecordToFeatureRow")
.setJobName(context.getPipelineOptions().getJobName())
.setPayload(featureRow.toString())
.setErrorMessage(error)
.setPayload(new String(Base64.getEncoder().encode(value)))
.setErrorMessage(String.format("Got row with unexpected feature set id %s. Expected one of %s.", featureRow.getFeatureSet(), getFeatureSetTagByKey().keySet()))
.build());
} else {
context.output(featureRow);
return;
}
context.output(tag, featureRow);
}
}
Loading