Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 17 additions & 35 deletions src/CopyOnWrite/Copy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using System.Threading.Tasks;
using Task = Microsoft.Build.Utilities.Task;

#nullable enable annotations

Expand Down Expand Up @@ -554,19 +555,18 @@ private bool CopyParallel(

// Lockless flags updated from each thread - each needs to be a processor word for atomicity.
var successFlags = new IntPtr[DestinationFiles.Length];
var actionBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = parallelism,
CancellationToken = _cancellationTokenSource.Token,
EnsureOrdered = parallelism == 1,
};
var partitionCopyActionBlock = new ActionBlock<List<int>>(
async (List<int> partition) =>
{
// Break from synchronous thread context of caller to get onto thread pool thread.
await System.Threading.Tasks.Task.Yield();

for (int partitionIndex = 0; partitionIndex < partition.Count && !_cancellationTokenSource.IsCancellationRequested; partitionIndex++)
Parallel.ForEach(partitionsByDestination.Values,
new ParallelOptions
{
CancellationToken = _cancellationTokenSource.Token,
MaxDegreeOfParallelism = parallelism,
},
(List<int> partition) =>
{
for (int partitionIndex = 0;
partitionIndex < partition.Count && !_cancellationTokenSource.IsCancellationRequested;
partitionIndex++)
{
int fileIndex = partition[partitionIndex];
ITaskItem sourceItem = SourceFiles[fileIndex];
Expand All @@ -583,9 +583,9 @@ private bool CopyParallel(
if (!copyComplete)
{
if (DoCopyIfNecessary(
new FileState(sourceItem.ItemSpec),
new FileState(destItem.ItemSpec),
copyFile))
new FileState(sourceItem.ItemSpec),
new FileState(destItem.ItemSpec),
copyFile))
{
copyComplete = true;
}
Expand All @@ -602,25 +602,7 @@ private bool CopyParallel(
successFlags[fileIndex] = (IntPtr)1;
}
}
},
actionBlockOptions);

foreach (List<int> partition in partitionsByDestination.Values)
{
bool partitionAccepted = partitionCopyActionBlock.Post(partition);
if (_cancellationTokenSource.IsCancellationRequested)
{
break;
}
else if (!partitionAccepted)
{
// Retail assert...
LogError("Failed posting a file copy to an ActionBlock. Should not happen with block at max int capacity.");
}
}

partitionCopyActionBlock.Complete();
partitionCopyActionBlock.Completion.GetAwaiter().GetResult();
});

// Assemble an in-order list of destination items that succeeded.
destinationFilesSuccessfullyCopied = new List<ITaskItem>(DestinationFiles.Length);
Expand Down