Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/java/feast/core/config/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public ImportJobDefaults getImportJobDefaults(
@Value("${feast.jobs.runner}") String runner,
@Value("${feast.jobs.options}") String options,
@Value("${feast.jobs.executable}") String executable,
@Value("${feast.jobs.errorsStoreId}") String errorsStoreId) {
return new ImportJobDefaults(coreApiUri, runner, options, executable, errorsStoreId);
@Value("${feast.jobs.errorsStoreType}") String errorsStoreType,
@Value("${feast.jobs.errorsStoreOptions}") String errorsStoreOptions) {
return new ImportJobDefaults(
coreApiUri, runner, options, executable, errorsStoreType, errorsStoreOptions);
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/feast/core/config/ImportJobDefaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ImportJobDefaults {
private String runner;
private String importJobOptions;
private String executable;
private String errorsStoreId;
private String errorsStoreType;
private String errorsStoreOptions;
}

Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) {
commands.add(
option("importSpecBase64", Base64.getEncoder().encodeToString(importSpec.toByteArray())));
commands.add(option("coreApiUri", defaults.getCoreApiUri()));
commands.add(option("errorsStoreId", defaults.getErrorsStoreId()));
commands.add(option("errorsStoreType", defaults.getErrorsStoreType()));
commands.add(option("errorsStoreOptions", defaults.getErrorsStoreOptions()));
options.forEach((k, v) -> commands.add(option(k, v)));
return new ProcessBuilder(commands);
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ feast.jobs.coreUri=${CORE_API_URI:localhost:8433}
feast.jobs.runner=${JOB_RUNNER:DirectRunner}
feast.jobs.options=${JOB_OPTIONS:{}}
feast.jobs.executable=${JOB_EXECUTABLE:feast-ingestion.jar}
feast.jobs.errorsStoreId=${JOB_ERRORS_STORE_ID:STDOUT}
feast.jobs.errorsStoreType=${JOB_ERRORS_STORE_TYPE:stdout}
feast.jobs.errorsStoreOptions=${JOB_ERRORS_STORE_OPTIONS:{}}

feast.jobs.dataflow.projectId = ${DATAFLOW_PROJECT_ID:}
feast.jobs.dataflow.location = ${DATAFLOW_LOCATION:}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void setUp() {
"DirectRunner",
"{\"key\":\"value\"}",
"ingestion.jar",
"STDOUT");
"STDOUT",
"{}");
}

@Test
Expand All @@ -77,7 +78,8 @@ public void shouldBuildProcessBuilderWithCorrectOptions() {
"--runner=DirectRunner",
"--importSpecBase64=CgRmaWxl",
"--coreApiUri=localhost:8080",
"--errorsStoreId=STDOUT",
"--errorsStoreType=STDOUT",
"--errorsStoreOptions={}",
"--key=value");
assertThat(pb.command(), equalTo(expected));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import feast.storage.ErrorsStore;
import feast.storage.ServingStore;
import feast.storage.WarehouseStore;
import feast.storage.file.json.JsonFileStores;
import feast.storage.service.ErrorsStoreService;
import feast.storage.service.ServingStoreService;
import feast.storage.service.WarehouseStoreService;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;

import static feast.ingestion.transform.ErrorsStoreTransform.ERRORS_STORE_JSON;

/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
public class ImportJobModule extends AbstractModule {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ public interface ImportJobOptions extends PipelineOptions {
void setLimit(Long value);

@Description(
"Set a store id to store errors in, if your data input is **very** small, you can use STDOUT"
+ " or STDERR as the store id, otherwise it must match an associated storage spec")
String getErrorsStoreId();
"Set an errors store type. One of: [STDERR, STDOUT, JSON]. Note that you should not use "
Comment thread
zhilingc marked this conversation as resolved.
Outdated
+ "STDERR/STDOUT in production unless your data volume is extremely small.")
String getErrorsStoreType();

void setErrorsStoreId(String value);
void setErrorsStoreType(String value);

/**
 * Returns the errors store options as a JSON string of key-values.
 *
 * <p>Defaults to {@code "{}"} (no options) when the option is not supplied.
 */
@Description(
    // Trailing space is required: the two literals are concatenated into one help sentence.
    "Provide errors store options as a json string containing key-values. Options required "
        + "depend on the type of store set.")
@Default.String("{}")
String getErrorsStoreOptions();

void setErrorsStoreOptions(String value);

@AutoService(PipelineOptionsRegistrar.class)
class ImportJobOptionsRegistrar implements PipelineOptionsRegistrar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,7 @@

package feast.ingestion.transform;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.inject.Inject;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import feast.ingestion.model.Specs;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.transform.FeatureIO.Write;
Expand All @@ -33,54 +26,62 @@
import feast.storage.ErrorsStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.event.Level;

import java.util.List;

import static feast.ingestion.util.JsonUtil.convertJsonStringToMap;

@Slf4j
public class ErrorsStoreTransform extends FeatureIO.Write {
public static final String STDERR_STORE_ID = "STDERR";
public static final String STDOUT_STORE_ID = "STDOUT";
public static final String ERRORS_STORE_STDERR = "stderr";
public static final String ERRORS_STORE_STDOUT = "stdout";
public static final String ERRORS_STORE_JSON = "file.json";

private String errorsStoreId;
private List<ErrorsStore> stores;
private String errorsStoreType;
private StorageSpec errorsStoreSpec;
private ErrorsStore errorsStore;
private Specs specs;

@Inject
public ErrorsStoreTransform(ImportJobOptions options, List<ErrorsStore> stores, Specs specs) {
this.errorsStoreId = options.getErrorsStoreId();
this.stores = stores;
public ErrorsStoreTransform(
ImportJobOptions options, Specs specs, List<ErrorsStore> errorsStores) {
this.specs = specs;
this.errorsStoreType = options.getErrorsStoreType();

for (ErrorsStore errorsStore : errorsStores) {
if (errorsStore.getType().equals(errorsStoreType)) {
this.errorsStore = errorsStore;
}
}

this.errorsStoreSpec =
StorageSpec.newBuilder()
.setType(errorsStoreType)
.putAllOptions(convertJsonStringToMap(options.getErrorsStoreOptions()))
.build();
}

@Override
public PDone expand(PCollection<FeatureRowExtended> input) {
if (errorsStoreId == null) {
log.warn("No errorsStoreId specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}

if (errorsStoreId.equals(STDOUT_STORE_ID)) {
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
} else if (errorsStoreId.equals(STDERR_STORE_ID)) {
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
} else {
StorageSpec storageSpec = specs.getStorageSpec(errorsStoreId);
storageSpec =
checkNotNull(
storageSpec,
String.format("errorsStoreId=%s not found in storage specs", errorsStoreId));
Write write = null;
for (ErrorsStore errorsStore : stores) {
if (errorsStore.getType().equals(storageSpec.getType())) {
write = errorsStore.create(storageSpec, specs);
switch (errorsStoreType) {
case ERRORS_STORE_STDOUT:
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
break;
case ERRORS_STORE_STDERR:
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
break;
default:
if (errorsStore == null) {
log.warn("No valid errors store specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}
}
write =
checkNotNull(
write,
"No errors storage factory found for errorsStoreId=%s with type=%s",
errorsStoreId,
storageSpec.getType());
return input.apply(write);
Write write = errorsStore.create(this.errorsStoreSpec, specs);
return input.apply(write);
}
return PDone.in(input.getPipeline());
}
Expand Down
25 changes: 25 additions & 0 deletions ingestion/src/main/java/feast/ingestion/util/JsonUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package feast.ingestion.util;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Map;

/** Static helpers for converting JSON strings into Java collections. */
public class JsonUtil {
  // Single shared parser; declared final so it cannot be reassigned after class initialisation.
  private static final Gson gson = new Gson();

  /**
   * Unmarshals a given json string to map.
   *
   * <p>A {@code null}, empty, whitespace-only or {@code "{}"} input yields an empty map without
   * invoking the parser. If the parser itself produces no object (for example for the JSON
   * literal {@code null}), an empty map is returned as well, so callers never receive {@code
   * null}.
   *
   * @param jsonString valid json formatted string describing a flat object of string values; may
   *     be {@code null} or blank
   * @return map of keys to values in json; never {@code null}
   */
  public static Map<String, String> convertJsonStringToMap(String jsonString) {
    if (jsonString == null) {
      return Collections.emptyMap();
    }
    String trimmed = jsonString.trim();
    if (trimmed.isEmpty() || trimmed.equals("{}")) {
      return Collections.emptyMap();
    }
    Type stringMapType = new TypeToken<Map<String, String>>() {}.getType();
    Map<String, String> parsed = gson.fromJson(jsonString, stringMapType);
    if (parsed == null) {
      // Guard against a null parse result so callers such as putAllOptions(...) do not hit an NPE.
      return Collections.emptyMap();
    }
    return parsed;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import feast.storage.ErrorsStore;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import lombok.extern.slf4j.Slf4j;
import feast.storage.ErrorsStore;

@Slf4j
public class ErrorsStoreService {
Expand All @@ -38,7 +39,7 @@ public class ErrorsStoreService {

public static List<ErrorsStore> getAll() {
return Lists.newArrayList(
Iterators.concat(manuallyRegistered.iterator(), serviceLoader.iterator()));
Iterators.concat(manuallyRegistered.iterator(), serviceLoader.iterator()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is weird. Are you using the google-java-format plugin, or the imported Google style XML?
https://github.com/google/styleguide/blob/gh-pages/intellij-java-google-style.xml

The imports don't get rearranged for me either.

}

/** Get store of the given subclass. */
Expand Down
32 changes: 18 additions & 14 deletions ingestion/src/test/java/feast/ingestion/ImportJobCSVTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package feast.ingestion;

import static feast.FeastMatchers.hasCount;
import static feast.ToOrderedFeatureRows.orderedFeatureRow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
Expand All @@ -46,11 +41,6 @@
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.GranularityProto.Granularity;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
Expand All @@ -62,6 +52,18 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import static feast.FeastMatchers.hasCount;
import static feast.ToOrderedFeatureRows.orderedFeatureRow;
import static feast.storage.MockErrorsStore.MOCK_ERRORS_STORE_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

@Slf4j
public class ImportJobCSVTest {

Expand All @@ -71,15 +73,15 @@ public class ImportJobCSVTest {
@Rule
public TestPipeline testPipeline = TestPipeline.create();

public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) throws IOException {
public ImportSpec initImportSpec(ImportSpec importSpec, String dataFile) {
return importSpec.toBuilder().putOptions("path", dataFile).build();
}

public ImportJobOptions initOptions() {
Path path = Paths.get(Resources.getResource("core_specs/").getPath());
ImportJobOptions options = PipelineOptionsFactory.create().as(ImportJobOptions.class);
options.setCoreApiSpecPath(path.toString());
options.setErrorsStoreId("TEST_ERRORS");
options.setErrorsStoreType(MOCK_ERRORS_STORE_TYPE);
return options;
}

Expand Down Expand Up @@ -109,6 +111,7 @@ public void testImportCSV() throws IOException {
importSpec = initImportSpec(importSpec, csvFile.toString());

ImportJobOptions options = initOptions();
options.setErrorsStoreType(MOCK_ERRORS_STORE_TYPE);

Injector injector =
Guice.createInjector(
Expand All @@ -128,7 +131,7 @@ public void testImportCSV() throws IOException {
.apply("flatten warehouse input", Flatten.pCollections());

PCollection<FeatureRowExtended> writtenToErrors =
PCollectionList.of(ErrorsStoreService.get(MockErrorsStore.class).getWrite().getInputs())
PCollectionList.of((ErrorsStoreService.get(MockErrorsStore.class)).getWrite().getInputs())
.apply("flatten errors input", Flatten.pCollections());

List<FeatureRow> expectedRows =
Expand Down Expand Up @@ -246,12 +249,14 @@ public void testImportWithErrors() throws IOException {
importSpec = initImportSpec(importSpec, csvFile.toString());

ImportJobOptions options = initOptions();
options.setErrorsStoreType(MOCK_ERRORS_STORE_TYPE);

Injector injector =
Guice.createInjector(
new ImportJobModule(options, importSpec), new TestPipelineModule(testPipeline));

ImportJob job = injector.getInstance(ImportJob.class);

injector.getInstance(ImportJob.class);
job.expand();

Expand All @@ -268,7 +273,6 @@ public void testImportWithErrors() throws IOException {
(errors) -> {
int i = 0;
for (FeatureRowExtended row : errors) {
log.error(row.toString());
assertEquals(
row.getLastAttempt().getError().getCause(),
"feast.ingestion.exceptions.TypeConversionException");
Expand Down
Loading