Skip to content

Commit 8bb7f27

Browse files
committed
Fix pipeline options toArgs() returning empty list #765
1 parent b497a0b commit 8bb7f27

5 files changed

Lines changed: 129 additions & 17 deletions

File tree

core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,45 +47,45 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
4747
}
4848

4949
/* Project id to use when launching jobs. */
50-
@NotBlank String project;
50+
@NotBlank public String project;
5151

5252
/* The Google Compute Engine region for creating Dataflow jobs. */
53-
@NotBlank String region;
53+
@NotBlank public String region;
5454

5555
/* GCP availability zone for operations. */
56-
@NotBlank String zone;
56+
@NotBlank public String zone;
5757

5858
/* Run the job as a specific service account, instead of the default GCE robot. */
59-
String serviceAccount;
59+
public String serviceAccount;
6060

6161
/* GCE network for launching workers. */
62-
@NotBlank String network;
62+
@NotBlank public String network;
6363

6464
/* GCE subnetwork for launching workers. */
65-
@NotBlank String subnetwork;
65+
@NotBlank public String subnetwork;
6666

6767
/* Machine type to create Dataflow worker VMs as. */
68-
String workerMachineType;
68+
public String workerMachineType;
6969

7070
/* The autoscaling algorithm to use for the workerpool. */
71-
String autoscalingAlgorithm;
71+
public String autoscalingAlgorithm;
7272

7373
/* Specifies whether worker pools should be started with public IP addresses. */
74-
Boolean usePublicIps;
74+
public Boolean usePublicIps;
7575

7676
/**
7777
* A pipeline level default location for storing temporary files. Support Google Cloud Storage
7878
* locations, e.g. gs://bucket/object
7979
*/
80-
@NotBlank String tempLocation;
80+
@NotBlank public String tempLocation;
8181

8282
/* The maximum number of workers to use for the workerpool. */
83-
Integer maxNumWorkers;
83+
public Integer maxNumWorkers;
8484

8585
/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */
86-
String deadLetterTableSpec;
86+
public String deadLetterTableSpec;
8787

88-
Map<String, String> labels;
88+
public Map<String, String> labels;
8989

9090
/** Validates Dataflow runner configuration options */
9191
public void validate() {

core/src/main/java/feast/core/job/direct/DirectRunnerConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ public class DirectRunnerConfig extends RunnerConfig {
2424
* Controls the amount of target parallelism the DirectRunner will use. Defaults to the greater of
2525
* the number of available processors and 3. Must be a value greater than zero.
2626
*/
27-
Integer targetParallelism;
27+
public Integer targetParallelism;
2828

2929
/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */
30-
String deadletterTableSpec;
30+
public String deadletterTableSpec;
3131

3232
public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) {
3333
this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();

core/src/main/java/feast/core/job/option/RunnerConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
public abstract class RunnerConfig {
3131

3232
/**
33-
* Converts the fields in this class to a list of --key=value args to be passed to a {@link
34-
* org.apache.beam.sdk.options.PipelineOptionsFactory}.
33+
* Converts the public-access fields in this class to a list of --key=value args to be passed to a
34+
* {@link org.apache.beam.sdk.options.PipelineOptionsFactory}.
3535
*
3636
* <p>Ignores values that are proto-default (e.g. empty string, 0).
3737
*
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.core.job.dataflow;
18+
19+
import static org.hamcrest.CoreMatchers.equalTo;
20+
import static org.hamcrest.Matchers.containsInAnyOrder;
21+
import static org.junit.Assert.*;
22+
23+
import com.google.common.collect.Lists;
24+
import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
25+
import java.util.Arrays;
26+
import java.util.List;
27+
import org.junit.Test;
28+
29+
public class DataflowRunnerConfigTest {
30+
@Test
31+
public void shouldConvertToPipelineArgs() throws IllegalAccessException {
32+
DataflowRunnerConfigOptions opts =
33+
DataflowRunnerConfigOptions.newBuilder()
34+
.setProject("my-project")
35+
.setRegion("asia-east1")
36+
.setZone("asia-east1-a")
37+
.setTempLocation("gs://bucket/tempLocation")
38+
.setNetwork("default")
39+
.setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
40+
.setMaxNumWorkers(1)
41+
.setAutoscalingAlgorithm("THROUGHPUT_BASED")
42+
.setUsePublicIps(false)
43+
.setWorkerMachineType("n1-standard-1")
44+
.setDeadLetterTableSpec("project_id:dataset_id.table_id")
45+
.putLabels("key", "value")
46+
.build();
47+
48+
DataflowRunnerConfig dataflowRunnerConfig = new DataflowRunnerConfig(opts);
49+
List<String> args = Lists.newArrayList(dataflowRunnerConfig.toArgs());
50+
String[] expectedArgs =
51+
Arrays.asList(
52+
"--project=my-project",
53+
"--region=asia-east1",
54+
"--zone=asia-east1-a",
55+
"--tempLocation=gs://bucket/tempLocation",
56+
"--network=default",
57+
"--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
58+
"--maxNumWorkers=1",
59+
"--autoscalingAlgorithm=THROUGHPUT_BASED",
60+
"--usePublicIps=false",
61+
"--workerMachineType=n1-standard-1",
62+
"--deadLetterTableSpec=project_id:dataset_id.table_id",
63+
"--labels={\"key\":\"value\"}")
64+
.toArray(String[]::new);
65+
assertThat(args.size(), equalTo(expectedArgs.length));
66+
assertThat(args, containsInAnyOrder(expectedArgs));
67+
}
68+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.core.job.direct;
18+
19+
import static org.hamcrest.Matchers.containsInAnyOrder;
20+
import static org.hamcrest.Matchers.equalTo;
21+
import static org.junit.Assert.*;
22+
23+
import com.google.common.collect.Lists;
24+
import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
25+
import java.util.List;
26+
import org.junit.Test;
27+
28+
public class DirectRunnerConfigTest {
29+
@Test
30+
public void shouldConvertToPipelineArgs() throws IllegalAccessException {
31+
DirectRunnerConfigOptions opts =
32+
DirectRunnerConfigOptions.newBuilder()
33+
.setTargetParallelism(1)
34+
.setDeadLetterTableSpec("project_id:dataset_id.table_id")
35+
.build();
36+
DirectRunnerConfig directRunnerConfig = new DirectRunnerConfig(opts);
37+
List<String> args = Lists.newArrayList(directRunnerConfig.toArgs());
38+
assertThat(args.size(), equalTo(2));
39+
assertThat(
40+
args,
41+
containsInAnyOrder(
42+
"--targetParallelism=1", "--deadletterTableSpec=project_id:dataset_id.table_id"));
43+
}
44+
}

0 commit comments

Comments
 (0)