From fc0656d8160f4a9df7096cffa687ef641d8d1e60 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Mon, 27 Apr 2020 00:41:04 +0800 Subject: [PATCH 1/2] Add unique dataset id for all batch ingestions --- protos/feast/core/Store.proto | 1 + protos/feast/types/FeatureRow.proto | 4 ++++ sdk/python/feast/client.py | 17 +++++++++++++++++ sdk/python/feast/loaders/ingest.py | 16 ++++++++++++---- .../bigquery/writer/BigQueryFeatureSink.java | 11 +++++++++-- .../bigquery/writer/FeatureRowToTableRow.java | 2 ++ 6 files changed, 45 insertions(+), 6 deletions(-) diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto index 0aa4c8cd420..ebd5f73afec 100644 --- a/protos/feast/core/Store.proto +++ b/protos/feast/core/Store.proto @@ -69,6 +69,7 @@ message Store { // ====================|==================|================================ // - event_timestamp | TIMESTAMP | event time of the FeatureRow // - created_timestamp | TIMESTAMP | processing time of the ingestion of the FeatureRow + // - dataset_id | STRING | identifier of the batch dataset a row belongs to // - job_id | STRING | identifier for the job that writes the FeatureRow to the corresponding BigQuery table // // BigQuery table created will be partitioned by the field "event_timestamp" diff --git a/protos/feast/types/FeatureRow.proto b/protos/feast/types/FeatureRow.proto index c170cd5d502..c3614e00274 100644 --- a/protos/feast/types/FeatureRow.proto +++ b/protos/feast/types/FeatureRow.proto @@ -39,4 +39,8 @@ message FeatureRow { // /:. This value will be used by the feast ingestion job to filter // rows, and write the values to the correct tables. string feature_set = 6; + + // Identifier tying this feature row to a specific ingestion dataset. For + // batch loads, this dataset id can be attributed to a single ingestion job. + string dataset_id = 7; } diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 0a38236a510..47c84f18d52 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -18,6 +18,7 @@ import shutil import tempfile import time +import uuid from collections import OrderedDict from math import ceil from typing import Dict, List, Optional, Tuple, Union @@ -825,6 +826,7 @@ def ingest( # Loop optimization declarations produce = producer.produce flush = producer.flush + dataset_id = _generate_dataset_id(feature_set) # Transform and push data to Kafka if feature_set.source.source_type == "Kafka": @@ -832,6 +834,7 @@ def ingest( file=dest_path, row_groups=list(range(pq_file.num_row_groups)), fs=feature_set, + dataset_id=dataset_id, max_workers=max_workers, ): @@ -916,6 +919,20 @@ def _build_feature_references( return features +def _generate_dataset_id(feature_set: FeatureSet) -> str: + """ + Generates a UUID from the feature set name, version, and the current time. + + Args: + feature_set: Feature set of the dataset to be ingested. + + Returns: + UUID unique to current time and the feature set provided. + """ + uuid_str = f"{feature_set.name}_{feature_set.version}_{int(time.time())}" + return str(uuid.uuid3(uuid.NAMESPACE_DNS, uuid_str)) + + def _read_table_from_source( source: Union[pd.DataFrame, str], chunk_size: int, max_workers: int ) -> Tuple[str, str]: diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py index 4d215cc9901..3c4e8b99d0e 100644 --- a/sdk/python/feast/loaders/ingest.py +++ b/sdk/python/feast/loaders/ingest.py @@ -26,7 +26,7 @@ def _encode_pa_tables( - file: str, feature_set: str, fields: dict, row_group_idx: int + file: str, feature_set: str, fields: dict, dataset_id: str, row_group_idx: int ) -> List[bytes]: """ Helper function to encode a PyArrow table(s) read from parquet file(s) into @@ -49,6 +49,9 @@ def _encode_pa_tables( fields (dict[str, enum.Enum.ValueType]): A mapping of field names to their value types. + dataset_id (str): + UUID unique to this dataset. + row_group_idx(int): Row group index to read and encode into byte like FeatureRow protobuf objects. @@ -81,7 +84,9 @@ def _encode_pa_tables( # Iterate through the rows for row_idx in range(table.num_rows): feature_row = FeatureRow( - event_timestamp=datetime_col[row_idx], feature_set=feature_set + event_timestamp=datetime_col[row_idx], + feature_set=feature_set, + dataset_id=dataset_id, ) # Loop optimization declaration ext = feature_row.fields.extend @@ -97,7 +102,7 @@ def _encode_pa_tables( def get_feature_row_chunks( - file: str, row_groups: List[int], fs: FeatureSet, max_workers: int + file: str, row_groups: List[int], fs: FeatureSet, dataset_id: str, max_workers: int ) -> Iterable[List[bytes]]: """ Iterator function to encode a PyArrow table read from a parquet file to @@ -115,6 +120,9 @@ def get_feature_row_chunks( fs (feast.feature_set.FeatureSet): FeatureSet describing parquet files. + dataset_id (str): + UUID unique to this dataset. + max_workers (int): Maximum number of workers to spawn. @@ -128,7 +136,7 @@ def get_feature_row_chunks( field_map = {field.name: field.dtype for field in fs.fields.values()} pool = Pool(max_workers) - func = partial(_encode_pa_tables, file, feature_set, field_map) + func = partial(_encode_pa_tables, file, feature_set, field_map, dataset_id) for chunk in pool.imap(func, row_groups): yield chunk return diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java index d155d3f1f50..33f79f8d97a 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java @@ -42,6 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink { "Event time for the FeatureRow"; public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION = "Processing time of the FeatureRow ingestion in Feast\""; + public static final String BIGQUERY_DATASET_ID_FIELD_DESCRIPTION = + "Identifier of the batch dataset a row belongs to"; public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION = "Feast import job ID for the FeatureRow"; @@ -108,10 +110,13 @@ public void prepareWrite(FeatureSetProto.FeatureSet featureSet) { Table table = bigquery.getTable(tableId); if (table != null) { log.info( - "Writing to existing BigQuery table '{}:{}.{}'", - getProjectId(), + "Updating and writing to existing BigQuery table '{}:{}.{}'", + datasetId.getProject(), datasetId.getDataset(), tableName); + TableDefinition tableDefinition = createBigQueryTableDefinition(featureSet.getSpec()); + TableInfo tableInfo = TableInfo.of(tableId, tableDefinition); + bigquery.update(tableInfo); return; } @@ -166,6 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet "created_timestamp", Pair.of( StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION), + "dataset_id", + Pair.of(StandardSQLTypeName.STRING, BIGQUERY_DATASET_ID_FIELD_DESCRIPTION), "job_id", Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION)); for (Map.Entry> entry : diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java index 12833b31b85..9eaf504558e 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java @@ -31,6 +31,7 @@ public class FeatureRowToTableRow implements SerializableFunction { private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp"; private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp"; + private static final String DATASET_ID_COLUMN = "dataset_id"; private static final String JOB_ID_COLUMN = "job_id"; private final String jobId; @@ -47,6 +48,7 @@ public TableRow apply(FeatureRow featureRow) { TableRow tableRow = new TableRow(); tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp())); tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString()); + tableRow.set(DATASET_ID_COLUMN, featureRow.getDatasetId()); tableRow.set(JOB_ID_COLUMN, jobId); for (Field field : featureRow.getFieldsList()) { From 6bedf6487c36d2679997cb94965e57a39c0e10e6 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Sat, 2 May 2020 00:16:57 +0800 Subject: [PATCH 2/2] Rename to ingestion_id --- protos/feast/core/Store.proto | 2 +- protos/feast/types/FeatureRow.proto | 5 ++--- sdk/python/feast/client.py | 6 +++--- sdk/python/feast/loaders/ingest.py | 20 +++++++++++-------- .../bigquery/writer/BigQueryFeatureSink.java | 8 ++++---- .../bigquery/writer/FeatureRowToTableRow.java | 4 ++-- 6 files changed, 24 insertions(+), 21 deletions(-) diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto index ebd5f73afec..f35561467e1 100644 --- a/protos/feast/core/Store.proto +++ b/protos/feast/core/Store.proto @@ -69,7 +69,7 @@ message Store { // ====================|==================|================================ // - event_timestamp | TIMESTAMP | event time of the FeatureRow // - created_timestamp | TIMESTAMP | processing time of the ingestion of the FeatureRow - // - dataset_id | STRING | identifier of the batch dataset a row belongs to + // - ingestion_id | STRING | unique id identifying groups of rows that have been ingested together // - job_id | STRING | identifier for the job that writes the FeatureRow to the corresponding BigQuery table // // BigQuery table created will be partitioned by the field "event_timestamp" diff --git a/protos/feast/types/FeatureRow.proto b/protos/feast/types/FeatureRow.proto index c3614e00274..c19a393fde5 100644 --- a/protos/feast/types/FeatureRow.proto +++ b/protos/feast/types/FeatureRow.proto @@ -40,7 +40,6 @@ message FeatureRow { // rows, and write the values to the correct tables. string feature_set = 6; - // Identifier tying this feature row to a specific ingestion dataset. For - // batch loads, this dataset id can be attributed to a single ingestion job. - string dataset_id = 7; + // Identifier tying this feature row to a specific ingestion job. + string ingestion_id = 7; } diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 47c84f18d52..0221a79b4b1 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -826,7 +826,7 @@ def ingest( # Loop optimization declarations produce = producer.produce flush = producer.flush - dataset_id = _generate_dataset_id(feature_set) + ingestion_id = _generate_ingestion_id(feature_set) # Transform and push data to Kafka if feature_set.source.source_type == "Kafka": @@ -834,7 +834,7 @@ def ingest( file=dest_path, row_groups=list(range(pq_file.num_row_groups)), fs=feature_set, - dataset_id=dataset_id, + ingestion_id=ingestion_id, max_workers=max_workers, ): @@ -919,7 +919,7 @@ def _build_feature_references( return features -def _generate_dataset_id(feature_set: FeatureSet) -> str: +def _generate_ingestion_id(feature_set: FeatureSet) -> str: """ Generates a UUID from the feature set name, version, and the current time. diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py index 3c4e8b99d0e..34d0356ea78 100644 --- a/sdk/python/feast/loaders/ingest.py +++ b/sdk/python/feast/loaders/ingest.py @@ -26,7 +26,7 @@ def _encode_pa_tables( - file: str, feature_set: str, fields: dict, dataset_id: str, row_group_idx: int + file: str, feature_set: str, fields: dict, ingestion_id: str, row_group_idx: int ) -> List[bytes]: """ Helper function to encode a PyArrow table(s) read from parquet file(s) into @@ -49,8 +49,8 @@ def _encode_pa_tables( fields (dict[str, enum.Enum.ValueType]): A mapping of field names to their value types. - dataset_id (str): - UUID unique to this dataset. + ingestion_id (str): + UUID unique to this ingestion job. row_group_idx(int): Row group index to read and encode into byte like FeatureRow @@ -86,7 +86,7 @@ def _encode_pa_tables( feature_row = FeatureRow( event_timestamp=datetime_col[row_idx], feature_set=feature_set, - dataset_id=dataset_id, + ingestion_id=ingestion_id, ) # Loop optimization declaration ext = feature_row.fields.extend @@ -102,7 +102,11 @@ def _encode_pa_tables( def get_feature_row_chunks( - file: str, row_groups: List[int], fs: FeatureSet, dataset_id: str, max_workers: int + file: str, + row_groups: List[int], + fs: FeatureSet, + ingestion_id: str, + max_workers: int, ) -> Iterable[List[bytes]]: """ Iterator function to encode a PyArrow table read from a parquet file to @@ -120,8 +124,8 @@ def get_feature_row_chunks( fs (feast.feature_set.FeatureSet): FeatureSet describing parquet files. - dataset_id (str): - UUID unique to this dataset. + ingestion_id (str): + UUID unique to this ingestion job. max_workers (int): Maximum number of workers to spawn. @@ -136,7 +140,7 @@ def get_feature_row_chunks( field_map = {field.name: field.dtype for field in fs.fields.values()} pool = Pool(max_workers) - func = partial(_encode_pa_tables, file, feature_set, field_map, dataset_id) + func = partial(_encode_pa_tables, file, feature_set, field_map, ingestion_id) for chunk in pool.imap(func, row_groups): yield chunk return diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java index 33f79f8d97a..5d8f3d25cb7 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java @@ -42,8 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink { "Event time for the FeatureRow"; public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION = "Processing time of the FeatureRow ingestion in Feast\""; - public static final String BIGQUERY_DATASET_ID_FIELD_DESCRIPTION = - "Identifier of the batch dataset a row belongs to"; + public static final String BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION = + "Unique id identifying groups of rows that have been ingested together"; public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION = "Feast import job ID for the FeatureRow"; @@ -171,8 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet "created_timestamp", Pair.of( StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION), - "dataset_id", - Pair.of(StandardSQLTypeName.STRING, BIGQUERY_DATASET_ID_FIELD_DESCRIPTION), + "ingestion_id", + Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION), "job_id", Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION)); for (Map.Entry> entry : diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java index 9eaf504558e..6a69b96d717 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java @@ -31,7 +31,7 @@ public class FeatureRowToTableRow implements SerializableFunction { private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp"; private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp"; - private static final String DATASET_ID_COLUMN = "dataset_id"; + private static final String INGESTION_ID_COLUMN = "ingestion_id"; private static final String JOB_ID_COLUMN = "job_id"; private final String jobId; @@ -48,7 +48,7 @@ public TableRow apply(FeatureRow featureRow) { TableRow tableRow = new TableRow(); tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp())); tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString()); - tableRow.set(DATASET_ID_COLUMN, featureRow.getDatasetId()); + tableRow.set(INGESTION_ID_COLUMN, featureRow.getIngestionId()); tableRow.set(JOB_ID_COLUMN, jobId); for (Field field : featureRow.getFieldsList()) {