Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)
if (willCancel)
{
// If we have been canceled, then ensure that we write the ATTN packet as well
#if NET
task = AsyncHelper.CreateContinuationTask(task, CancelWritePacket);
#else
task = AsyncHelper.CreateContinuationTask(task, CancelWritePacket, _parser.Connection);
#endif
}

return task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2361,7 +2361,10 @@ private Task CopyColumnsAsync(int col, TaskCompletionSource<object> source = nul
// This is in its own method to avoid always allocating the lambda in CopyColumnsAsync
private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> source, Task task, int i)
{
AsyncHelper.ContinueTaskWithState(task, source, this,
AsyncHelper.ContinueTaskWithState(
task,
source,
state: this,
onSuccess: (object state) =>
{
SqlBulkCopy sqlBulkCopy = (SqlBulkCopy)state;
Expand All @@ -2373,9 +2376,7 @@ private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> sour
{
source.SetResult(null);
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
});
}

// The notification logic.
Expand Down Expand Up @@ -2510,10 +2511,11 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts,
}
resultTask = source.Task;

AsyncHelper.ContinueTaskWithState(readTask, source, this,
onSuccess: (object state) => ((SqlBulkCopy)state).CopyRowsAsync(i + 1, totalRows, cts, source),
connectionToDoom: _connection.GetOpenTdsConnection()
);
AsyncHelper.ContinueTaskWithState(
readTask,
source,
state: this,
onSuccess: (object state) => ((SqlBulkCopy)state).CopyRowsAsync(i + 1, totalRows, cts, source));
return resultTask; // Associated task will be completed when all rows are copied to server/exception/cancelled.
}
}
Expand All @@ -2535,14 +2537,13 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts,
}
else
{
AsyncHelper.ContinueTaskWithState(readTask, source, sqlBulkCopy,
onSuccess: (object state2) => ((SqlBulkCopy)state2).CopyRowsAsync(i + 1, totalRows, cts, source),
connectionToDoom: _connection.GetOpenTdsConnection()
);
AsyncHelper.ContinueTaskWithState(
readTask,
source,
state: sqlBulkCopy,
onSuccess: (object state2) => ((SqlBulkCopy)state2).CopyRowsAsync(i + 1, totalRows, cts, source));
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
});
return resultTask;
}
}
Expand Down Expand Up @@ -2611,7 +2612,10 @@ private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string up
source = new TaskCompletionSource<object>();
}

AsyncHelper.ContinueTaskWithState(commandTask, source, this,
AsyncHelper.ContinueTaskWithState(
commandTask,
source,
state: this,
onSuccess: (object state) =>
{
SqlBulkCopy sqlBulkCopy = (SqlBulkCopy)state;
Expand All @@ -2621,9 +2625,7 @@ private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string up
// Continuation finished sync, recall into CopyBatchesAsync to continue
sqlBulkCopy.CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
});
return source.Task;
}
}
Expand Down Expand Up @@ -2677,7 +2679,10 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults,
{ // First time only
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTaskWithState(task, source, this,
AsyncHelper.ContinueTaskWithState(
task,
source,
state: this,
onSuccess: (object state) =>
{
SqlBulkCopy sqlBulkCopy = (SqlBulkCopy)state;
Expand All @@ -2689,9 +2694,7 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults,
}
},
onFailure: static (Exception _, object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: false),
onCancellation: static (object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: true),
connectionToDoom: _connection.GetOpenTdsConnection()
);
onCancellation: static (object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: true));

return source.Task;
}
Expand Down Expand Up @@ -2738,7 +2741,10 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal
source = new TaskCompletionSource<object>();
}

AsyncHelper.ContinueTaskWithState(writeTask, source, this,
AsyncHelper.ContinueTaskWithState(
writeTask,
source,
state: this,
onSuccess: (object state) =>
{
SqlBulkCopy sqlBulkCopy = (SqlBulkCopy)state;
Expand All @@ -2756,9 +2762,7 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal
// Always call back into CopyBatchesAsync
sqlBulkCopy.CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
},
onFailure: static (Exception _, object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: false),
connectionToDoom: _connection.GetOpenTdsConnection()
);
onFailure: static (Exception _, object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: false));
return source.Task;
}
}
Expand Down Expand Up @@ -2859,7 +2863,10 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int
{
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTaskWithState(task, source, this,
AsyncHelper.ContinueTaskWithState(
task,
source,
state: this,
onSuccess: (object state) =>
{
SqlBulkCopy sqlBulkCopy = (SqlBulkCopy)state;
Expand Down Expand Up @@ -2902,9 +2909,7 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int
}
}
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
});
return;
}
else
Expand Down Expand Up @@ -3029,14 +3034,9 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
_parserLock.Wait(canReleaseFromAnyThread: true);
WriteToServerInternalRestAsync(cts, source);
},
connectionToAbort: _connection,
onFailure: static (_, state) => ((StrongBox<CancellationTokenRegistration>)state).Value.Dispose(),
onCancellation: static state => ((StrongBox<CancellationTokenRegistration>)state).Value.Dispose(),
#if NET
exceptionConverter: ex => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex)
#else
exceptionConverter: (ex, _) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex)
#endif
);
return;
}
Expand Down Expand Up @@ -3085,10 +3085,11 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio

if (internalResultsTask != null)
{
AsyncHelper.ContinueTaskWithState(internalResultsTask, source, this,
onSuccess: (object state) => ((SqlBulkCopy)state).WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source),
connectionToDoom: _connection.GetOpenTdsConnection()
);
AsyncHelper.ContinueTaskWithState(
internalResultsTask,
source,
state: this,
onSuccess: (object state) => ((SqlBulkCopy)state).WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source));
}
else
{
Expand Down Expand Up @@ -3169,9 +3170,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
{
sqlBulkCopy.WriteToServerInternalRestAsync(ctoken, source); // Passing the same completion which will be completed by the Callee.
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
});
return resultTask;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1749,11 +1749,7 @@ private SqlDataReader RunExecuteReaderTdsWithTransparentParameterEncryption(
onCancellation: static state =>
{
((SqlCommand)state).CachedAsyncState?.ResetAsyncState();
}
#if NETFRAMEWORK
, connectionToAbort: _activeConnection
#endif
);
});

task = completion.Task;
return ds;
Expand Down
Loading
Loading