Skip to content

Commit d0a8f35

Browse files
committed
[590] Add Delta HMS Catalog Sync implementation
1 parent 30f3e54 commit d0a8f35

7 files changed

Lines changed: 272 additions & 2 deletions

File tree

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
<apache.rat.version>0.16.1</apache.rat.version>
8989
<google.java.format.version>1.8</google.java.format.version>
9090
<delta.standalone.version>0.5.0</delta.standalone.version>
91+
<delta.hive.version>3.0.0</delta.hive.version>
9192
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
9293
<target.dir.pattern>**/target/**</target.dir.pattern>
9394
<delombok.output.dir>${project.build.directory}/delombok</delombok.output.dir>
@@ -291,7 +292,6 @@
291292
<groupId>io.delta</groupId>
292293
<artifactId>delta-standalone_${scala.binary.version}</artifactId>
293294
<version>${delta.standalone.version}</version>
294-
<scope>test</scope>
295295
</dependency>
296296

297297
<!-- Spark -->

xtable-core/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
<dependency>
102102
<groupId>io.delta</groupId>
103103
<artifactId>delta-standalone_${scala.binary.version}</artifactId>
104-
<scope>test</scope>
105104
</dependency>
106105

107106
<!-- Hadoop dependencies -->

xtable-hive-metastore/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@
4949
<version>${iceberg.version}</version>
5050
</dependency>
5151

52+
<!-- Delta dependencies -->
53+
<dependency>
54+
<groupId>io.delta</groupId>
55+
<artifactId>delta-hive_${scala.binary.version}</artifactId>
56+
<version>${delta.hive.version}</version>
57+
</dependency>
58+
5259
<!-- HMS dependencies -->
5360
<dependency>
5461
<groupId>org.apache.hive</groupId>

xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import org.apache.xtable.catalog.CatalogTableBuilder;
3434
import org.apache.xtable.exception.NotSupportedException;
35+
import org.apache.xtable.hms.table.DeltaHMSCatalogTableBuilder;
3536
import org.apache.xtable.hms.table.IcebergHMSCatalogTableBuilder;
3637
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
3738
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
@@ -44,6 +45,8 @@ public static CatalogTableBuilder<Table, Table> getTableBuilder(
4445
switch (tableFormat) {
4546
case TableFormat.ICEBERG:
4647
return new IcebergHMSCatalogTableBuilder(configuration);
48+
case TableFormat.DELTA:
49+
return new DeltaHMSCatalogTableBuilder();
4750
default:
4851
throw new NotSupportedException("Unsupported table format: " + tableFormat);
4952
}

xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/DeltaHMSCatalogTableBuilder.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.hms.table;
20+
21+
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
22+
import static org.apache.xtable.catalog.Constants.PROP_EXTERNAL;
23+
import static org.apache.xtable.catalog.Constants.PROP_PATH;
24+
import static org.apache.xtable.catalog.Constants.PROP_SERIALIZATION_FORMAT;
25+
import static org.apache.xtable.hms.HMSCatalogTableBuilderFactory.newHmsTable;
26+
27+
import java.util.HashMap;
28+
import java.util.Locale;
29+
import java.util.Map;
30+
31+
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
32+
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
33+
import org.apache.hadoop.hive.metastore.api.Table;
34+
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
35+
36+
import com.google.common.annotations.VisibleForTesting;
37+
38+
import io.delta.hive.DeltaStorageHandler;
39+
40+
import org.apache.xtable.catalog.CatalogTableBuilder;
41+
import org.apache.xtable.hms.HMSSchemaExtractor;
42+
import org.apache.xtable.model.InternalTable;
43+
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
44+
import org.apache.xtable.model.storage.TableFormat;
45+
46+
public class DeltaHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Table> {
47+
48+
private final HMSSchemaExtractor schemaExtractor;
49+
private static final String tableFormat = TableFormat.DELTA;
50+
51+
public DeltaHMSCatalogTableBuilder() {
52+
this.schemaExtractor = HMSSchemaExtractor.getInstance();
53+
}
54+
55+
@VisibleForTesting
56+
DeltaHMSCatalogTableBuilder(HMSSchemaExtractor schemaExtractor) {
57+
this.schemaExtractor = schemaExtractor;
58+
}
59+
60+
@Override
61+
public Table getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier) {
62+
return newHmsTable(tableIdentifier, getStorageDescriptor(table), getTableParameters());
63+
}
64+
65+
@Override
66+
public Table getUpdateTableRequest(
67+
InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) {
68+
Table copyTb = new Table(catalogTable);
69+
copyTb.getSd().setCols(schemaExtractor.toColumns(tableFormat, table.getReadSchema()));
70+
return copyTb;
71+
}
72+
73+
@VisibleForTesting
74+
StorageDescriptor getStorageDescriptor(InternalTable table) {
75+
final StorageDescriptor storageDescriptor = new StorageDescriptor();
76+
storageDescriptor.setCols(schemaExtractor.toColumns(tableFormat, table.getReadSchema()));
77+
storageDescriptor.setLocation(table.getBasePath());
78+
SerDeInfo serDeInfo = new SerDeInfo();
79+
serDeInfo.setParameters(getSerDeParameters(table));
80+
storageDescriptor.setSerdeInfo(serDeInfo);
81+
return storageDescriptor;
82+
}
83+
84+
@VisibleForTesting
85+
Map<String, String> getTableParameters() {
86+
Map<String, String> parameters = new HashMap<>();
87+
parameters.put(PROP_EXTERNAL, "TRUE");
88+
parameters.put(TABLE_TYPE_PROP, tableFormat.toUpperCase(Locale.ENGLISH));
89+
parameters.put(
90+
hive_metastoreConstants.META_TABLE_STORAGE, DeltaStorageHandler.class.getCanonicalName());
91+
return parameters;
92+
}
93+
94+
private Map<String, String> getSerDeParameters(InternalTable table) {
95+
Map<String, String> parameters = new HashMap<>();
96+
parameters.put(PROP_SERIALIZATION_FORMAT, "1");
97+
parameters.put(PROP_PATH, table.getBasePath());
98+
return parameters;
99+
}
100+
}

xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ public class HMSCatalogSyncClientTestBase {
6363
.tableFormat(TableFormat.ICEBERG)
6464
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
6565
.build();
66+
protected static final InternalTable TEST_DELTA_INTERNAL_TABLE =
67+
InternalTable.builder()
68+
.basePath(TEST_BASE_PATH)
69+
.tableFormat(TableFormat.DELTA)
70+
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
71+
.build();
6672
protected static final InternalTable TEST_HUDI_INTERNAL_TABLE =
6773
InternalTable.builder()
6874
.basePath(TEST_BASE_PATH)
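
xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java

Lines changed: 155 additions & 0 deletions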
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.hms.table;
20+
21+
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.mockito.Mockito.mockStatic;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
import static org.mockito.Mockito.when;
26+
27+
import java.time.Instant;
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
import lombok.SneakyThrows;
33+
34+
import org.apache.hadoop.hive.metastore.api.FieldSchema;
35+
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
36+
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
37+
import org.apache.hadoop.hive.metastore.api.Table;
38+
import org.apache.hadoop.security.UserGroupInformation;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.extension.ExtendWith;
41+
import org.mockito.MockedStatic;
42+
import org.mockito.junit.jupiter.MockitoExtension;
43+
44+
import org.apache.xtable.hms.HMSCatalogSyncClientTestBase;
45+
import org.apache.xtable.model.storage.TableFormat;
46+
47+
@ExtendWith(MockitoExtension.class)
48+
public class TestDeltaHMSCatalogTableBuilder extends HMSCatalogSyncClientTestBase {
49+
50+
private DeltaHMSCatalogTableBuilder mockDeltaHmsCatalogSyncRequestProvider;
51+
52+
private DeltaHMSCatalogTableBuilder createDeltaHMSCatalogTableBuilder() {
53+
return new DeltaHMSCatalogTableBuilder(mockHmsSchemaExtractor);
54+
}
55+
56+
void setupCommonMocks() {
57+
mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder();
58+
}
59+
60+
@SneakyThrows
61+
@Test
62+
void testGetCreateTableRequest() {
63+
mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder();
64+
when(mockHmsSchemaExtractor.toColumns(
65+
TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema()))
66+
.thenReturn(Collections.emptyList());
67+
68+
Instant createdTime = Instant.now();
69+
try (MockedStatic<Instant> mockZonedDateTime = mockStatic(Instant.class)) {
70+
mockZonedDateTime.when(Instant::now).thenReturn(createdTime);
71+
Table expected = new Table();
72+
expected.setDbName(TEST_HMS_DATABASE);
73+
expected.setTableName(TEST_HMS_TABLE);
74+
expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
75+
expected.setCreateTime((int) createdTime.getEpochSecond());
76+
expected.setSd(getTestStorageDescriptor());
77+
expected.setTableType("EXTERNAL_TABLE");
78+
expected.setParameters(getTestParameters());
79+
80+
assertEquals(
81+
expected,
82+
mockDeltaHmsCatalogSyncRequestProvider.getCreateTableRequest(
83+
TEST_DELTA_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER));
84+
verify(mockHmsSchemaExtractor, times(1))
85+
.toColumns(TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema());
86+
}
87+
}
88+
89+
@SneakyThrows
90+
@Test
91+
void testGetUpdateTableRequest() {
92+
setupCommonMocks();
93+
94+
Table hmsTable =
95+
newTable(
96+
TEST_HMS_DATABASE, TEST_HMS_TABLE, getTestParameters(), getTestStorageDescriptor());
97+
FieldSchema newColumn = new FieldSchema("new_column", "test", null);
98+
when(mockHmsSchemaExtractor.toColumns(
99+
TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema()))
100+
.thenReturn(Collections.singletonList(newColumn));
101+
102+
Table output =
103+
mockDeltaHmsCatalogSyncRequestProvider.getUpdateTableRequest(
104+
TEST_DELTA_INTERNAL_TABLE, hmsTable, TEST_CATALOG_TABLE_IDENTIFIER);
105+
Table expected = new Table(hmsTable);
106+
expected.getSd().setCols(Collections.singletonList(newColumn));
107+
108+
assertEquals(expected, output);
109+
verify(mockHmsSchemaExtractor, times(1))
110+
.toColumns(TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema());
111+
}
112+
113+
@Test
114+
void testGetStorageDescriptor() {
115+
mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder();
116+
when(mockHmsSchemaExtractor.toColumns(
117+
TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema()))
118+
.thenReturn(Collections.emptyList());
119+
StorageDescriptor expected = getTestStorageDescriptor();
120+
assertEquals(
121+
expected,
122+
mockDeltaHmsCatalogSyncRequestProvider.getStorageDescriptor(TEST_DELTA_INTERNAL_TABLE));
123+
verify(mockHmsSchemaExtractor, times(1))
124+
.toColumns(TableFormat.DELTA, TEST_DELTA_INTERNAL_TABLE.getReadSchema());
125+
}
126+
127+
@Test
128+
void testGetTableParameters() {
129+
mockDeltaHmsCatalogSyncRequestProvider = createDeltaHMSCatalogTableBuilder();
130+
Map<String, String> expected = getTestParameters();
131+
assertEquals(expected, mockDeltaHmsCatalogSyncRequestProvider.getTableParameters());
132+
}
133+
134+
private StorageDescriptor getTestStorageDescriptor() {
135+
Map<String, String> serDeParams = new HashMap<>();
136+
serDeParams.put("serialization.format", "1");
137+
serDeParams.put("path", TEST_BASE_PATH);
138+
139+
StorageDescriptor storageDescriptor = new StorageDescriptor();
140+
storageDescriptor.setCols(Collections.emptyList());
141+
storageDescriptor.setLocation(TEST_BASE_PATH);
142+
SerDeInfo serDeInfo = new SerDeInfo();
143+
serDeInfo.setParameters(serDeParams);
144+
storageDescriptor.setSerdeInfo(serDeInfo);
145+
return storageDescriptor;
146+
}
147+
148+
private Map<String, String> getTestParameters() {
149+
Map<String, String> parameters = new HashMap<>();
150+
parameters.put("EXTERNAL", "TRUE");
151+
parameters.put("table_type", TableFormat.DELTA);
152+
parameters.put("storage_handler", "io.delta.hive.DeltaStorageHandler");
153+
return parameters;
154+
}
155+
}

0 commit comments

Comments
 (0)