Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/java/feast/core/config/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public ImportJobDefaults getImportJobDefaults(
@Value("${feast.jobs.runner}") String runner,
@Value("${feast.jobs.options}") String options,
@Value("${feast.jobs.executable}") String executable,
@Value("${feast.jobs.errorsStoreId}") String errorsStoreId) {
return new ImportJobDefaults(coreApiUri, runner, options, executable, errorsStoreId);
@Value("${feast.jobs.errorsStoreType}") String errorsStoreType,
@Value("${feast.jobs.errorsStoreOptions}") String errorsStoreOptions) {
return new ImportJobDefaults(
coreApiUri, runner, options, executable, errorsStoreType, errorsStoreOptions);
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/feast/core/config/ImportJobDefaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ImportJobDefaults {
private String runner;
private String importJobOptions;
private String executable;
private String errorsStoreId;
private String errorsStoreType;
private String errorsStoreOptions;
}

31 changes: 13 additions & 18 deletions core/src/main/java/feast/core/service/JobExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,26 @@
import feast.core.model.JobStatus;
import feast.core.util.TypeConversion;
import feast.specs.ImportSpecProto.ImportSpec;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class JobExecutionService {
private static final int SLEEP_MS = 10;
private static final Pattern JOB_EXT_ID_PREFIX_REGEX = Pattern.compile(".*FeastImportJobId:.*");

public static final String JOB_PREFIX_DEFAULT = "feastimport";

private static final int SLEEP_MS = 10;
private static final Pattern JOB_EXT_ID_PREFIX_REGEX = Pattern.compile(".*FeastImportJobId:.*");
private JobInfoRepository jobInfoRepository;
private ImportJobDefaults defaults;

Expand Down Expand Up @@ -106,9 +108,6 @@ public SubmitImportJobResponse submitJob(ImportSpec importSpec, String jobPrefix

/**
* Update a given job's status
*
* @param jobId
* @param status
*/
public void updateJobStatus(String jobId, JobStatus status) {
Optional<JobInfo> jobRecordOptional = jobInfoRepository.findById(jobId);
Expand All @@ -121,9 +120,6 @@ public void updateJobStatus(String jobId, JobStatus status) {

/**
* Update a given job's external id
*
* @param jobId
* @param jobExtId
*/
public void updateJobExtId(String jobId, String jobExtId) {
Optional<JobInfo> jobRecordOptional = jobInfoRepository.findById(jobId);
Expand All @@ -137,8 +133,6 @@ public void updateJobExtId(String jobId, String jobExtId) {
/**
* Builds the command to execute the ingestion job
*
* @param importSpec
* @param jobId
* @return configured ProcessBuilder
*/
public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) {
Expand All @@ -153,7 +147,8 @@ public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) {
commands.add(
option("importSpecBase64", Base64.getEncoder().encodeToString(importSpec.toByteArray())));
commands.add(option("coreApiUri", defaults.getCoreApiUri()));
commands.add(option("errorsStoreId", defaults.getErrorsStoreId()));
commands.add(option("errorsStoreType", defaults.getErrorsStoreType()));
commands.add(option("errorsStoreOptions", defaults.getErrorsStoreOptions()));
options.forEach((k, v) -> commands.add(option(k, v)));
return new ProcessBuilder(commands);
}
Expand All @@ -170,7 +165,7 @@ private String option(String key, String value) {
*/
public String runProcess(Process p) {
try (BufferedReader outputStream =
new BufferedReader(new InputStreamReader(p.getInputStream()));
new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader errorsStream =
new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
String extId = "";
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ feast.jobs.coreUri=${CORE_API_URI:localhost:8433}
feast.jobs.runner=${JOB_RUNNER:DirectRunner}
feast.jobs.options=${JOB_OPTIONS:{}}
feast.jobs.executable=${JOB_EXECUTABLE:feast-ingestion.jar}
feast.jobs.errorsStoreId=${JOB_ERRORS_STORE_ID:STDOUT}
feast.jobs.errorsStoreType=${JOB_ERRORS_STORE_TYPE:stdout}
feast.jobs.errorsStoreOptions=${JOB_ERRORS_STORE_OPTIONS:{}}

feast.jobs.dataflow.projectId = ${DATAFLOW_PROJECT_ID:}
feast.jobs.dataflow.location = ${DATAFLOW_LOCATION:}
Expand Down
41 changes: 22 additions & 19 deletions core/src/test/java/feast/core/service/JobExecutionServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,25 @@

package feast.core.service;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import com.google.common.collect.Lists;
import feast.core.config.ImportJobDefaults;
import feast.core.dao.JobInfoRepository;
import feast.core.model.JobInfo;
import feast.core.model.JobStatus;
import feast.specs.ImportSpecProto.ImportSpec;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -31,25 +44,13 @@
import org.mockito.Mock;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.mockito.internal.verification.VerificationModeFactory.times;

public class JobExecutionServiceTest {
private ImportJobDefaults defaults;
@Mock JobInfoRepository jobInfoRepository;

@Rule public final ExpectedException expectedException = ExpectedException.none();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Mock
JobInfoRepository jobInfoRepository;
private ImportJobDefaults defaults;

@Before
public void setUp() {
Expand All @@ -60,7 +61,8 @@ public void setUp() {
"DirectRunner",
"{\"key\":\"value\"}",
"ingestion.jar",
"STDOUT");
"STDOUT",
"{}");
}

@Test
Expand All @@ -77,7 +79,8 @@ public void shouldBuildProcessBuilderWithCorrectOptions() {
"--runner=DirectRunner",
"--importSpecBase64=CgRmaWxl",
"--coreApiUri=localhost:8080",
"--errorsStoreId=STDOUT",
"--errorsStoreType=STDOUT",
"--errorsStoreOptions={}",
"--key=value");
assertThat(pb.command(), equalTo(expected));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;

/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
/**
* An ImportJobModule is a Guice module for creating dependency injection bindings.
*/
public class ImportJobModule extends AbstractModule {

private final ImportJobOptions options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.options.Validation.Required;

public interface ImportJobOptions extends PipelineOptions {

@Description("Import spec yaml file path")
@Required(groups = {"importSpec"})
String getImportSpecYamlFile();
Expand Down Expand Up @@ -72,14 +73,23 @@ public interface ImportJobOptions extends PipelineOptions {
void setLimit(Long value);

@Description(
"Set a store id to store errors in, if your data input is **very** small, you can use STDOUT"
+ " or STDERR as the store id, otherwise it must match an associated storage spec")
String getErrorsStoreId();
"Set an errors store type. One of: [stderr, stdout, file.json]. Note that you should not use "
+ "stderr/stdout in production unless your data volume is extremely small.")
String getErrorsStoreType();

void setErrorsStoreType(String value);

void setErrorsStoreId(String value);
@Description(
"Provide errors store options as a json string containing key-values. Options required"
+ "depend on the type of store set.")
@Default.String("{}")
String getErrorsStoreOptions();

void setErrorsStoreOptions(String value);

@AutoService(PipelineOptionsRegistrar.class)
class ImportJobOptionsRegistrar implements PipelineOptionsRegistrar {

@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return Collections.singleton(ImportJobOptions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@

package feast.ingestion.transform;

import static com.google.common.base.Preconditions.checkNotNull;
import static feast.ingestion.util.JsonUtil.convertJsonStringToMap;

import com.google.inject.Inject;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import feast.ingestion.model.Specs;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.transform.FeatureIO.Write;
Expand All @@ -33,54 +28,60 @@
import feast.storage.ErrorsStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.event.Level;

@Slf4j
public class ErrorsStoreTransform extends FeatureIO.Write {
public static final String STDERR_STORE_ID = "STDERR";
public static final String STDOUT_STORE_ID = "STDOUT";

private String errorsStoreId;
private List<ErrorsStore> stores;
public static final String ERRORS_STORE_STDERR = "stderr";
public static final String ERRORS_STORE_STDOUT = "stdout";
public static final String ERRORS_STORE_JSON = "file.json";

private String errorsStoreType;
private StorageSpec errorsStoreSpec;
private ErrorsStore errorsStore;
private Specs specs;

@Inject
public ErrorsStoreTransform(ImportJobOptions options, List<ErrorsStore> stores, Specs specs) {
this.errorsStoreId = options.getErrorsStoreId();
this.stores = stores;
public ErrorsStoreTransform(
ImportJobOptions options, Specs specs, List<ErrorsStore> errorsStores) {
this.specs = specs;
this.errorsStoreType = options.getErrorsStoreType();

for (ErrorsStore errorsStore : errorsStores) {
if (errorsStore.getType().equals(errorsStoreType)) {
this.errorsStore = errorsStore;
}
}

this.errorsStoreSpec =
StorageSpec.newBuilder()
.setType(errorsStoreType)
.putAllOptions(convertJsonStringToMap(options.getErrorsStoreOptions()))
.build();
}

@Override
public PDone expand(PCollection<FeatureRowExtended> input) {
if (errorsStoreId == null) {
log.warn("No errorsStoreId specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}

if (errorsStoreId.equals(STDOUT_STORE_ID)) {
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
} else if (errorsStoreId.equals(STDERR_STORE_ID)) {
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
} else {
StorageSpec storageSpec = specs.getStorageSpec(errorsStoreId);
storageSpec =
checkNotNull(
storageSpec,
String.format("errorsStoreId=%s not found in storage specs", errorsStoreId));
Write write = null;
for (ErrorsStore errorsStore : stores) {
if (errorsStore.getType().equals(storageSpec.getType())) {
write = errorsStore.create(storageSpec, specs);
switch (errorsStoreType) {
case ERRORS_STORE_STDOUT:
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
break;
case ERRORS_STORE_STDERR:
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
break;
default:
if (errorsStore == null) {
log.warn("No valid errors store specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}
}
write =
checkNotNull(
write,
"No errors storage factory found for errorsStoreId=%s with type=%s",
errorsStoreId,
storageSpec.getType());
return input.apply(write);
Write write = errorsStore.create(this.errorsStoreSpec, specs);
return input.apply(write);
}
return PDone.in(input.getPipeline());
}
Expand Down
27 changes: 27 additions & 0 deletions ingestion/src/main/java/feast/ingestion/util/JsonUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package feast.ingestion.util;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Map;

public class JsonUtil {

private static Gson gson = new Gson();

/**
* Unmarshals a given json string to map
*
* @param jsonString valid json formatted string
* @return map of keys to values in json
*/
public static Map<String, String> convertJsonStringToMap(String jsonString) {
if (jsonString == null || jsonString.equals("") || jsonString.equals("{}")) {
return Collections.emptyMap();
}
Type stringMapType = new TypeToken<Map<String, String>>() {
}.getType();
return gson.fromJson(jsonString, stringMapType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import feast.storage.ErrorsStore;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import lombok.extern.slf4j.Slf4j;
import feast.storage.ErrorsStore;

@Slf4j
public class ErrorsStoreService {

private static ServiceLoader<ErrorsStore> serviceLoader = ServiceLoader.load(ErrorsStore.class);
private static List<ErrorsStore> manuallyRegistered = new ArrayList<>();

Expand Down
Loading