diff --git a/native-engine/blaze-jni-bridge/src/conf.rs b/native-engine/blaze-jni-bridge/src/conf.rs index 99ba76923..b69a1617f 100644 --- a/native-engine/blaze-jni-bridge/src/conf.rs +++ b/native-engine/blaze-jni-bridge/src/conf.rs @@ -42,6 +42,7 @@ define_conf!(BooleanConf, PARTIAL_AGG_SKIPPING_SKIP_SPILL); define_conf!(BooleanConf, PARQUET_ENABLE_PAGE_FILTERING); define_conf!(BooleanConf, PARQUET_ENABLE_BLOOM_FILTER); define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC); +define_conf!(IntConf, SPARK_IO_COMPRESSION_ZSTD_LEVEL); define_conf!(IntConf, TOKIO_WORKER_THREADS_PER_CPU); define_conf!(IntConf, SPARK_TASK_CPUS); define_conf!(StringConf, SPILL_COMPRESSION_CODEC); diff --git a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs b/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs index 960429209..cb1c02bec 100644 --- a/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs +++ b/native-engine/datafusion-ext-plans/src/common/ipc_compression.rs @@ -18,7 +18,11 @@ use std::io::{BufReader, Read, Take, Write}; use arrow::{array::ArrayRef, datatypes::SchemaRef}; -use blaze_jni_bridge::{conf, conf::StringConf, is_jni_bridge_inited}; +use blaze_jni_bridge::{ + conf, + conf::{IntConf, StringConf}, + is_jni_bridge_inited, +}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use datafusion::common::Result; use datafusion_ext_commons::{ @@ -28,7 +32,6 @@ use datafusion_ext_commons::{ use once_cell::sync::OnceCell; pub const DEFAULT_SHUFFLE_COMPRESSION_TARGET_BUF_SIZE: usize = 4194304; -const ZSTD_LEVEL: i32 = 1; pub struct IpcCompressionWriter { output: W, @@ -181,7 +184,10 @@ impl IoCompressionWriter { pub fn try_new(codec: &str, inner: W) -> Result { match codec { "lz4" => Ok(Self::LZ4(lz4_flex::frame::FrameEncoder::new(inner))), - "zstd" => Ok(Self::ZSTD(zstd::Encoder::new(inner, ZSTD_LEVEL)?)), + "zstd" => Ok(Self::ZSTD(zstd::Encoder::new( + inner, + conf::SPARK_IO_COMPRESSION_ZSTD_LEVEL.value().unwrap_or(1), + )?)), _ => df_execution_err!("unsupported codec: {}", codec), } } diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java index e655f79ea..3f46ee144 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/BlazeConf.java @@ -75,6 +75,9 @@ public enum BlazeConf { // spark io compression codec SPARK_IO_COMPRESSION_CODEC("spark.io.compression.codec", "lz4"), + // spark io compression zstd level + SPARK_IO_COMPRESSION_ZSTD_LEVEL("spark.io.compression.zstd.level", 1), + // tokio worker threads per cpu (spark.task.cpus), 0 for auto detection TOKIO_WORKER_THREADS_PER_CPU("spark.blaze.tokio.worker.threads.per.cpu", 0),