Skip to content

Commit 3b4dcaa

Browse files
davidheryanto authored and feast-ci-bot committed
Fix incorrect BigQuery schema creation from FeatureSetSpec (#340)
* Add test and fix for BigQuery schema creation from FeatureSetSpec
* Add entity fields in the createBigQueryTableDefinition test
* Apply spotless plugin for consistent formatting
1 parent 1ef1b71 commit 3b4dcaa

2 files changed

Lines changed: 192 additions & 8 deletions

File tree

ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,19 @@
6868
* need to manage any schemas. This class will not be used in that case.
6969
*/
7070
public class StoreUtil {
71+
7172
private static final Map<ValueType.Enum, StandardSQLTypeName> VALUE_TYPE_TO_STANDARD_SQL_TYPE =
7273
new HashMap<>();
7374
private static final Logger log = org.slf4j.LoggerFactory.getLogger(StoreUtil.class);
7475

76+
// Column description for reserved fields
77+
public static final String BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION =
78+
"Event time for the FeatureRow";
79+
public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
80+
"Processing time of the FeatureRow ingestion in Feast\"";
81+
public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
82+
"Feast import job ID for the FeatureRow";
83+
7584
// Refer to protos/feast/core/Store.proto for the mapping definition.
7685
static {
7786
VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(Enum.BYTES, StandardSQLTypeName.BYTES);
@@ -110,15 +119,15 @@ public static void setupStore(Store store, FeatureSetSpec featureSetSpec) {
110119
}
111120

112121
@SuppressWarnings("DuplicatedCode")
113-
private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec featureSetSpec) {
122+
public static TableDefinition createBigQueryTableDefinition(FeatureSetSpec featureSetSpec) {
114123
List<Field> fields = new ArrayList<>();
115124
log.info("Table will have the following fields:");
116125

117126
for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) {
118127
Builder builder =
119128
Field.newBuilder(
120129
entitySpec.getName(), VALUE_TYPE_TO_STANDARD_SQL_TYPE.get(entitySpec.getValueType()));
121-
if (entitySpec.getValueTypeValue() >= 7 && entitySpec.getValueTypeValue() <= 17) {
130+
if (entitySpec.getValueType().name().toLowerCase().endsWith("_list")) {
122131
builder.setMode(Mode.REPEATED);
123132
}
124133
Field field = builder.build();
@@ -130,7 +139,7 @@ private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec feat
130139
Field.newBuilder(
131140
featureSpec.getName(),
132141
VALUE_TYPE_TO_STANDARD_SQL_TYPE.get(featureSpec.getValueType()));
133-
if (featureSpec.getValueTypeValue() >= 7 && featureSpec.getValueTypeValue() <= 17) {
142+
if (featureSpec.getValueType().name().toLowerCase().endsWith("_list")) {
134143
builder.setMode(Mode.REPEATED);
135144
}
136145
Field field = builder.build();
@@ -143,13 +152,12 @@ private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec feat
143152
reservedFieldNameToPairOfStandardSQLTypeAndDescription =
144153
ImmutableMap.of(
145154
"event_timestamp",
146-
Pair.of(StandardSQLTypeName.TIMESTAMP, "Event time for the FeatureRow"),
155+
Pair.of(StandardSQLTypeName.TIMESTAMP, BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION),
147156
"created_timestamp",
148157
Pair.of(
149-
StandardSQLTypeName.TIMESTAMP,
150-
"Processing time of the FeatureRow ingestion in Feast"),
158+
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
151159
"job_id",
152-
Pair.of(StandardSQLTypeName.STRING, "Feast import job ID for the FeatureRow"));
160+
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
153161
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
154162
reservedFieldNameToPairOfStandardSQLTypeAndDescription.entrySet()) {
155163
Field field =

ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,39 @@
1616
*/
1717
package feast.ingestion.util;
1818

19+
import static feast.types.ValueProto.ValueType.Enum.BOOL;
20+
import static feast.types.ValueProto.ValueType.Enum.BOOL_LIST;
21+
import static feast.types.ValueProto.ValueType.Enum.BYTES;
22+
import static feast.types.ValueProto.ValueType.Enum.BYTES_LIST;
23+
import static feast.types.ValueProto.ValueType.Enum.DOUBLE;
24+
import static feast.types.ValueProto.ValueType.Enum.DOUBLE_LIST;
25+
import static feast.types.ValueProto.ValueType.Enum.FLOAT;
26+
import static feast.types.ValueProto.ValueType.Enum.FLOAT_LIST;
1927
import static feast.types.ValueProto.ValueType.Enum.INT32;
28+
import static feast.types.ValueProto.ValueType.Enum.INT32_LIST;
29+
import static feast.types.ValueProto.ValueType.Enum.INT64;
30+
import static feast.types.ValueProto.ValueType.Enum.INT64_LIST;
31+
import static feast.types.ValueProto.ValueType.Enum.STRING;
2032
import static feast.types.ValueProto.ValueType.Enum.STRING_LIST;
2133

2234
import com.google.cloud.bigquery.BigQuery;
35+
import com.google.cloud.bigquery.Field;
36+
import com.google.cloud.bigquery.Field.Mode;
37+
import com.google.cloud.bigquery.Schema;
38+
import com.google.cloud.bigquery.StandardSQLTypeName;
2339
import feast.core.FeatureSetProto.EntitySpec;
2440
import feast.core.FeatureSetProto.FeatureSetSpec;
2541
import feast.core.FeatureSetProto.FeatureSpec;
2642
import feast.ingestion.utils.StoreUtil;
43+
import java.util.Arrays;
44+
import org.junit.Assert;
2745
import org.junit.Test;
2846
import org.mockito.Mockito;
2947

3048
public class StoreUtilTest {
49+
3150
@Test
32-
public void setupBigQuery_shouldCreateTable_givenFeatureSetSpec() {
51+
public void setupBigQuery_shouldCreateTable_givenValidFeatureSetSpec() {
3352
FeatureSetSpec featureSetSpec =
3453
FeatureSetSpec.newBuilder()
3554
.setName("feature_set_1")
@@ -41,4 +60,161 @@ public void setupBigQuery_shouldCreateTable_givenFeatureSetSpec() {
4160
BigQuery mockedBigquery = Mockito.mock(BigQuery.class);
4261
StoreUtil.setupBigQuery(featureSetSpec, "project-1", "dataset_1", mockedBigquery);
4362
}
63+
64+
/**
 * Checks that {@code StoreUtil.createBigQueryTableDefinition} builds the expected BigQuery schema
 * for a FeatureSetSpec that holds one entity and one feature of each scalar value type (BYTES,
 * STRING, INT32, INT64, DOUBLE, FLOAT, BOOL) and of each corresponding *_LIST value type.
 *
 * <p>The expected schema lists the entity fields first, then the feature fields, then the three
 * reserved fields (event_timestamp, created_timestamp, job_id). INT32 and INT64 are both expected
 * as BigQuery INT64, DOUBLE and FLOAT are both expected as FLOAT64, and every *_LIST type is
 * expected to have mode REPEATED.
 */
@Test
65+
public void createBigQueryTableDefinition_shouldCreateCorrectSchema_givenValidFeatureSetSpec() {
66+
// Input: one entity and one feature per value type, scalar types first and list types after.
FeatureSetSpec input =
67+
FeatureSetSpec.newBuilder()
68+
.addAllEntities(
69+
Arrays.asList(
70+
EntitySpec.newBuilder().setName("bytes_entity").setValueType(BYTES).build(),
71+
EntitySpec.newBuilder().setName("string_entity").setValueType(STRING).build(),
72+
EntitySpec.newBuilder().setName("int32_entity").setValueType(INT32).build(),
73+
EntitySpec.newBuilder().setName("int64_entity").setValueType(INT64).build(),
74+
EntitySpec.newBuilder().setName("double_entity").setValueType(DOUBLE).build(),
75+
EntitySpec.newBuilder().setName("float_entity").setValueType(FLOAT).build(),
76+
EntitySpec.newBuilder().setName("bool_entity").setValueType(BOOL).build(),
77+
EntitySpec.newBuilder()
78+
.setName("bytes_list_entity")
79+
.setValueType(BYTES_LIST)
80+
.build(),
81+
EntitySpec.newBuilder()
82+
.setName("string_list_entity")
83+
.setValueType(STRING_LIST)
84+
.build(),
85+
EntitySpec.newBuilder()
86+
.setName("int32_list_entity")
87+
.setValueType(INT32_LIST)
88+
.build(),
89+
EntitySpec.newBuilder()
90+
.setName("int64_list_entity")
91+
.setValueType(INT64_LIST)
92+
.build(),
93+
EntitySpec.newBuilder()
94+
.setName("double_list_entity")
95+
.setValueType(DOUBLE_LIST)
96+
.build(),
97+
EntitySpec.newBuilder()
98+
.setName("float_list_entity")
99+
.setValueType(FLOAT_LIST)
100+
.build(),
101+
EntitySpec.newBuilder()
102+
.setName("bool_list_entity")
103+
.setValueType(BOOL_LIST)
104+
.build()))
105+
.addAllFeatures(
106+
Arrays.asList(
107+
FeatureSpec.newBuilder().setName("bytes_feature").setValueType(BYTES).build(),
108+
FeatureSpec.newBuilder().setName("string_feature").setValueType(STRING).build(),
109+
FeatureSpec.newBuilder().setName("int32_feature").setValueType(INT32).build(),
110+
FeatureSpec.newBuilder().setName("int64_feature").setValueType(INT64).build(),
111+
FeatureSpec.newBuilder().setName("double_feature").setValueType(DOUBLE).build(),
112+
FeatureSpec.newBuilder().setName("float_feature").setValueType(FLOAT).build(),
113+
FeatureSpec.newBuilder().setName("bool_feature").setValueType(BOOL).build(),
114+
FeatureSpec.newBuilder()
115+
.setName("bytes_list_feature")
116+
.setValueType(BYTES_LIST)
117+
.build(),
118+
FeatureSpec.newBuilder()
119+
.setName("string_list_feature")
120+
.setValueType(STRING_LIST)
121+
.build(),
122+
FeatureSpec.newBuilder()
123+
.setName("int32_list_feature")
124+
.setValueType(INT32_LIST)
125+
.build(),
126+
FeatureSpec.newBuilder()
127+
.setName("int64_list_feature")
128+
.setValueType(INT64_LIST)
129+
.build(),
130+
FeatureSpec.newBuilder()
131+
.setName("double_list_feature")
132+
.setValueType(DOUBLE_LIST)
133+
.build(),
134+
FeatureSpec.newBuilder()
135+
.setName("float_list_feature")
136+
.setValueType(FLOAT_LIST)
137+
.build(),
138+
FeatureSpec.newBuilder()
139+
.setName("bool_list_feature")
140+
.setValueType(BOOL_LIST)
141+
.build()))
142+
.build();
143+
144+
// Only the schema of the generated table definition is compared.
Schema actual = StoreUtil.createBigQueryTableDefinition(input).getSchema();
145+
146+
// Expected fields, in order: entities, features, reserved fields.
Schema expected =
147+
Schema.of(
148+
Arrays.asList(
149+
// Fields from entity
150+
Field.newBuilder("bytes_entity", StandardSQLTypeName.BYTES).build(),
151+
Field.newBuilder("string_entity", StandardSQLTypeName.STRING).build(),
152+
Field.newBuilder("int32_entity", StandardSQLTypeName.INT64).build(),
153+
Field.newBuilder("int64_entity", StandardSQLTypeName.INT64).build(),
154+
Field.newBuilder("double_entity", StandardSQLTypeName.FLOAT64).build(),
155+
Field.newBuilder("float_entity", StandardSQLTypeName.FLOAT64).build(),
156+
Field.newBuilder("bool_entity", StandardSQLTypeName.BOOL).build(),
157+
Field.newBuilder("bytes_list_entity", StandardSQLTypeName.BYTES)
158+
.setMode(Mode.REPEATED)
159+
.build(),
160+
Field.newBuilder("string_list_entity", StandardSQLTypeName.STRING)
161+
.setMode(Mode.REPEATED)
162+
.build(),
163+
Field.newBuilder("int32_list_entity", StandardSQLTypeName.INT64)
164+
.setMode(Mode.REPEATED)
165+
.build(),
166+
Field.newBuilder("int64_list_entity", StandardSQLTypeName.INT64)
167+
.setMode(Mode.REPEATED)
168+
.build(),
169+
Field.newBuilder("double_list_entity", StandardSQLTypeName.FLOAT64)
170+
.setMode(Mode.REPEATED)
171+
.build(),
172+
Field.newBuilder("float_list_entity", StandardSQLTypeName.FLOAT64)
173+
.setMode(Mode.REPEATED)
174+
.build(),
175+
Field.newBuilder("bool_list_entity", StandardSQLTypeName.BOOL)
176+
.setMode(Mode.REPEATED)
177+
.build(),
178+
// Fields from feature
179+
Field.newBuilder("bytes_feature", StandardSQLTypeName.BYTES).build(),
180+
Field.newBuilder("string_feature", StandardSQLTypeName.STRING).build(),
181+
Field.newBuilder("int32_feature", StandardSQLTypeName.INT64).build(),
182+
Field.newBuilder("int64_feature", StandardSQLTypeName.INT64).build(),
183+
Field.newBuilder("double_feature", StandardSQLTypeName.FLOAT64).build(),
184+
Field.newBuilder("float_feature", StandardSQLTypeName.FLOAT64).build(),
185+
Field.newBuilder("bool_feature", StandardSQLTypeName.BOOL).build(),
186+
Field.newBuilder("bytes_list_feature", StandardSQLTypeName.BYTES)
187+
.setMode(Mode.REPEATED)
188+
.build(),
189+
Field.newBuilder("string_list_feature", StandardSQLTypeName.STRING)
190+
.setMode(Mode.REPEATED)
191+
.build(),
192+
Field.newBuilder("int32_list_feature", StandardSQLTypeName.INT64)
193+
.setMode(Mode.REPEATED)
194+
.build(),
195+
Field.newBuilder("int64_list_feature", StandardSQLTypeName.INT64)
196+
.setMode(Mode.REPEATED)
197+
.build(),
198+
Field.newBuilder("double_list_feature", StandardSQLTypeName.FLOAT64)
199+
.setMode(Mode.REPEATED)
200+
.build(),
201+
Field.newBuilder("float_list_feature", StandardSQLTypeName.FLOAT64)
202+
.setMode(Mode.REPEATED)
203+
.build(),
204+
Field.newBuilder("bool_list_feature", StandardSQLTypeName.BOOL)
205+
.setMode(Mode.REPEATED)
206+
.build(),
207+
// Reserved fields
208+
Field.newBuilder("event_timestamp", StandardSQLTypeName.TIMESTAMP)
209+
.setDescription(StoreUtil.BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION)
210+
.build(),
211+
Field.newBuilder("created_timestamp", StandardSQLTypeName.TIMESTAMP)
212+
.setDescription(StoreUtil.BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION)
213+
.build(),
214+
Field.newBuilder("job_id", StandardSQLTypeName.STRING)
215+
.setDescription(StoreUtil.BIGQUERY_JOB_ID_FIELD_DESCRIPTION)
216+
.build()));
217+
218+
Assert.assertEquals(expected, actual);
219+
}
44220
}

0 commit comments

Comments
 (0)