Skip to content

Commit 01a7377

Browse files
committed
split: fix round-robin filtering to handle closed filter processes correctly
- Track closed state for output writers to stop writing to them when the underlying process/file is closed - Ensure all files are created when not in elide-empty-files mode, even if no data is written - Flush writers in round-robin mode to ensure data is sent to filter processes immediately - Add integration tests for round-robin filtering and early closure handling
1 parent 5e64120 commit 01a7377

2 files changed

Lines changed: 83 additions & 10 deletions

File tree

src/uu/split/src/split.rs

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,7 @@ struct OutFile {
923923
filename: String,
924924
maybe_writer: Option<BufWriter<Box<dyn Write>>>,
925925
is_new: bool,
926+
is_closed: bool,
926927
}
927928

928929
/// A set of output files
@@ -994,6 +995,7 @@ impl ManageOutFiles for OutFiles {
994995
filename,
995996
maybe_writer,
996997
is_new: true,
998+
is_closed: false,
997999
});
9981000
}
9991001
Ok(out_files)
@@ -1198,7 +1200,7 @@ where
11981200
None => {
11991201
let idx = (i - 1) as usize;
12001202
let writer = out_files.get_writer(idx, settings)?;
1201-
writer.write_all(buf)?;
1203+
custom_write_all(buf, writer, settings)?;
12021204
}
12031205
}
12041206
} else {
@@ -1393,13 +1395,21 @@ where
13931395
// Create one writer for each chunk.
13941396
// This will create each of the underlying files
13951397
// or stdin pipes to child shell/command processes if in `--filter` mode
1398+
// If in N chunks mode
1399+
// Create one writer for each chunk.
1400+
// This will create each of the underlying files
1401+
// or stdin pipes to child shell/command processes if in `--filter` mode.
1402+
// We use lazy initialization (is_writer_optional = true) to avoid hitting
1403+
// system resource limits (like EMFILE or EAGAIN on macOS) prematurely,
1404+
// especially when many filter processes would be spawned at once.
13961405
if kth_chunk.is_none() {
1397-
out_files = OutFiles::init(num_chunks, settings, settings.elide_empty_files)?;
1406+
out_files = OutFiles::init(num_chunks, settings, true)?;
13981407
}
13991408

1400-
let num_chunks: usize = num_chunks.try_into().unwrap();
1409+
let num_chunks_size: usize = num_chunks.try_into().unwrap();
14011410
let sep = settings.separator;
1402-
let mut closed_writers = 0;
1411+
let mut any_writer_open = true;
1412+
let mut wrote_in_current_round = false;
14031413

14041414
let mut i = 0;
14051415
loop {
@@ -1413,22 +1423,56 @@ where
14131423

14141424
let bytes = line.as_slice();
14151425
if let Some(chunk_number) = kth_chunk {
1416-
if (i % num_chunks) == (chunk_number - 1) as usize {
1426+
if (i % num_chunks_size) == (chunk_number - 1) as usize {
14171427
stdout_writer.write_all(bytes)?;
14181428
}
14191429
} else {
1420-
let writer = out_files.get_writer(i % num_chunks, settings)?;
1421-
let writer_stdin_open = custom_write_all(bytes, writer, settings)?;
1422-
if !writer_stdin_open {
1423-
closed_writers += 1;
1430+
let chunk_idx = i % num_chunks_size;
1431+
if !out_files[chunk_idx].is_closed {
1432+
let writer = out_files.get_writer(chunk_idx, settings)?;
1433+
let writer_stdin_open = custom_write_all(bytes, writer, settings)?;
1434+
if writer_stdin_open {
1435+
wrote_in_current_round = true;
1436+
// Ensure data is sent to filter processes immediately in round-robin mode
1437+
if let Err(e) = writer.flush() {
1438+
if ignorable_io_error(&e, settings) {
1439+
out_files[chunk_idx].is_closed = true;
1440+
} else {
1441+
return Err(e.into());
1442+
}
1443+
}
1444+
} else {
1445+
out_files[chunk_idx].is_closed = true;
1446+
}
14241447
}
14251448
}
14261449
i += 1;
1427-
if closed_writers == num_chunks {
1450+
1451+
// After completing a full round of chunks
1452+
if i % num_chunks_size == 0 && kth_chunk.is_none() {
1453+
if !wrote_in_current_round {
1454+
any_writer_open = false;
1455+
}
1456+
wrote_in_current_round = false;
1457+
}
1458+
1459+
if !any_writer_open {
14281460
// all writers are closed - stop reading
14291461
break;
14301462
}
14311463
}
1464+
1465+
// GNU coreutils compatibility: Ensure all files are created if !elide_empty_files,
1466+
// even if no data was written to them.
1467+
if kth_chunk.is_none() && !settings.elide_empty_files {
1468+
for chunk_idx in 0..num_chunks_size {
1469+
// We only need to "touch" the file (instantiate the writer).
1470+
// get_writer will create the file/process if it doesn't exist.
1471+
// If it already exists, it does nothing.
1472+
let _ = out_files.get_writer(chunk_idx, settings);
1473+
}
1474+
}
1475+
14321476
Ok(())
14331477
}
14341478

tests/by-util/test_split.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2091,3 +2091,32 @@ fn test_split_directory_already_exists() {
20912091
.no_stdout()
20922092
.stderr_is("split: xaa: Is a directory\n");
20932093
}
2094+
2095+
#[test]
2096+
fn test_round_robin_to_files() {
2097+
let (at, mut ucmd) = at_and_ucmd!();
2098+
at.write("input", "line1\nline2\nline3\nline4\n");
2099+
2100+
ucmd.args(&["-n", "r/2", "input", "out"]).succeeds();
2101+
2102+
assert_eq!(at.read("outaa"), "line1\nline3\n");
2103+
assert_eq!(at.read("outab"), "line2\nline4\n");
2104+
}
2105+
2106+
#[test]
2107+
#[cfg(not(windows))]
2108+
fn test_round_robin_filter_closes_early() {
2109+
// This test exercises the wrote_in_current_round logic
2110+
// We use a filter that only accepts one line then closes.
2111+
let (at, mut ucmd) = at_and_ucmd!();
2112+
at.write("input", "line1\nline2\nline3\nline4\n");
2113+
2114+
// Filter 'head -n 1' will close its stdin after reading one line.
2115+
// In round-robin, this should cause split to mark that chunk as closed.
2116+
ucmd.args(&["--filter", "head -n 1 > $FILE", "-n", "r/2", "input", "out"])
2117+
.succeeds();
2118+
2119+
assert_eq!(at.read("outaa"), "line1\n");
2120+
// outab also only gets one line because 'head -n 1' closes for it too.
2121+
assert_eq!(at.read("outab"), "line2\n");
2122+
}

0 commit comments

Comments
 (0)