Skip to content

Commit 4830dd5

Browse files
authored
[Fix][Transform-V2] Fix multiTable SQL transform (#10263)
1 parent 28b65d3 commit 4830dd5

25 files changed

Lines changed: 363 additions & 51 deletions

seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void testVectorFunctions(TestContainer container)
105105
@TestTemplate
106106
public void testSQLTransformMultiTable(TestContainer container)
107107
throws IOException, InterruptedException {
108-
Container.ExecResult sqlTransform = container.executeJob("/sql_transform.conf");
108+
Container.ExecResult sqlTransform = container.executeJob("/sql_transform_multi_table.conf");
109109
Assertions.assertEquals(0, sqlTransform.getExitCode());
110110
}
111111

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/adaptsink/DefineSinkTypeMultiCatalogTransform.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
2424
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
25+
import org.apache.seatunnel.transform.common.IdentityMapTransform;
2526

2627
import java.util.List;
2728

@@ -45,4 +46,9 @@ protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
4546
CatalogTable table, ReadonlyConfig config) {
4647
return new DefineSinkTypeTransform(DefineSinkTypeTransformConfig.of(config), table);
4748
}
49+
50+
@Override
51+
protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
52+
return new IdentityMapTransform(catalogTable);
53+
}
4854
}

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogTransform.java

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.table.catalog.CatalogTable;
22-
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
23-
import org.apache.seatunnel.api.table.catalog.TableSchema;
2422
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2523
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2624
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -73,7 +71,7 @@ public AbstractMultiCatalogTransform(
7371
if (tableConfig != null) {
7472
transformMap.put(tableId, buildTransform(inputCatalogTable, tableConfig));
7573
} else {
76-
transformMap.put(tableId, new IdentityTransform(inputCatalogTable));
74+
transformMap.put(tableId, createIdentityTransform(inputCatalogTable));
7775
}
7876
});
7977

@@ -91,6 +89,9 @@ public AbstractMultiCatalogTransform(
9189
protected abstract SeaTunnelTransform<SeaTunnelRow> buildTransform(
9290
CatalogTable inputCatalogTable, ReadonlyConfig config);
9391

92+
protected abstract SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(
93+
CatalogTable catalogTable);
94+
9495
@Override
9596
public List<CatalogTable> getProducedCatalogTables() {
9697
return outputCatalogTables;
@@ -103,36 +104,4 @@ public CatalogTable getProducedCatalogTable() {
103104

104105
@Override
105106
public void setTypeInfo(SeaTunnelDataType<SeaTunnelRow> inputDataType) {}
106-
107-
public static class IdentityTransform extends AbstractCatalogSupportMapTransform {
108-
private final CatalogTable catalogTable;
109-
110-
@Override
111-
public String getPluginName() {
112-
return "Identity";
113-
}
114-
115-
public IdentityTransform(CatalogTable catalogTable) {
116-
super(catalogTable);
117-
this.catalogTable = catalogTable;
118-
}
119-
120-
@Override
121-
protected SeaTunnelRow transformRow(SeaTunnelRow row) {
122-
return row;
123-
}
124-
125-
@Override
126-
protected TableSchema transformTableSchema() {
127-
return catalogTable.getTableSchema();
128-
}
129-
130-
@Override
131-
protected TableIdentifier transformTableIdentifier() {
132-
return catalogTable.getTableId();
133-
}
134-
135-
@Override
136-
public void setTypeInfo(SeaTunnelDataType<SeaTunnelRow> inputDataType) {}
137-
}
138107
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.transform.common;
19+
20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
22+
import org.apache.seatunnel.api.table.catalog.TableSchema;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
24+
25+
import java.util.Collections;
26+
import java.util.List;
27+
28+
public class IdentityFlatMapTransform extends AbstractCatalogSupportFlatMapTransform {
29+
private final CatalogTable catalogTable;
30+
31+
public IdentityFlatMapTransform(CatalogTable catalogTable) {
32+
super(catalogTable);
33+
this.catalogTable = catalogTable;
34+
}
35+
36+
@Override
37+
public String getPluginName() {
38+
return "IdentityFlatMap";
39+
}
40+
41+
@Override
42+
protected List<SeaTunnelRow> transformRow(SeaTunnelRow row) {
43+
return Collections.singletonList(row);
44+
}
45+
46+
@Override
47+
protected TableSchema transformTableSchema() {
48+
return catalogTable.getTableSchema();
49+
}
50+
51+
@Override
52+
protected TableIdentifier transformTableIdentifier() {
53+
return catalogTable.getTableId();
54+
}
55+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.transform.common;
19+
20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
22+
import org.apache.seatunnel.api.table.catalog.TableSchema;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
24+
25+
public class IdentityMapTransform extends AbstractCatalogSupportMapTransform {
26+
private final CatalogTable catalogTable;
27+
28+
public IdentityMapTransform(CatalogTable catalogTable) {
29+
super(catalogTable);
30+
this.catalogTable = catalogTable;
31+
}
32+
33+
@Override
34+
public String getPluginName() {
35+
return "IdentityMap";
36+
}
37+
38+
@Override
39+
protected SeaTunnelRow transformRow(SeaTunnelRow row) {
40+
return row;
41+
}
42+
43+
@Override
44+
protected TableSchema transformTableSchema() {
45+
return catalogTable.getTableSchema();
46+
}
47+
48+
@Override
49+
protected TableIdentifier transformTableIdentifier() {
50+
return catalogTable.getTableId();
51+
}
52+
}

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
2424
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
25+
import org.apache.seatunnel.transform.common.IdentityMapTransform;
2526

2627
import java.util.List;
2728

@@ -42,4 +43,9 @@ protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
4243
CatalogTable inputCatalogTable, ReadonlyConfig config) {
4344
return new CopyFieldTransform(CopyTransformConfig.of(config), inputCatalogTable);
4445
}
46+
47+
@Override
48+
protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
49+
return new IdentityMapTransform(catalogTable);
50+
}
4551
}

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
2424
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
25+
import org.apache.seatunnel.transform.common.IdentityMapTransform;
2526

2627
import java.util.List;
2728

@@ -32,14 +33,19 @@ public DynamicCompileMultiCatalogTransform(
3233
super(inputCatalogTables, config);
3334
}
3435

36+
@Override
37+
public String getPluginName() {
38+
return DynamicCompileTransform.PLUGIN_NAME;
39+
}
40+
3541
@Override
3642
protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
3743
CatalogTable inputCatalogTable, ReadonlyConfig config) {
3844
return new DynamicCompileTransform(config, inputCatalogTable);
3945
}
4046

4147
@Override
42-
public String getPluginName() {
43-
return DynamicCompileTransform.PLUGIN_NAME;
48+
protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
49+
return new IdentityMapTransform(catalogTable);
4450
}
4551
}

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
2424
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
25+
import org.apache.seatunnel.transform.common.IdentityMapTransform;
2526

2627
import java.util.List;
2728

@@ -42,4 +43,9 @@ protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
4243
CatalogTable inputCatalogTable, ReadonlyConfig config) {
4344
return new FieldMapperTransform(FieldMapperTransformConfig.of(config), inputCatalogTable);
4445
}
46+
47+
@Override
48+
protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
49+
return new IdentityMapTransform(catalogTable);
50+
}
4551
}

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
2424
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
25+
import org.apache.seatunnel.transform.common.IdentityMapTransform;
2526

2627
import java.util.List;
2728

@@ -42,4 +43,9 @@ protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
4243
CatalogTable inputCatalogTable, ReadonlyConfig config) {
4344
return new FilterFieldTransform(config, inputCatalogTable);
4445
}
46+
47+
@Override
48+
protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
49+
return new IdentityMapTransform(catalogTable);
50+
}
4551
}

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
2424
import org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform;
25+
import org.apache.seatunnel.transform.common.IdentityMapTransform;
2526

2627
import java.util.List;
2728

@@ -42,4 +43,9 @@ protected SeaTunnelTransform<SeaTunnelRow> buildTransform(
4243
CatalogTable inputCatalogTable, ReadonlyConfig config) {
4344
return new FilterRowKindTransform(config, inputCatalogTable);
4445
}
46+
47+
@Override
48+
protected SeaTunnelTransform<SeaTunnelRow> createIdentityTransform(CatalogTable catalogTable) {
49+
return new IdentityMapTransform(catalogTable);
50+
}
4551
}

0 commit comments

Comments
 (0)