Skip to content

Commit 245e662

Browse files
author
zhangli20
committed
add shuffle_write.output_io_time metric
1 parent e502334 commit 245e662

8 files changed

Lines changed: 135 additions & 49 deletions

File tree

native-engine/datafusion-ext-plans/src/common/ipc_compression.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,10 @@ impl<W: Write> IpcCompressionWriter<W> {
8282
pub fn inner(&self) -> &W {
8383
&self.output
8484
}
85+
86+
/// Returns a mutable reference to the wrapped output writer (`self.output`).
///
/// Mutable counterpart of `inner`; the writer itself is not consumed.
pub fn inner_mut(&mut self) -> &mut W {
87+
&mut self.output
88+
}
8589
}
8690

8791
pub struct IpcCompressionReader<R: Read + 'static> {

native-engine/datafusion-ext-plans/src/common/timer_helper.rs

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414

1515
use std::{
1616
future::Future,
17+
io::Write,
1718
sync::{
1819
atomic::{AtomicUsize, Ordering::Relaxed},
1920
Arc,
@@ -37,6 +38,8 @@ pub trait TimerHelper {
3738

3839
fn duration(&self) -> Duration;
3940
fn sub_duration(&self, duration: Duration);
41+
42+
fn wrap_writer<W: Write>(&self, w: W) -> TimedWriter<W>;
4043
}
4144

4245
impl TimerHelper for Time {
@@ -85,6 +88,22 @@ impl TimerHelper for Time {
8588
};
8689
xtime.nanos.fetch_sub(duration.as_nanos() as usize, Relaxed);
8790
}
91+
92+
/// Wraps the writer `w` in a `TimedWriter` that carries a clone of this `Time`.
///
/// Ownership of `w` moves into the returned wrapper.
fn wrap_writer<W: Write>(&self, w: W) -> TimedWriter<W> {
93+
TimedWriter(w, self.clone())
94+
}
95+
}
96+
97+
/// A `Write` adapter that pairs an inner writer (public field `0`) with a
/// `Time` value (private field `1`).
///
/// Both `write` and `flush` forward to the inner writer from inside a
/// `with_timer` call on the stored `Time`. The inner writer is public, so
/// callers can reach it directly (e.g. `.0.stream_position()`), bypassing
/// the `with_timer` wrapping.
pub struct TimedWriter<W: Write>(pub W, Time);
98+
99+
impl<W: Write> Write for TimedWriter<W> {
100+
/// Forwards `buf` to the inner writer's `write` and returns its result unchanged.
// NOTE(review): presumably `with_timer` adds the closure's elapsed time to
// the `Time` metric — confirm against the `TimerHelper` implementation.
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
101+
self.1.with_timer(|| self.0.write(buf))
102+
}
103+
104+
/// Forwards to the inner writer's `flush`, also wrapped in `with_timer`.
fn flush(&mut self) -> std::io::Result<()> {
105+
self.1.with_timer(|| self.0.flush())
106+
}
88107
}
89108

90109
pub trait RegisterTimer {

native-engine/datafusion-ext-plans/src/shuffle/single_repartitioner.rs

Lines changed: 46 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -20,43 +20,57 @@ use std::{
2020

2121
use arrow::record_batch::RecordBatch;
2222
use async_trait::async_trait;
23-
use datafusion::{common::Result, physical_plan::metrics::BaselineMetrics};
23+
use datafusion::{
24+
common::Result,
25+
physical_plan::metrics::{BaselineMetrics, Time},
26+
};
2427
use tokio::sync::Mutex;
2528

26-
use crate::{common::ipc_compression::IpcCompressionWriter, shuffle::ShuffleRepartitioner};
29+
use crate::{
30+
common::{
31+
ipc_compression::IpcCompressionWriter,
32+
timer_helper::{TimedWriter, TimerHelper},
33+
},
34+
shuffle::ShuffleRepartitioner,
35+
};
2736

2837
pub struct SingleShuffleRepartitioner {
2938
output_data_file: String,
3039
output_index_file: String,
31-
output_data: Arc<Mutex<Option<IpcCompressionWriter<File>>>>,
40+
output_data: Arc<Mutex<Option<IpcCompressionWriter<TimedWriter<File>>>>>,
41+
output_io_time: Time,
3242
metrics: BaselineMetrics,
3343
}
3444

3545
impl SingleShuffleRepartitioner {
3646
pub fn new(
3747
output_data_file: String,
3848
output_index_file: String,
49+
output_io_time: Time,
3950
metrics: BaselineMetrics,
4051
) -> Self {
4152
Self {
4253
output_data_file,
4354
output_index_file,
4455
output_data: Arc::new(Mutex::default()),
56+
output_io_time,
4557
metrics,
4658
}
4759
}
4860

4961
fn get_output_writer<'a>(
5062
&self,
51-
output_data: &'a mut Option<IpcCompressionWriter<File>>,
52-
) -> Result<&'a mut IpcCompressionWriter<File>> {
63+
output_data: &'a mut Option<IpcCompressionWriter<TimedWriter<File>>>,
64+
) -> Result<&'a mut IpcCompressionWriter<TimedWriter<File>>> {
5365
if output_data.is_none() {
5466
*output_data = Some(IpcCompressionWriter::new(
55-
OpenOptions::new()
56-
.write(true)
57-
.create(true)
58-
.truncate(true)
59-
.open(&self.output_data_file)?,
67+
self.output_io_time.wrap_writer(
68+
OpenOptions::new()
69+
.write(true)
70+
.create(true)
71+
.truncate(true)
72+
.open(&self.output_data_file)?,
73+
),
6074
));
6175
}
6276
Ok(output_data.as_mut().unwrap())
@@ -79,16 +93,33 @@ impl ShuffleRepartitioner for SingleShuffleRepartitioner {
7993

8094
// write index file
8195
if let Some(output_writer) = output_data.as_mut() {
96+
let mut output_index = self.output_io_time.wrap_writer(
97+
OpenOptions::new()
98+
.write(true)
99+
.create(true)
100+
.truncate(true)
101+
.open(&self.output_index_file)?,
102+
);
82103
output_writer.finish_current_buf()?;
83-
let offset = output_writer.inner().stream_position()?;
84-
let mut output_index = File::create(&self.output_index_file)?;
104+
let offset = output_writer.inner_mut().0.stream_position()?;
85105
output_index.write_all(&[0u8; 8])?;
86106
output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
87107
} else {
88108
// write empty data file and index file
89-
let output_data = File::create(&self.output_data_file)?;
90-
output_data.set_len(0)?;
91-
let mut output_index = File::create(&self.output_index_file)?;
109+
let _output_data = self.output_io_time.wrap_writer(
110+
OpenOptions::new()
111+
.write(true)
112+
.create(true)
113+
.truncate(true)
114+
.open(&self.output_data_file)?,
115+
);
116+
let mut output_index = self.output_io_time.wrap_writer(
117+
OpenOptions::new()
118+
.write(true)
119+
.create(true)
120+
.truncate(true)
121+
.open(&self.output_index_file)?,
122+
);
92123
output_index.write_all(&[0u8; 16])?;
93124
}
94125
Ok(())

native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs

Lines changed: 54 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::{
16-
fs::{File, OpenOptions},
16+
fs::OpenOptions,
1717
io::{BufReader, Read, Seek, Write},
1818
sync::Weak,
1919
};
@@ -23,7 +23,10 @@ use async_trait::async_trait;
2323
use bytesize::ByteSize;
2424
use datafusion::{
2525
common::{DataFusionError, Result},
26-
physical_plan::{metrics::ExecutionPlanMetricsSet, Partitioning},
26+
physical_plan::{
27+
metrics::{ExecutionPlanMetricsSet, Time},
28+
Partitioning,
29+
},
2730
};
2831
use datafusion_ext_commons::{
2932
df_execution_err,
@@ -32,6 +35,7 @@ use datafusion_ext_commons::{
3235
use futures::lock::Mutex;
3336

3437
use crate::{
38+
common::timer_helper::TimerHelper,
3539
memmgr::{
3640
metrics::SpillMetrics,
3741
spill::{try_new_spill, Spill},
@@ -49,6 +53,7 @@ pub struct SortShuffleRepartitioner {
4953
spills: Mutex<Vec<ShuffleSpill>>,
5054
partitioning: Partitioning,
5155
num_output_partitions: usize,
56+
output_io_time: Time,
5257
spill_metrics: SpillMetrics,
5358
}
5459

@@ -58,6 +63,7 @@ impl SortShuffleRepartitioner {
5863
output_data_file: String,
5964
output_index_file: String,
6065
partitioning: Partitioning,
66+
output_io_time: Time,
6167
metrics: &ExecutionPlanMetricsSet,
6268
) -> Self {
6369
let num_output_partitions = partitioning.partition_count();
@@ -70,6 +76,7 @@ impl SortShuffleRepartitioner {
7076
spills: Mutex::default(),
7177
partitioning,
7278
num_output_partitions,
79+
output_io_time,
7380
spill_metrics: SpillMetrics::new(metrics, partition_id),
7481
}
7582
}
@@ -158,22 +165,33 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner {
158165
// no spills - directly write current batches into final file
159166
if spills.is_empty() {
160167
let partitioning = self.partitioning.clone();
168+
let output_io_time = self.output_io_time.clone();
161169
tokio::task::spawn_blocking(move || {
162-
let mut output_data = OpenOptions::new()
163-
.write(true)
164-
.create(true)
165-
.truncate(true)
166-
.open(&data_file)?;
167-
let mut output_index = OpenOptions::new()
168-
.write(true)
169-
.create(true)
170-
.truncate(true)
171-
.open(&index_file)?;
170+
let mut output_data = output_io_time.wrap_writer(
171+
OpenOptions::new()
172+
.write(true)
173+
.create(true)
174+
.truncate(true)
175+
.open(&data_file)?,
176+
);
177+
let mut output_index = output_io_time.wrap_writer(
178+
OpenOptions::new()
179+
.write(true)
180+
.create(true)
181+
.truncate(true)
182+
.open(&index_file)?,
183+
);
172184

185+
// write data file
173186
let offsets = data.write(&mut output_data, &partitioning)?;
187+
188+
// write index file
189+
let mut offsets_data = vec![];
174190
for offset in offsets {
175-
output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
191+
offsets_data.extend_from_slice(&(offset as i64).to_le_bytes()[..]);
176192
}
193+
output_index.write_all(&offsets_data)?;
194+
177195
Ok::<(), DataFusionError>(())
178196
})
179197
.await
@@ -216,13 +234,22 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner {
216234
let mut offsets = vec![0];
217235

218236
// append partition in each spills
237+
let output_io_time = self.output_io_time.clone();
219238
tokio::task::spawn_blocking(move || {
220-
let mut output_data = OpenOptions::new()
221-
.write(true)
222-
.create(true)
223-
.truncate(true)
224-
.open(data_file)?;
225-
let mut cur_partition_id = 0;
239+
let mut output_data = output_io_time.wrap_writer(
240+
OpenOptions::new()
241+
.write(true)
242+
.create(true)
243+
.truncate(true)
244+
.open(&data_file)?,
245+
);
246+
let mut output_index = output_io_time.wrap_writer(
247+
OpenOptions::new()
248+
.write(true)
249+
.create(true)
250+
.truncate(true)
251+
.open(&index_file)?,
252+
);
226253

227254
if !spills.is_empty() {
228255
// select partitions from spills
@@ -243,14 +270,15 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner {
243270
num_output_partitions,
244271
);
245272

273+
let mut cur_partition_id = 0;
246274
loop {
247275
let mut min_spill = cursors.peek_mut();
248276
if min_spill.cur + 1 >= min_spill.offsets.len() {
249277
break;
250278
}
251279

252280
while cur_partition_id < min_spill.cur {
253-
offsets.push(output_data.stream_position()?);
281+
offsets.push(output_data.0.stream_position()?);
254282
cur_partition_id += 1;
255283
}
256284
let (spill_offset_start, spill_offset_end) = (
@@ -269,12 +297,15 @@ impl ShuffleRepartitioner for SortShuffleRepartitioner {
269297
}
270298

271299
// add one extra offset at last to ease partition length computation
272-
offsets.resize(num_output_partitions + 1, output_data.stream_position()?);
300+
offsets.resize(num_output_partitions + 1, output_data.0.stream_position()?);
273301

274-
let mut output_index = File::create(index_file)?;
302+
// write index file
303+
let mut offsets_data = vec![];
275304
for offset in offsets {
276-
output_index.write_all(&(offset as i64).to_le_bytes()[..])?;
305+
offsets_data.extend_from_slice(&(offset as i64).to_le_bytes()[..]);
277306
}
307+
output_index.write_all(&offsets_data)?;
308+
278309
Ok::<(), DataFusionError>(())
279310
})
280311
.await

native-engine/datafusion-ext-plans/src/shuffle_writer_exec.rs

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,10 @@ use datafusion_ext_commons::df_execution_err;
3333
use futures::{stream::once, TryStreamExt};
3434

3535
use crate::{
36-
common::batch_statisitcs::{stat_input, InputBatchStatistics},
36+
common::{
37+
batch_statisitcs::{stat_input, InputBatchStatistics},
38+
timer_helper::RegisterTimer,
39+
},
3740
memmgr::MemManager,
3841
shuffle::{
3942
single_repartitioner::SingleShuffleRepartitioner,
@@ -110,11 +113,13 @@ impl ExecutionPlan for ShuffleWriterExec {
110113
) -> Result<SendableRecordBatchStream> {
111114
// record uncompressed data size
112115
let data_size_metric = MetricBuilder::new(&self.metrics).counter("data_size", partition);
116+
let output_time = self.metrics.register_timer("output_io_time", partition);
113117

114118
let repartitioner: Arc<dyn ShuffleRepartitioner> = match &self.partitioning {
115119
p if p.partition_count() == 1 => Arc::new(SingleShuffleRepartitioner::new(
116120
self.output_data_file.clone(),
117121
self.output_index_file.clone(),
122+
output_time,
118123
BaselineMetrics::new(&self.metrics, partition),
119124
)),
120125
Partitioning::Hash(..) => {
@@ -123,6 +128,7 @@ impl ExecutionPlan for ShuffleWriterExec {
123128
self.output_data_file.clone(),
124129
self.output_index_file.clone(),
125130
self.partitioning.clone(),
131+
output_time,
126132
&self.metrics,
127133
));
128134
MemManager::register_consumer(partitioner.clone(), true);

spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,8 @@ case class NativeShuffleExchangeExec(
5757
"mem_spill_size",
5858
"mem_spill_iotime",
5959
"disk_spill_size",
60-
"disk_spill_iotime"))
60+
"disk_spill_iotime",
61+
"output_io_time"))
6162
.toSeq: _*)).toMap
6263

6364
lazy val readMetrics: Map[String, SQLMetric] =

spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -108,7 +108,8 @@ object NativeHelper extends Logging {
108108
"mem_spill_size" -> sizeMetric("Native.mem_spill_size"),
109109
"mem_spill_iotime" -> nanoTimingMetric("Native.mem_spill_iotime"),
110110
"disk_spill_size" -> sizeMetric("Native.disk_spill_size"),
111-
"disk_spill_iotime" -> nanoTimingMetric("Native.disk_spill_iotime"))
111+
"disk_spill_iotime" -> nanoTimingMetric("Native.disk_spill_iotime"),
112+
"output_io_time" -> nanoTimingMetric("Native.output_io_time"))
112113

113114
if (BlazeConf.INPUT_BATCH_STATISTICS_ENABLE.booleanConf()) {
114115
metrics ++= TreeMap(

spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeBase.scala

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -167,22 +167,15 @@ abstract class NativeShuffleExchangeBase(
167167
val nativeInputRDD = rdd.asInstanceOf[NativeRDD]
168168
val numPartitions = outputPartitioning.numPartitions
169169
val nativeMetrics = MetricNode(
170-
Map(),
170+
metrics,
171171
nativeInputRDD.metrics :: Nil,
172172
Some({
173-
case ("stage_id", v) => metrics("stage_id") += v
174-
case ("data_size", v) => metrics("dataSize") += v
175173
case ("output_rows", v) =>
176174
val shuffleWriteMetrics = TaskContext.get.taskMetrics().shuffleWriteMetrics
177175
new SQLShuffleWriteMetricsReporter(shuffleWriteMetrics, metrics).incRecordsWritten(v)
178176
case ("elapsed_compute", v) =>
179177
val shuffleWriteMetrics = TaskContext.get.taskMetrics().shuffleWriteMetrics
180178
new SQLShuffleWriteMetricsReporter(shuffleWriteMetrics, metrics).incWriteTime(v)
181-
case ("mem_spill_count", v) if v > 0 => metrics("mem_spill_count").add(v)
182-
case ("mem_spill_size", v) if v > 0 => metrics("mem_spill_size").add(v)
183-
case ("mem_spill_iotime", v) if v > 0 => metrics("mem_spill_iotime").add(v)
184-
case ("disk_spill_size", v) if v > 0 => metrics("disk_spill_size").add(v)
185-
case ("disk_spill_iotime", v) if v > 0 => metrics("disk_spill_iotime").add(v)
186179
case _ =>
187180
}))
188181
val nativeHashExprs = this.nativeHashExprs

0 commit comments

Comments
 (0)