diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index dab50ce5dfc..201e739c065 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -63,6 +63,7 @@ public PDone expand(PCollection input) { switch (storeType) { case REDIS: + RedisConfig redisConfig = getStore().getRedisConfig(); input .apply( @@ -75,10 +76,6 @@ public PDone expand(PCollection input) { case BIGQUERY: BigQueryConfig bigqueryConfig = getStore().getBigqueryConfig(); - TimePartitioning timePartitioning = - new TimePartitioning() - .setType("DAY") - .setField(FeatureRowToTableRow.getEventTimestampColumn()); WriteResult bigqueryWriteResult = input diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WindowRecords.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WindowRecords.java deleted file mode 100644 index 4796c83603c..00000000000 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WindowRecords.java +++ /dev/null @@ -1,36 +0,0 @@ -package feast.ingestion.transform.metrics; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; - -public class WindowRecords extends - PTransform, PCollection>>> { - - private final long windowSize; - - public WindowRecords(long windowSize) { - this.windowSize = windowSize; - } - - @Override - public PCollection>> expand(PCollection input) { - return input - .apply("Window records", - Window.into(FixedWindows.of(Duration.standardSeconds(windowSize)))) - .apply("Add key", ParDo.of(new DoFn>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.of(1, c.element())); - } - })) - .apply("Collect", GroupByKey.create()); - } -} diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java index 7040463eec4..884bd7a173a 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteDeadletterRowMetricsDoFn.java @@ -4,15 +4,13 @@ import com.timgroup.statsd.NonBlockingStatsDClient; import com.timgroup.statsd.StatsDClient; import com.timgroup.statsd.StatsDClientException; -import feast.core.FeatureSetProto.FeatureSetSpec; import feast.ingestion.values.FailedElement; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; @AutoValue public abstract class WriteDeadletterRowMetricsDoFn extends - DoFn>, Void> { + DoFn { private static final Logger log = org.slf4j.LoggerFactory .getLogger(WriteDeadletterRowMetricsDoFn.class); @@ -59,17 +57,15 @@ public void setup() { @ProcessElement public void processElement(ProcessContext c) { - - for (FailedElement ignored : c.element().getValue()) { - try { - statsd.count("deadletter_row_count", 1, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_NAME_TAG_KEY + ":" + ignored.getFeatureSetName(), - FEATURE_SET_VERSION_TAG_KEY + ":" + ignored.getFeatureSetVersion(), - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - } catch (StatsDClientException e) { - log.warn("Unable to push metrics to server", e); - } + FailedElement ignored = c.element(); + try { + statsd.count("deadletter_row_count", 1, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + ignored.getFeatureSetName(), + FEATURE_SET_VERSION_TAG_KEY + ":" + ignored.getFeatureSetVersion(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + } catch (StatsDClientException e) { + log.warn("Unable to push metrics to server", e); } } } diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java index 8fa7954e1cc..755089d363f 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java @@ -1,7 +1,6 @@ package feast.ingestion.transform.metrics; import com.google.auto.value.AutoValue; -import feast.core.FeatureSetProto.FeatureSetSpec; import feast.ingestion.options.ImportOptions; import feast.ingestion.values.FailedElement; import feast.types.FeatureRowProto.FeatureRow; @@ -17,9 +16,6 @@ @AutoValue public abstract class WriteMetricsTransform extends PTransform { - private static final long WINDOW_SIZE_SECONDS = 15; - private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteMetricsTransform.class); - public abstract String getStoreName(); public abstract TupleTag getSuccessTag(); @@ -49,9 +45,7 @@ public PDone expand(PCollectionTuple input) { case "statsd": input.get(getFailureTag()) - .apply("Window records", - new WindowRecords<>(WINDOW_SIZE_SECONDS)) - .apply("Write deadletter metrics", ParDo.of( + .apply("WriteDeadletterMetrics", ParDo.of( WriteDeadletterRowMetricsDoFn.newBuilder() .setStatsdHost(options.getStatsdHost()) .setStatsdPort(options.getStatsdPort()) @@ -59,9 +53,7 @@ public PDone expand(PCollectionTuple input) { .build())); input.get(getSuccessTag()) - .apply("Window records", - new WindowRecords<>(WINDOW_SIZE_SECONDS)) - .apply("Write row metrics", ParDo + .apply("WriteRowMetrics", ParDo .of(WriteRowMetricsDoFn.newBuilder() .setStatsdHost(options.getStatsdHost()) .setStatsdPort(options.getStatsdPort()) diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java index fc9fcd64cfb..e0bcf48ceaf 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java @@ -9,11 +9,10 @@ import feast.types.FieldProto.Field; import feast.types.ValueProto.Value.ValCase; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; @AutoValue -public abstract class WriteRowMetricsDoFn extends DoFn>, Void> { +public abstract class WriteRowMetricsDoFn extends DoFn { private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class); @@ -69,53 +68,52 @@ public void setup() { @ProcessElement public void processElement(ProcessContext c) { - long missingValueCount = 0; - try { - for (FeatureRow row : c.element().getValue()) { - long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp()); - - String[] split = row.getFeatureSet().split(":"); - String featureSetName = split[0]; - String featureSetVersion = split[1]; - statsd.histogram("feature_row_lag_ms", System.currentTimeMillis() - eventTimestamp, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - - statsd.histogram("feature_row_event_time_epoch_ms", eventTimestamp, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - - for (Field field : row.getFieldsList()) { - if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { - statsd.histogram("feature_value_lag_ms", System.currentTimeMillis() - eventTimestamp, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - FEATURE_TAG_KEY + ":" + field.getName(), - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - } else { - missingValueCount++; - } + FeatureRow row = c.element(); + long eventTimestamp = com.google.protobuf.util.Timestamps.toMillis(row.getEventTimestamp()); + + String[] split = row.getFeatureSet().split(":"); + String featureSetName = split[0]; + String featureSetVersion = split[1]; + + statsd.histogram("feature_row_lag_ms", System.currentTimeMillis() - eventTimestamp, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + + statsd.histogram("feature_row_event_time_epoch_ms", eventTimestamp, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + + for (Field field : row.getFieldsList()) { + if (!field.getValue().getValCase().equals(ValCase.VAL_NOT_SET)) { + statsd.histogram("feature_value_lag_ms", System.currentTimeMillis() - eventTimestamp, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, + FEATURE_TAG_KEY + ":" + field.getName(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + } else { + statsd.count("feature_value_missing_count", 1, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, + FEATURE_TAG_KEY + ":" + field.getName(), + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); } - statsd.count("feature_row_ingested_count", 1, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); - - statsd.count("feature_row_missing_value_count", missingValueCount, - STORE_TAG_KEY + ":" + getStoreName(), - FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, - FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, - INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); } - } catch (StatsDClientException e) { + statsd.count("feature_row_ingested_count", 1, + STORE_TAG_KEY + ":" + getStoreName(), + FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, + FEATURE_SET_VERSION_TAG_KEY + ":" + featureSetVersion, + INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName()); + + } catch ( + StatsDClientException e) { log.warn("Unable to push metrics to server", e); } }