From 6ee42755eabaaa2ae421906012b88eac78c67788 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Wed, 27 Nov 2019 15:36:32 +0800 Subject: [PATCH 1/4] Fix BigQuery write setting timepartition outside of table reference --- .../ingestion/transform/WriteToStore.java | 16 ++------- .../serving/bigquery/GetTableReference.java | 35 +++++++++++++++++++ 2 files changed, 38 insertions(+), 13 deletions(-) create mode 100644 ingestion/src/main/java/feast/store/serving/bigquery/GetTableReference.java diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index b23d3d40465..986a0543baa 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -13,6 +13,7 @@ import feast.ingestion.utils.ResourceUtil; import feast.ingestion.values.FailedElement; import feast.store.serving.bigquery.FeatureRowToTableRow; +import feast.store.serving.bigquery.GetTableReference; import feast.store.serving.redis.FeatureRowToRedisMutationDoFn; import feast.store.serving.redis.RedisCustomIO; import feast.types.FeatureRowProto.FeatureRow; @@ -24,16 +25,12 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; -import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.ValueInSingleWindow; -import org.codehaus.jackson.JsonParser.Feature; import org.slf4j.Logger; @AutoValue @@ -88,15 +85,8 @@ 
public PDone expand(PCollection<FeatureRow> input) { .apply( "WriteTableRowToBigQuery", BigQueryIO.write() - .to((SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination>) element -> { - String[] split = element.getValue().getFeatureSet().split(":"); - return new TableDestination(String.format( - "%s:%s.%s_v%s", - bigqueryConfig.getProjectId(), - bigqueryConfig.getDatasetId(), - split[0], - split[1]), null); - }) + .to(new GetTableReference(bigqueryConfig.getProjectId(), + bigqueryConfig.getDatasetId())) .withFormatFunction(new FeatureRowToTableRow(options.getJobName())) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND) diff --git a/ingestion/src/main/java/feast/store/serving/bigquery/GetTableReference.java b/ingestion/src/main/java/feast/store/serving/bigquery/GetTableReference.java new file mode 100644 index 00000000000..0f0ba0386da --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/bigquery/GetTableReference.java @@ -0,0 +1,35 @@ +package feast.store.serving.bigquery; + +import com.google.api.services.bigquery.model.TimePartitioning; +import feast.types.FeatureRowProto.FeatureRow; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.ValueInSingleWindow; + +public class GetTableReference implements + SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination> { + + private String projectId; + private String datasetId; + + public GetTableReference(String projectId, String datasetId) { + this.projectId = projectId; + this.datasetId = datasetId; + } + + @Override + public TableDestination apply(ValueInSingleWindow<FeatureRow> input) { + String[] split = input.getValue().getFeatureSet().split(":"); + + TimePartitioning timePartitioning = + new TimePartitioning() + .setType("DAY") + .setField(FeatureRowToTableRow.getEventTimestampColumn()); + + return new TableDestination( + String.format("%s:%s.%s_v%s", projectId, datasetId, split[0], split[1]), + String 
+ .format("Feast table for %s", input.getValue().getFeatureSet()), + timePartitioning); + } +} \ No newline at end of file From 3ba6fa70fb1abd5b9990a359d19ba89327b8c715 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Wed, 27 Nov 2019 15:55:29 +0800 Subject: [PATCH 2/4] Give core a bit more time to start up --- .prow/scripts/test-end-to-end.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh index 592a3a59cd4..fd3291f0960 100755 --- a/.prow/scripts/test-end-to-end.sh +++ b/.prow/scripts/test-end-to-end.sh @@ -124,7 +124,7 @@ EOF nohup java -jar core/target/feast-core-0.3.2-SNAPSHOT.jar \ --spring.config.location=file:///tmp/core.application.yml \ &> /var/log/feast-core.log & -sleep 20 +sleep 30 tail -n10 /var/log/feast-core.log echo " From 64e0d8ec341da97add7de6990f9bb93bf3205dd6 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Wed, 27 Nov 2019 16:40:13 +0800 Subject: [PATCH 3/4] Rename GetTableReference to correctly reflect return type --- .../src/main/java/feast/ingestion/transform/WriteToStore.java | 4 ++-- .../{GetTableReference.java => GetTableDestination.java} | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename ingestion/src/main/java/feast/store/serving/bigquery/{GetTableReference.java => GetTableDestination.java} (90%) diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index 986a0543baa..45a26bbed76 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -13,7 +13,7 @@ import feast.ingestion.utils.ResourceUtil; import feast.ingestion.values.FailedElement; import feast.store.serving.bigquery.FeatureRowToTableRow; -import feast.store.serving.bigquery.GetTableReference; +import feast.store.serving.bigquery.GetTableDestination; import 
feast.store.serving.redis.FeatureRowToRedisMutationDoFn; import feast.store.serving.redis.RedisCustomIO; import feast.types.FeatureRowProto.FeatureRow; @@ -85,7 +85,7 @@ public PDone expand(PCollection<FeatureRow> input) { .apply( "WriteTableRowToBigQuery", BigQueryIO.write() - .to(new GetTableReference(bigqueryConfig.getProjectId(), + .to(new GetTableDestination(bigqueryConfig.getProjectId(), bigqueryConfig.getDatasetId())) .withFormatFunction(new FeatureRowToTableRow(options.getJobName())) .withCreateDisposition(CreateDisposition.CREATE_NEVER) diff --git a/ingestion/src/main/java/feast/store/serving/bigquery/GetTableReference.java b/ingestion/src/main/java/feast/store/serving/bigquery/GetTableDestination.java similarity index 90% rename from ingestion/src/main/java/feast/store/serving/bigquery/GetTableReference.java rename to ingestion/src/main/java/feast/store/serving/bigquery/GetTableDestination.java index 0f0ba0386da..14517ffd817 100644 --- a/ingestion/src/main/java/feast/store/serving/bigquery/GetTableReference.java +++ b/ingestion/src/main/java/feast/store/serving/bigquery/GetTableDestination.java @@ -6,13 +6,13 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.ValueInSingleWindow; -public class GetTableReference implements +public class GetTableDestination implements SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination> { private String projectId; private String datasetId; - public GetTableReference(String projectId, String datasetId) { + public GetTableDestination(String projectId, String datasetId) { this.projectId = projectId; this.datasetId = datasetId; } From c2a01ef866a602c0cfb0f3fd8a475332bba20a1c Mon Sep 17 00:00:00 2001 From: zhilingc Date: Wed, 27 Nov 2019 16:49:16 +0800 Subject: [PATCH 4/4] Increase kafka sleep time --- .prow/scripts/test-end-to-end.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh index fd3291f0960..88917bf3bef 100755 --- 
a/.prow/scripts/test-end-to-end.sh +++ b/.prow/scripts/test-end-to-end.sh @@ -59,7 +59,7 @@ nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.prope sleep 5 tail -n10 /var/log/zookeeper.log nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &> /var/log/kafka.log 2>&1 & -sleep 5 +sleep 10 tail -n10 /var/log/kafka.log echo "