Run the e2e clean-up steps before the scripts exit, and delete the files that
batch retrieval jobs stage on GCS once each test has read them.

Review notes:
  * The one-second timestamp buffer is now assigned back to the "datetime"
    column; the bare `a + b` expression discarded its result.
  * test-end-to-end-batch.sh now ends with a newline.
  * Leading whitespace inside the hunks was restored from syntax; confirm it
    against the target files before applying.

diff --git a/infra/scripts/test-end-to-end-batch-dataflow.sh b/infra/scripts/test-end-to-end-batch-dataflow.sh
index 1d75463b0ec..9b3c8b5a83b 100644
--- a/infra/scripts/test-end-to-end-batch-dataflow.sh
+++ b/infra/scripts/test-end-to-end-batch-dataflow.sh
@@ -216,7 +216,6 @@ if [[ ${TEST_EXIT_CODE} != 0 ]]; then
 fi
 
 cd ${ORIGINAL_DIR}
-exit ${TEST_EXIT_CODE}
 
 echo "
 ============================================================
@@ -243,4 +242,6 @@ while read line
 do
     echo $line
     gcloud dataflow jobs cancel $line --region=${GCLOUD_REGION}
-done < ingesting_jobs.txt
\ No newline at end of file
+done < ingesting_jobs.txt
+
+exit ${TEST_EXIT_CODE}
diff --git a/infra/scripts/test-end-to-end-batch.sh b/infra/scripts/test-end-to-end-batch.sh
index fe24c0df33c..ffaba183a9d 100755
--- a/infra/scripts/test-end-to-end-batch.sh
+++ b/infra/scripts/test-end-to-end-batch.sh
@@ -281,7 +281,6 @@ if [[ ${TEST_EXIT_CODE} != 0 ]]; then
 fi
 
 cd ${ORIGINAL_DIR}
-exit ${TEST_EXIT_CODE}
 
 echo "
 ============================================================
@@ -290,3 +289,5 @@ Cleaning up
 "
 
 bq rm -r -f ${GOOGLE_CLOUD_PROJECT}:${DATASET_NAME}
+
+exit ${TEST_EXIT_CODE}
diff --git a/tests/e2e/bq-batch-retrieval.py b/tests/e2e/bq-batch-retrieval.py
index 35075c92bbb..a4d8a729ef7 100644
--- a/tests/e2e/bq-batch-retrieval.py
+++ b/tests/e2e/bq-batch-retrieval.py
@@ -19,6 +19,7 @@ from feast.feature_set import FeatureSet
 from feast.type_map import ValueType
 from google.cloud import storage, bigquery
+from google.cloud.storage import Blob
 from google.protobuf.duration_pb2 import Duration
 from pandavro import to_avro
 
@@ -155,6 +156,7 @@ def test_batch_get_batch_features_with_file(client):
     client.ingest(file_fs1, features_1_df, timeout=480)
 
     # Rename column (datetime -> event_timestamp)
+    features_1_df['datetime'] += pd.Timedelta(seconds=1)  # adds buffer to avoid rounding errors
     features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"})
 
     to_avro(
@@ -169,6 +171,7 @@ def test_batch_get_batch_features_with_file(client):
     )
 
     output = feature_retrieval_job.to_dataframe()
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head())
 
     assert output["entity_id"].to_list() == [
@@ -194,6 +197,7 @@ def test_batch_get_batch_features_with_gs_path(client, gcs_path):
     client.ingest(gcs_fs1, features_1_df, timeout=360)
 
     # Rename column (datetime -> event_timestamp)
+    features_1_df['datetime'] += pd.Timedelta(seconds=1)  # adds buffer to avoid rounding errors
     features_1_df = features_1_df.rename(columns={"datetime": "event_timestamp"})
 
     # Output file to local
@@ -220,6 +224,8 @@ def test_batch_get_batch_features_with_gs_path(client, gcs_path):
     )
 
     output = feature_retrieval_job.to_dataframe()
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
+    blob.delete()
     print(output.head())
 
     assert output["entity_id"].to_list() == [
@@ -256,6 +262,7 @@ def test_batch_order_by_creation_time(client):
         feature_refs=[f"{PROJECT_NAME}/feature_value3"],
     )
     output = feature_retrieval_job.to_dataframe()
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head())
 
     assert output["feature_value3"].to_list() == ["CORRECT"] * N_ROWS
@@ -291,6 +298,7 @@ def test_batch_additional_columns_in_entity_table(client):
         entity_rows=entity_df, feature_refs=[f"{PROJECT_NAME}/feature_value4"]
     )
     output = feature_retrieval_job.to_dataframe().sort_values(by=["entity_id"])
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head(10))
 
     assert np.allclose(
@@ -336,6 +344,7 @@ def test_batch_point_in_time_correctness_join(client):
         entity_rows=entity_df, feature_refs=[f"{PROJECT_NAME}/feature_value5"]
     )
     output = feature_retrieval_job.to_dataframe()
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head())
 
     assert output["feature_value5"].to_list() == ["CORRECT"] * N_EXAMPLES
@@ -384,6 +393,7 @@ def test_batch_multiple_featureset_joins(client):
         ],
     )
     output = feature_retrieval_job.to_dataframe()
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head())
 
     assert output["entity_id"].to_list() == [
@@ -417,6 +427,7 @@ def test_batch_no_max_age(client):
     )
 
     output = feature_retrieval_job.to_dataframe()
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head())
 
     assert output["entity_id"].to_list() == output["feature_value8"].to_list()
@@ -499,6 +510,7 @@ def test_update_featureset_apply_featureset_and_ingest_first_subset(
     )
 
     output = feature_retrieval_job.to_dataframe().sort_values(by=["entity_id"])
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head())
 
     assert output["update_feature1"].to_list() == subset_df["update_feature1"].to_list()
@@ -552,6 +564,7 @@ def test_update_featureset_update_featureset_and_ingest_second_subset(
     )
 
     output = feature_retrieval_job.to_dataframe().sort_values(by=["entity_id"])
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head())
 
     assert output["update_feature1"].to_list() == subset_df["update_feature1"].to_list()
@@ -587,6 +600,7 @@ def test_update_featureset_retrieve_valid_fields(client, update_featureset_dataf
         ],
     )
     output = feature_retrieval_job.to_dataframe().sort_values(by=["entity_id"])
+    clean_up_remote_files(feature_retrieval_job.get_avro_files())
     print(output.head(10))
     assert (
         output["update_feature1"].to_list()
@@ -623,3 +637,17 @@ def get_rows_ingested(
 
     for row in rows:
         return row["count"]
+
+
+def clean_up_remote_files(files):
+    """Delete every GCS object referenced by ``files``.
+
+    Args:
+        files: Iterable of parsed URIs exposing ``scheme`` and ``geturl()``.
+            Entries whose scheme is not ``gs`` are left untouched.
+    """
+    storage_client = storage.Client()
+    for file_uri in files:
+        if file_uri.scheme == "gs":
+            blob = Blob.from_string(file_uri.geturl(), client=storage_client)
+            blob.delete()