Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

import static org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;

/** A connection to Fluss cluster, and holds the client session resources. */
public final class FlussConnection implements Connection {
Expand All @@ -74,7 +74,7 @@ public final class FlussConnection implements Connection {
// only pass options with 'client.fs.' prefix
FileSystem.initialize(
Configuration.fromMap(
extractPrefix(new HashMap<>(conf.toMap()), CLIENT_PREFIX + "fs.")),
extractAndRemovePrefix(new HashMap<>(conf.toMap()), CLIENT_PREFIX + "fs.")),
null);
// for client metrics.
setupClientMetricsConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1740,7 +1740,7 @@ void testFileSystemRecognizeConnectionConf() throws Exception {
Configuration filesystemConf = testFileSystem.getConfiguration();
assertThat(filesystemConf.toMap())
.containsExactlyEntriesOf(
Collections.singletonMap("client.fs.test.key", "fs_test_value"));
Collections.singletonMap("test.key", "fs_test_value"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* (abfs, abfss, wasb, wasbs) based on Azure HDFS support in the hadoop-azure module.
*/
abstract class AzureFileSystemPlugin implements FileSystemPlugin {
private static final String[] FLUSS_CONFIG_PREFIXES = {"fs.azure."};
private static final String[] FLUSS_CONFIG_PREFIXES = {"azure.", "fs.azure."};

private static final String HADOOP_CONFIG_PREFIX = "fs.azure.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public class OBSFileSystemPlugin implements FileSystemPlugin {
* In order to simplify, we make fluss obs configuration keys same with hadoop obs module. So,
* we add all configuration key with prefix `fs.obs` in fluss conf to hadoop conf
*/
private static final String[] FLUSS_CONFIG_PREFIXES = {"fs.obs."};
private static final String[] FLUSS_CONFIG_PREFIXES = {"obs.", "fs.obs."};

private static final String HADOOP_CONFIG_PREFIX = "fs.obs.";

private static final String ACCESS_KEY_ID = "fs.obs.access.key";
public static final String CREDENTIALS_PROVIDER = "fs.obs.security.provider";
Expand Down Expand Up @@ -116,15 +118,14 @@ org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussC
for (String key : flussConfig.keySet()) {
for (String prefix : FLUSS_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = HADOOP_CONFIG_PREFIX + key.substring(prefix.length());
String value =
flussConfig.getString(
ConfigBuilder.key(key).stringType().noDefaultValue(), null);
conf.set(key, value);
conf.set(newKey, value);

LOG.debug(
"Adding Fluss config entry for {} as {} to Hadoop config",
key,
conf.get(key));
"Adding Fluss config entry for {} as {} to Hadoop config", key, newKey);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public class OSSFileSystemPlugin implements FileSystemPlugin {
* In order to simplify, we make fluss oss configuration keys same with hadoop oss module. So,
* we add all configuration key with prefix `fs.oss` in fluss conf to hadoop conf
*/
private static final String[] FLUSS_CONFIG_PREFIXES = {"fs.oss."};
private static final String[] FLUSS_CONFIG_PREFIXES = {"oss.", "fs.oss."};

private static final String HADOOP_CONFIG_PREFIX = "fs.oss.";

public static final String REGION_KEY = "fs.oss.region";

Expand Down Expand Up @@ -127,15 +129,14 @@ org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussC
for (String key : flussConfig.keySet()) {
for (String prefix : FLUSS_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = HADOOP_CONFIG_PREFIX + key.substring(prefix.length());
String value =
flussConfig.getString(
ConfigBuilder.key(key).stringType().noDefaultValue(), null);
conf.set(key, value);
conf.set(newKey, value);

LOG.debug(
"Adding Fluss config entry for {} as {} to Hadoop config",
key,
conf.get(key));
"Adding Fluss config entry for {} as {} to Hadoop config", key, newKey);
}
}
}
Expand Down