diff --git a/core/src/main/java/feast/core/config/AppConfig.java b/core/src/main/java/feast/core/config/AppConfig.java index 05e0278967f..76dd6dcc227 100644 --- a/core/src/main/java/feast/core/config/AppConfig.java +++ b/core/src/main/java/feast/core/config/AppConfig.java @@ -32,7 +32,9 @@ public ImportJobDefaults getImportJobDefaults( @Value("${feast.jobs.runner}") String runner, @Value("${feast.jobs.options}") String options, @Value("${feast.jobs.executable}") String executable, - @Value("${feast.jobs.errorsStoreId}") String errorsStoreId) { - return new ImportJobDefaults(coreApiUri, runner, options, executable, errorsStoreId); + @Value("${feast.jobs.errorsStoreType}") String errorsStoreType, + @Value("${feast.jobs.errorsStoreOptions}") String errorsStoreOptions) { + return new ImportJobDefaults( + coreApiUri, runner, options, executable, errorsStoreType, errorsStoreOptions); } } diff --git a/core/src/main/java/feast/core/config/ImportJobDefaults.java b/core/src/main/java/feast/core/config/ImportJobDefaults.java index d120cd9902b..989777da448 100644 --- a/core/src/main/java/feast/core/config/ImportJobDefaults.java +++ b/core/src/main/java/feast/core/config/ImportJobDefaults.java @@ -32,6 +32,7 @@ public class ImportJobDefaults { private String runner; private String importJobOptions; private String executable; - private String errorsStoreId; + private String errorsStoreType; + private String errorsStoreOptions; } diff --git a/core/src/main/java/feast/core/service/JobExecutionService.java b/core/src/main/java/feast/core/service/JobExecutionService.java index b5259eff1ba..d734a8aae85 100644 --- a/core/src/main/java/feast/core/service/JobExecutionService.java +++ b/core/src/main/java/feast/core/service/JobExecutionService.java @@ -28,24 +28,26 @@ import feast.core.model.JobStatus; import feast.core.util.TypeConversion; import feast.specs.ImportSpecProto.ImportSpec; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - import java.io.BufferedReader; import java.io.InputStreamReader; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; @Slf4j @Service public class JobExecutionService { - private static final int SLEEP_MS = 10; - private static final Pattern JOB_EXT_ID_PREFIX_REGEX = Pattern.compile(".*FeastImportJobId:.*"); public static final String JOB_PREFIX_DEFAULT = "feastimport"; - + private static final int SLEEP_MS = 10; + private static final Pattern JOB_EXT_ID_PREFIX_REGEX = Pattern.compile(".*FeastImportJobId:.*"); private JobInfoRepository jobInfoRepository; private ImportJobDefaults defaults; @@ -106,9 +108,6 @@ public SubmitImportJobResponse submitJob(ImportSpec importSpec, String jobPrefix /** * Update a given job's status - * - * @param jobId - * @param status */ public void updateJobStatus(String jobId, JobStatus status) { Optional jobRecordOptional = jobInfoRepository.findById(jobId); @@ -121,9 +120,6 @@ public void updateJobStatus(String jobId, JobStatus status) { /** * Update a given job's external id - * - * @param jobId - * @param jobExtId */ public void updateJobExtId(String jobId, String jobExtId) { Optional jobRecordOptional = jobInfoRepository.findById(jobId); @@ -137,8 +133,6 @@ public void updateJobExtId(String jobId, String jobExtId) { /** * Builds the command to execute the ingestion job * - * @param importSpec - * @param jobId * @return configured ProcessBuilder */ public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) { @@ -153,7 +147,8 @@ public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) { commands.add( option("importSpecBase64", Base64.getEncoder().encodeToString(importSpec.toByteArray()))); commands.add(option("coreApiUri", defaults.getCoreApiUri())); - commands.add(option("errorsStoreId", defaults.getErrorsStoreId())); + commands.add(option("errorsStoreType", defaults.getErrorsStoreType())); + commands.add(option("errorsStoreOptions", defaults.getErrorsStoreOptions())); options.forEach((k, v) -> commands.add(option(k, v))); return new ProcessBuilder(commands); } @@ -170,7 +165,7 @@ private String option(String key, String value) { */ public String runProcess(Process p) { try (BufferedReader outputStream = - new BufferedReader(new InputStreamReader(p.getInputStream())); + new BufferedReader(new InputStreamReader(p.getInputStream())); BufferedReader errorsStream = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { String extId = ""; diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties index fd0315322f8..ac8fb3534fe 100644 --- a/core/src/main/resources/application.properties +++ b/core/src/main/resources/application.properties @@ -20,7 +20,8 @@ feast.jobs.coreUri=${CORE_API_URI:localhost:8433} feast.jobs.runner=${JOB_RUNNER:DirectRunner} feast.jobs.options=${JOB_OPTIONS:{}} feast.jobs.executable=${JOB_EXECUTABLE:feast-ingestion.jar} -feast.jobs.errorsStoreId=${JOB_ERRORS_STORE_ID:STDOUT} +feast.jobs.errorsStoreType=${JOB_ERRORS_STORE_TYPE:stdout} +feast.jobs.errorsStoreOptions=${JOB_ERRORS_STORE_OPTIONS:{}} feast.jobs.dataflow.projectId = ${DATAFLOW_PROJECT_ID:} feast.jobs.dataflow.location = ${DATAFLOW_LOCATION:} diff --git a/core/src/test/java/feast/core/service/JobExecutionServiceTest.java b/core/src/test/java/feast/core/service/JobExecutionServiceTest.java index 7bd72a7e342..ad29bb8b9e0 100644 --- a/core/src/test/java/feast/core/service/JobExecutionServiceTest.java +++ b/core/src/test/java/feast/core/service/JobExecutionServiceTest.java @@ -17,12 +17,25 @@ package feast.core.service; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; +import static org.mockito.internal.verification.VerificationModeFactory.times; + import com.google.common.collect.Lists; import feast.core.config.ImportJobDefaults; import feast.core.dao.JobInfoRepository; import feast.core.model.JobInfo; import feast.core.model.JobStatus; import feast.specs.ImportSpecProto.ImportSpec; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -31,25 +44,13 @@ import org.mockito.Mock; import org.mockito.Mockito; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Optional; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; -import static org.mockito.internal.verification.VerificationModeFactory.times; - public class JobExecutionServiceTest { - private ImportJobDefaults defaults; - @Mock JobInfoRepository jobInfoRepository; - @Rule public final ExpectedException expectedException = ExpectedException.none(); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + @Mock + JobInfoRepository jobInfoRepository; + private ImportJobDefaults defaults; @Before public void setUp() { @@ -60,7 +61,8 @@ public void setUp() { "DirectRunner", "{\"key\":\"value\"}", "ingestion.jar", - "STDOUT"); + "STDOUT", + "{}"); } @Test @@ -77,7 +79,8 @@ public void shouldBuildProcessBuilderWithCorrectOptions() { "--runner=DirectRunner", "--importSpecBase64=CgRmaWxl", "--coreApiUri=localhost:8080", - "--errorsStoreId=STDOUT", + "--errorsStoreType=STDOUT", + "--errorsStoreOptions={}", "--key=value"); assertThat(pb.command(), equalTo(expected)); } diff --git a/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java b/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java index 2f2f5d9feb4..ce3bb622072 100644 --- a/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java +++ b/ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java @@ -37,7 +37,9 @@ import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; -/** An ImportJobModule is a Guice module for creating dependency injection bindings. */ +/** + * An ImportJobModule is a Guice module for creating dependency injection bindings. + */ public class ImportJobModule extends AbstractModule { private final ImportJobOptions options; diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java index 8ec34df5251..6da7d0f726f 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.options.Validation.Required; public interface ImportJobOptions extends PipelineOptions { + @Description("Import spec yaml file path") @Required(groups = {"importSpec"}) String getImportSpecYamlFile(); @@ -72,14 +73,23 @@ public interface ImportJobOptions extends PipelineOptions { void setLimit(Long value); @Description( - "Set a store id to store errors in, if your data input is **very** small, you can use STDOUT" - + " or STDERR as the store id, otherwise it must match an associated storage spec") - String getErrorsStoreId(); + "Set an errors store type. One of: [stderr, stdout, file.json]. Note that you should not use " + + "stderr/stdout in production unless your data volume is extremely small.") + String getErrorsStoreType(); + + void setErrorsStoreType(String value); - void setErrorsStoreId(String value); + @Description( + "Provide errors store options as a json string containing key-values. Options required" + + "depend on the type of store set.") + @Default.String("{}") + String getErrorsStoreOptions(); + + void setErrorsStoreOptions(String value); @AutoService(PipelineOptionsRegistrar.class) class ImportJobOptionsRegistrar implements PipelineOptionsRegistrar { + @Override public Iterable> getPipelineOptions() { return Collections.singleton(ImportJobOptions.class); diff --git a/ingestion/src/main/java/feast/ingestion/transform/ErrorsStoreTransform.java b/ingestion/src/main/java/feast/ingestion/transform/ErrorsStoreTransform.java index ce1d4531eab..4147d964b03 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ErrorsStoreTransform.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ErrorsStoreTransform.java @@ -17,14 +17,9 @@ package feast.ingestion.transform; -import static com.google.common.base.Preconditions.checkNotNull; +import static feast.ingestion.util.JsonUtil.convertJsonStringToMap; import com.google.inject.Inject; -import java.util.List; -import lombok.extern.slf4j.Slf4j; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; import feast.ingestion.model.Specs; import feast.ingestion.options.ImportJobOptions; import feast.ingestion.transform.FeatureIO.Write; @@ -33,54 +28,60 @@ import feast.storage.ErrorsStore; import feast.storage.noop.NoOpIO; import feast.types.FeatureRowExtendedProto.FeatureRowExtended; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; import org.slf4j.event.Level; @Slf4j public class ErrorsStoreTransform extends FeatureIO.Write { - public static final String STDERR_STORE_ID = "STDERR"; - public static final String STDOUT_STORE_ID = "STDOUT"; - private String errorsStoreId; - private List stores; + public static final String ERRORS_STORE_STDERR = "stderr"; + public static final String ERRORS_STORE_STDOUT = "stdout"; + public static final String ERRORS_STORE_JSON = "file.json"; + + private String errorsStoreType; + private StorageSpec errorsStoreSpec; + private ErrorsStore errorsStore; private Specs specs; @Inject - public ErrorsStoreTransform(ImportJobOptions options, List stores, Specs specs) { - this.errorsStoreId = options.getErrorsStoreId(); - this.stores = stores; + public ErrorsStoreTransform( + ImportJobOptions options, Specs specs, List errorsStores) { this.specs = specs; + this.errorsStoreType = options.getErrorsStoreType(); + + for (ErrorsStore errorsStore : errorsStores) { + if (errorsStore.getType().equals(errorsStoreType)) { + this.errorsStore = errorsStore; + } + } + + this.errorsStoreSpec = + StorageSpec.newBuilder() + .setType(errorsStoreType) + .putAllOptions(convertJsonStringToMap(options.getErrorsStoreOptions())) + .build(); } @Override public PDone expand(PCollection input) { - if (errorsStoreId == null) { - log.warn("No errorsStoreId specified, errors will be discarded"); - return input.apply(new NoOpIO.Write()); - } - - if (errorsStoreId.equals(STDOUT_STORE_ID)) { - input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO))); - } else if (errorsStoreId.equals(STDERR_STORE_ID)) { - input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR))); - } else { - StorageSpec storageSpec = specs.getStorageSpec(errorsStoreId); - storageSpec = - checkNotNull( - storageSpec, - String.format("errorsStoreId=%s not found in storage specs", errorsStoreId)); - Write write = null; - for (ErrorsStore errorsStore : stores) { - if (errorsStore.getType().equals(storageSpec.getType())) { - write = errorsStore.create(storageSpec, specs); + switch (errorsStoreType) { + case ERRORS_STORE_STDOUT: + input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO))); + break; + case ERRORS_STORE_STDERR: + input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR))); + break; + default: + if (errorsStore == null) { + log.warn("No valid errors store specified, errors will be discarded"); + return input.apply(new NoOpIO.Write()); } - } - write = - checkNotNull( - write, - "No errors storage factory found for errorsStoreId=%s with type=%s", - errorsStoreId, - storageSpec.getType()); - return input.apply(write); + Write write = errorsStore.create(this.errorsStoreSpec, specs); + return input.apply(write); } return PDone.in(input.getPipeline()); } diff --git a/ingestion/src/main/java/feast/ingestion/util/JsonUtil.java b/ingestion/src/main/java/feast/ingestion/util/JsonUtil.java new file mode 100644 index 00000000000..b142c013f7d --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/util/JsonUtil.java @@ -0,0 +1,27 @@ +package feast.ingestion.util; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.lang.reflect.Type; +import java.util.Collections; +import java.util.Map; + +public class JsonUtil { + + private static Gson gson = new Gson(); + + /** + * Unmarshals a given json string to map + * + * @param jsonString valid json formatted string + * @return map of keys to values in json + */ + public static Map convertJsonStringToMap(String jsonString) { + if (jsonString == null || jsonString.equals("") || jsonString.equals("{}")) { + return Collections.emptyMap(); + } + Type stringMapType = new TypeToken>() { + }.getType(); + return gson.fromJson(jsonString, stringMapType); + } +} diff --git a/ingestion/src/main/java/feast/storage/service/ErrorsStoreService.java b/ingestion/src/main/java/feast/storage/service/ErrorsStoreService.java index 03ea2db68e4..0f2e8a30e09 100644 --- a/ingestion/src/main/java/feast/storage/service/ErrorsStoreService.java +++ b/ingestion/src/main/java/feast/storage/service/ErrorsStoreService.java @@ -19,14 +19,16 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import feast.storage.ErrorsStore; +import lombok.extern.slf4j.Slf4j; + import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; -import lombok.extern.slf4j.Slf4j; -import feast.storage.ErrorsStore; @Slf4j public class ErrorsStoreService { + private static ServiceLoader serviceLoader = ServiceLoader.load(ErrorsStore.class); private static List manuallyRegistered = new ArrayList<>(); diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java index 47011f5d9ad..672838e73b1 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java @@ -17,11 +17,6 @@ package feast.ingestion; -import static feast.FeastMatchers.hasCount; -import static feast.ToOrderedFeatureRows.orderedFeatureRow; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - import com.google.common.base.Charsets; import com.google.common.collect.Lists; import com.google.common.io.Files; @@ -46,11 +41,6 @@ import feast.types.FeatureRowExtendedProto.FeatureRowExtended; import feast.types.FeatureRowProto.FeatureRow; import feast.types.GranularityProto.Granularity; -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -62,16 +52,26 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +import static feast.FeastMatchers.hasCount; +import static feast.ToOrderedFeatureRows.orderedFeatureRow; +import static feast.storage.MockErrorsStore.MOCK_ERRORS_STORE_TYPE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + @Slf4j public class ImportJobCSVTest { - @Rule - public TemporaryFolder folder = new TemporaryFolder(); + @Rule public TemporaryFolder folder = new TemporaryFolder(); - @Rule - public TestPipeline testPipeline = TestPipeline.create(); + @Rule public TestPipeline testPipeline = TestPipeline.create(); - public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) throws IOException { + public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) { return importSpec.toBuilder().putOptions("path", dataFile).build(); } @@ -79,7 +79,7 @@ public ImportJobOptions initOptions() { Path path = Paths.get(Resources.getResource("core_specs/").getPath()); ImportJobOptions options = PipelineOptionsFactory.create().as(ImportJobOptions.class); options.setCoreApiSpecPath(path.toString()); - options.setErrorsStoreId("TEST_ERRORS"); + options.setErrorsStoreType(MOCK_ERRORS_STORE_TYPE); return options; } @@ -109,6 +109,7 @@ public void testImportCSV() throws IOException { importSpec = initImportSpec(importSpec, csvFile.toString()); ImportJobOptions options = initOptions(); + options.setErrorsStoreType(MOCK_ERRORS_STORE_TYPE); Injector injector = Guice.createInjector( @@ -124,7 +125,7 @@ public void testImportCSV() throws IOException { PCollection writtenToWarehouse = PCollectionList.of( - WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs()) + WarehouseStoreService.get(MockWarehouseStore.class).getWrite().getInputs()) .apply("flatten warehouse input", Flatten.pCollections()); PCollection writtenToErrors = @@ -246,12 +247,14 @@ public void testImportWithErrors() throws IOException { importSpec = initImportSpec(importSpec, csvFile.toString()); ImportJobOptions options = initOptions(); + options.setErrorsStoreType(MOCK_ERRORS_STORE_TYPE); Injector injector = Guice.createInjector( new ImportJobModule(options, importSpec), new TestPipelineModule(testPipeline)); ImportJob job = injector.getInstance(ImportJob.class); + injector.getInstance(ImportJob.class); job.expand(); @@ -268,7 +271,6 @@ public void testImportWithErrors() throws IOException { (errors) -> { int i = 0; for (FeatureRowExtended row : errors) { - log.error(row.toString()); assertEquals( row.getLastAttempt().getError().getCause(), "feast.ingestion.exceptions.TypeConversionException"); diff --git a/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java b/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java new file mode 100644 index 00000000000..cc666841221 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/transform/ErrorsStoreTransformTest.java @@ -0,0 +1,85 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.ingestion.transform; + +import static feast.ingestion.model.Errors.toError; +import static feast.storage.MockErrorsStore.MOCK_ERRORS_STORE_TYPE; + +import feast.ingestion.model.Specs; +import feast.ingestion.options.ImportJobOptions; +import feast.storage.MockErrorsStore; +import feast.types.FeatureRowExtendedProto.Attempt; +import feast.types.FeatureRowExtendedProto.Error; +import feast.types.FeatureRowExtendedProto.FeatureRowExtended; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.assertj.core.util.Lists; +import org.junit.Before; +import org.junit.Test; + +public class ErrorsStoreTransformTest { + + private ImportJobOptions options; + private Specs specs; + private PCollection inputs; + private List errors; + + @Before + public void setUp() { + options = PipelineOptionsFactory.create().as(ImportJobOptions.class); + options.setJobName("test"); + specs = Specs.builder().jobName("test").build(); + + Pipeline p = TestPipeline.create(); + errors = + Arrays.asList( + errorOf("test", new Exception("err")), errorOf("test", new Exception("err2"))); + inputs = p.apply(Create.of(errors)).setCoder(ProtoCoder.of(FeatureRowExtended.class)); + } + + private FeatureRowExtended errorOf(String transform, Throwable cause) { + Error error = toError(transform, cause); + return FeatureRowExtended.newBuilder() + .setLastAttempt(Attempt.newBuilder().setError(error).build()) + .build(); + } + + @Test + public void shouldWriteToGivenErrorsStore() { + MockErrorsStore mockStore = new MockErrorsStore(); + options.setErrorsStoreType(MOCK_ERRORS_STORE_TYPE); + ErrorsStoreTransform transform = + new ErrorsStoreTransform(options, specs, Lists.newArrayList(mockStore)); + transform.expand(inputs); + + PCollection writtenToErrors = + PCollectionList.of(mockStore.getWrite().getInputs()) + .apply("flatten errors input", Flatten.pCollections()); + + PAssert.that(writtenToErrors).containsInAnyOrder(errors); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java b/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java new file mode 100644 index 00000000000..ee91f120463 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java @@ -0,0 +1,44 @@ +/* + * Copyright 2018 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package feast.ingestion.util; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.junit.Test; + +public class JsonUtilTest { + + @Test + public void convertJsonStringToMapShouldConvertJsonStringToMap() { + String input = "{\"key\": \"value\"}"; + Map expected = new HashMap<>(); + expected.put("key", "value"); + assertThat(JsonUtil.convertJsonStringToMap(input), equalTo(expected)); + } + + @Test + public void convertJsonStringToMapShouldReturnEmptyMapForEmptyJson() { + String input = "{}"; + Map expected = Collections.emptyMap(); + assertThat(JsonUtil.convertJsonStringToMap(input), equalTo(expected)); + } +} \ No newline at end of file diff --git a/ingestion/src/test/java/feast/storage/MockErrorsStore.java b/ingestion/src/test/java/feast/storage/MockErrorsStore.java index 9abaf33208d..e83ba5641e7 100644 --- a/ingestion/src/test/java/feast/storage/MockErrorsStore.java +++ b/ingestion/src/test/java/feast/storage/MockErrorsStore.java @@ -21,7 +21,7 @@ @AutoService(ErrorsStore.class) public class MockErrorsStore extends MockFeatureStore implements ErrorsStore { - public static final String MOCK_ERRORS_STORE_TYPE = "MOCK_ERRORS_STORE"; + public static final String MOCK_ERRORS_STORE_TYPE = "errors.mock"; public MockErrorsStore() { super(MOCK_ERRORS_STORE_TYPE); diff --git a/ingestion/src/test/java/feast/storage/MockServingStore.java b/ingestion/src/test/java/feast/storage/MockServingStore.java index e52ca046e74..2c60bcbf784 100644 --- a/ingestion/src/test/java/feast/storage/MockServingStore.java +++ b/ingestion/src/test/java/feast/storage/MockServingStore.java @@ -21,7 +21,7 @@ @AutoService(ServingStore.class) public class MockServingStore extends MockFeatureStore implements ServingStore { - public static final String MOCK_SERVING_STORE_TYPE = "MOCK_SERVING_STORE"; + public static final String MOCK_SERVING_STORE_TYPE = "serving.mock"; public MockServingStore() { super(MOCK_SERVING_STORE_TYPE); diff --git a/ingestion/src/test/java/feast/storage/MockWarehouseStore.java b/ingestion/src/test/java/feast/storage/MockWarehouseStore.java index 452a2729df1..dbe9ab8caf5 100644 --- a/ingestion/src/test/java/feast/storage/MockWarehouseStore.java +++ b/ingestion/src/test/java/feast/storage/MockWarehouseStore.java @@ -21,7 +21,7 @@ @AutoService(WarehouseStore.class) public class MockWarehouseStore extends MockFeatureStore implements WarehouseStore { - public static final String MOCK_WAREHOUSE_STORE_TYPE = "MOCK_WAREHOUSE_STORE"; + public static final String MOCK_WAREHOUSE_STORE_TYPE = "warehouse.mock"; public MockWarehouseStore() { super(MOCK_WAREHOUSE_STORE_TYPE); diff --git a/ingestion/src/test/resources/core_specs/storage/TEST_ERRORS.json b/ingestion/src/test/resources/core_specs/storage/TEST_ERRORS.json deleted file mode 100644 index 9f5c3dc4a0a..00000000000 --- a/ingestion/src/test/resources/core_specs/storage/TEST_ERRORS.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "id": "TEST_ERRORS", - "type": "MOCK_ERRORS_STORE", - "options": {} -} \ No newline at end of file diff --git a/ingestion/src/test/resources/core_specs/storage/TEST_SERVING.json b/ingestion/src/test/resources/core_specs/storage/TEST_SERVING.json index f354728a0e7..a8520839d61 100644 --- a/ingestion/src/test/resources/core_specs/storage/TEST_SERVING.json +++ b/ingestion/src/test/resources/core_specs/storage/TEST_SERVING.json @@ -1,5 +1,5 @@ { "id": "TEST_SERVING", - "type": "MOCK_SERVING_STORE", + "type": "serving.mock", "options": {} } diff --git a/ingestion/src/test/resources/core_specs/storage/TEST_WAREHOUSE.json b/ingestion/src/test/resources/core_specs/storage/TEST_WAREHOUSE.json index 8e2013cb55f..13e57513983 100644 --- a/ingestion/src/test/resources/core_specs/storage/TEST_WAREHOUSE.json +++ b/ingestion/src/test/resources/core_specs/storage/TEST_WAREHOUSE.json @@ -1,5 +1,5 @@ { "id": "TEST_WAREHOUSE", - "type": "MOCK_WAREHOUSE_STORE", + "type": "warehouse.mock", "options": {} } \ No newline at end of file