Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions .azure-pipelines/templates/nccl-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -245,25 +245,25 @@ steps:
mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION="allreduce" /root/nccl-tests/build/broadcast_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"'
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: RunNcclReduceScatterFallbaclkToNcclTest
displayName: Run NCCL ReduceScatter Test with or without Fallback to NCCL operation
inputs:
targetType: 'inline'
script: |
set -e
HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci
ROOT_DIR=$(System.DefaultWorkingDirectory)/mscclpp
SSH_OPTION="StrictHostKeyChecking=no"
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \
-O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c "\
cd /root/mscclpp; \
echo \"mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION=\"reducescatter\" /root/nccl-tests/build/reduce_scatter_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20\"; \
mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION="reducescatter" /root/nccl-tests/build/reduce_scatter_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20; \
echo \"mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION=\"broadcast\" -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/reduce_scatter_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20\"; \
mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION="broadcast" -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/reduce_scatter_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"'
workingDirectory: '$(System.DefaultWorkingDirectory)'
# - task: Bash@3
# name: RunNcclReduceScatterFallbaclkToNcclTest
# displayName: Run NCCL ReduceScatter Test with or without Fallback to NCCL operation
# inputs:
# targetType: 'inline'
# script: |
# set -e
# HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci
# ROOT_DIR=$(System.DefaultWorkingDirectory)/mscclpp
# SSH_OPTION="StrictHostKeyChecking=no"
# KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
# parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \
# -O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c "\
# cd /root/mscclpp; \
# echo \"mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION=\"reducescatter\" /root/nccl-tests/build/reduce_scatter_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20\"; \
# mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION="reducescatter" /root/nccl-tests/build/reduce_scatter_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20; \
# echo \"mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION=\"broadcast\" -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/reduce_scatter_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20\"; \
# mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_ENABLE_NCCL_FALLBACK=TRUE -x MSCCLPP_NCCL_LIB_PATH=/root/nccl/build/lib/libnccl.so -x MSCCLPP_FORCE_NCCL_FALLBACK_OPERATION="broadcast" -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/reduce_scatter_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"'
# workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: AzureCLI@2
name: StopVMSS
Expand Down
212 changes: 114 additions & 98 deletions src/include/execution_kernel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ __shared__ BufferType* portChannelBufferTypes_;
__shared__ uint32_t flag_;
__shared__ uint32_t scratchChunkSize_;
__shared__ uint32_t scratchOffset_;
#if defined(ENABLE_NPKIT)
__shared__ NpKitEvent* eventBuffer_;
#endif

template <typename T>
MSCCLPP_DEVICE_INLINE T* getBuffer(T* input, T* output, T* scratch, BufferType bufferType) {
Expand Down Expand Up @@ -652,16 +655,26 @@ MSCCLPP_DEVICE_INLINE void handleMultiLoadReduceStore(const Operation& op, uint3
#endif

template <typename T, typename PacketType, bool ReuseScratch>
MSCCLPP_DEVICE_INLINE void handlePipeline(const Operation& op, T* input, T* output, T* scratch) {
MSCCLPP_DEVICE_INLINE void handlePipeline(const Operation& op, T* input, T* output, T* scratch
#if defined(ENABLE_NPKIT)
,
uint64_t& eventBufferHead
#endif
) {
uint16_t nIterations = op.nIterations;
uint16_t nOperations = op.nOperations;
uint32_t unitSize = op.unitSize;
const Operation* operations = &op + 1;
for (uint16_t i = 0; i < nIterations; i++) {
uint32_t offset = i * unitSize;
for (uint8_t opId = 0; opId < nOperations; opId++) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY)
executeDeviceFunction<T, PacketType, ReuseScratch>(operations[opId], input, output, scratch, nullptr, offset,
unitSize, eventBufferHead);
#else
executeDeviceFunction<T, PacketType, ReuseScratch>(operations[opId], input, output, scratch, nullptr, offset,
unitSize);
#endif
}
}
}
Expand All @@ -682,94 +695,104 @@ MSCCLPP_DEVICE_INLINE void handleSemAcquire(const Operation& op) {
}
}

#if defined(ENABLE_NPKIT)
MSCCLPP_DEVICE_INLINE uint32_t getOpSize(const Operation& op, uint32_t offset, uint32_t unitSize) {
if (op.type == OperationType::BARRIER || op.type == OperationType::WAIT || op.type == OperationType::SIGNAL ||
op.type == OperationType::RELAXED_WAIT || op.type == OperationType::RELAXED_SIGNAL ||
op.type == OperationType::NOP || op.type == OperationType::FLUSH || op.type == OperationType::PIPELINE ||
op.type == OperationType::SEM_ACQUIRE || op.type == OperationType::SEM_RELEASE) {
return 0;
}
return min(op.inputBufferSizes[0] - offset, unitSize);
}
#endif

template <typename T, typename PacketType, bool ReuseScratch>
MSCCLPP_DEVICE_INLINE void executeDeviceFunction(const Operation& op, T* input, T* output, T* scratch, uint8_t* nSteps,
uint32_t offset, uint32_t unitSize) {
uint32_t offset, uint32_t unitSize
#if defined(ENABLE_NPKIT)
,
uint64_t& eventBufferHead
#endif
) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY)
uint32_t opSize = 0;
if (unitSize < UINT32_MAX) {
opSize = getOpSize(op, offset, unitSize);
}
NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY + (int)op.type, opSize, 0, NPKIT_GET_GPU_TIMESTAMP(),
eventBuffer_, &eventBufferHead);
#endif
if (nSteps != nullptr) {
*nSteps = 1;
}
OperationType opType = op.type;
if (opType == OperationType::NOP) {
return handleNop();
}
if (opType == OperationType::BARRIER) {
return handleBarrier(op);
}
if (opType == OperationType::SIGNAL) {
return handleSignal(op);
}
if (opType == OperationType::WAIT) {
return handleWait(op);
}
if (opType == OperationType::RELAXED_SIGNAL) {
return handleSignal<true>(op);
}
if (opType == OperationType::RELAXED_WAIT) {
return handleWait<true>(op);
}
if (opType == OperationType::FLUSH) {
return handleFlush(op);
}
if (opType == OperationType::PUT) {
return handlePut<ReuseScratch>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::PUT_WITH_SIGNAL) {
return handlePut<ReuseScratch, true>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::PUT_WITH_SIGNAL_AND_FLUSH) {
return handlePut<ReuseScratch, true, true>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::PUT_PACKET) {
return handlePutPacket<PacketType>(op, input, output, scratch);
}
if (opType == OperationType::READ_PUT_PACKET) {
return handleReadPutPacket<PacketType>(op, scratch);
}
if (opType == OperationType::GET) {
return handleGet<ReuseScratch>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::READ_REDUCE_SEND) {
return handleReadReduceSend<T, ReuseScratch, true>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::READ_REDUCE) {
return handleReadReduceSend<T, ReuseScratch, false>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::COPY) {
return handleCopy<ReuseScratch>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::REDUCE_SEND) {
return handleReduceSend<T, ReuseScratch>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::REDUCE) {
return handleReduceSend<T, ReuseScratch, false>(op, input, output, scratch, offset, unitSize);
}
if (opType == OperationType::REDUCE_SEND_PACKET) {
return handleReduceSendPacket<T, PacketType>(op, input, output, scratch);
}
if (opType == OperationType::REDUCE_PACKET) {
return handleReduceSendPacket<T, PacketType, false>(op, input, output, scratch);
}
if (opType == OperationType::UNPACK_COPY_PACKET) {
return handleUnpackCopyPacket<PacketType>(op, input, output, scratch);
}
if (opType == OperationType::COPY_PACKET) {
return handleCopyPacket<PacketType>(op, input, output, scratch);
}
if (opType == OperationType::SEM_ACQUIRE) {
return handleSemAcquire(op);
}
if (opType == OperationType::SEM_RELEASE) {
return handleSemRelease(op);
handleNop();
} else if (opType == OperationType::BARRIER) {
handleBarrier(op);
} else if (opType == OperationType::SIGNAL) {
handleSignal(op);
} else if (opType == OperationType::WAIT) {
handleWait(op);
} else if (opType == OperationType::RELAXED_SIGNAL) {
handleSignal<true>(op);
} else if (opType == OperationType::RELAXED_WAIT) {
handleWait<true>(op);
} else if (opType == OperationType::FLUSH) {
handleFlush(op);
} else if (opType == OperationType::PUT) {
handlePut<ReuseScratch>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::PUT_WITH_SIGNAL) {
handlePut<ReuseScratch, true>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::PUT_WITH_SIGNAL_AND_FLUSH) {
handlePut<ReuseScratch, true, true>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::PUT_PACKET) {
handlePutPacket<PacketType>(op, input, output, scratch);
} else if (opType == OperationType::READ_PUT_PACKET) {
handleReadPutPacket<PacketType>(op, scratch);
} else if (opType == OperationType::GET) {
handleGet<ReuseScratch>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::READ_REDUCE_SEND) {
handleReadReduceSend<T, ReuseScratch, true>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::READ_REDUCE) {
handleReadReduceSend<T, ReuseScratch, false>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::COPY) {
handleCopy<ReuseScratch>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::REDUCE_SEND) {
handleReduceSend<T, ReuseScratch>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::REDUCE) {
handleReduceSend<T, ReuseScratch, false>(op, input, output, scratch, offset, unitSize);
} else if (opType == OperationType::REDUCE_SEND_PACKET) {
handleReduceSendPacket<T, PacketType>(op, input, output, scratch);
} else if (opType == OperationType::REDUCE_PACKET) {
handleReduceSendPacket<T, PacketType, false>(op, input, output, scratch);
} else if (opType == OperationType::UNPACK_COPY_PACKET) {
handleUnpackCopyPacket<PacketType>(op, input, output, scratch);
} else if (opType == OperationType::COPY_PACKET) {
handleCopyPacket<PacketType>(op, input, output, scratch);
} else if (opType == OperationType::SEM_ACQUIRE) {
handleSemAcquire(op);
} else if (opType == OperationType::SEM_RELEASE) {
handleSemRelease(op);
}
#if defined(__CUDA_ARCH__) && __CUDA_ARCH__ >= 900
if (opType == OperationType::MULTI_LOAD_REDUCE_STORE) {
return handleMultiLoadReduceStore<T, ReuseScratch>(op, offset, unitSize);
else if (opType == OperationType::MULTI_LOAD_REDUCE_STORE) {
handleMultiLoadReduceStore<T, ReuseScratch>(op, offset, unitSize);
}
#endif
if (opType == OperationType::PIPELINE) {
else if (opType == OperationType::PIPELINE) {
*nSteps = op.nOperations + 1;
return handlePipeline<T, PacketType, ReuseScratch>(op, input, output, scratch);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT)
handlePipeline<T, PacketType, ReuseScratch>(op, input, output, scratch, eventBufferHead);
#else
handlePipeline<T, PacketType, ReuseScratch>(op, input, output, scratch);
#endif
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT)
NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT + (int)op.type, opSize, 0, NPKIT_GET_GPU_TIMESTAMP(),
eventBuffer_, &eventBufferHead);
#endif
return;
}

Expand All @@ -789,12 +812,12 @@ __global__ __launch_bounds__(1024, 1) void executionKernel([[maybe_unused]] int
int bid = blockIdx.x;
int tid = threadIdx.x;
#if defined(ENABLE_NPKIT)
NpKitEvent* event_buffer = (NpKitEvent*)((char*)sharedMem + sizeof(DeviceExecutionPlan));
uint64_t event_buffer_head = 0;
eventBuffer_ = (NpKitEvent*)((char*)sharedMem + sizeof(DeviceExecutionPlan));
uint64_t eventBufferHead = 0;
#if defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT)
uint64_t npkit_timestamp_entry = 0;
uint64_t npkitTimestampEntry = 0;
if (tid == 0) {
npkit_timestamp_entry = NPKIT_GET_GPU_TIMESTAMP();
npkitTimestampEntry = NPKIT_GET_GPU_TIMESTAMP();
}
#endif
#endif
Expand Down Expand Up @@ -824,41 +847,34 @@ __global__ __launch_bounds__(1024, 1) void executionKernel([[maybe_unused]] int
#else
NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp,
#endif
event_buffer, &event_buffer_head);
eventBuffer_, &eventBufferHead);
#endif

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU)
NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), event_buffer,
&event_buffer_head);
NpKit::CollectGpuEventShm(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), eventBuffer_, &eventBufferHead);
#endif

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_ENTRY) && \
defined(ENABLE_NPKIT_EVENT_EXECUTOR_INIT_EXIT)
NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_ENTRY, 0, 0, npkit_timestamp_entry, event_buffer,
&event_buffer_head);
NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_EXIT, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), event_buffer,
&event_buffer_head);
NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_ENTRY, 0, 0, npkitTimestampEntry, eventBuffer_, &eventBufferHead);
NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_INIT_EXIT, 0, 0, NPKIT_GET_GPU_TIMESTAMP(), eventBuffer_,
&eventBufferHead);
#endif

for (int i = 0; i < nOperations;) {
Operation& op = operations[i];

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY)
NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY + (int)op.type, op.size, 0, NPKIT_GET_GPU_TIMESTAMP(),
event_buffer, &event_buffer_head);
#endif
uint8_t nSteps = 0;
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_ENTRY)
executeDeviceFunction<T, PacketType, ReuseScratch>(op, input, output, scratch, &nSteps, 0, UINT32_MAX,
eventBufferHead);
#else
executeDeviceFunction<T, PacketType, ReuseScratch>(op, input, output, scratch, &nSteps);
i += nSteps;

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT)
NpKit::CollectGpuEventShm(NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT + (int)op.type, op.size, 0, NPKIT_GET_GPU_TIMESTAMP(),
event_buffer, &event_buffer_head);
#endif
i += nSteps;
}

#if defined(ENABLE_NPKIT)
NpKit::StoreGpuEventShm(npKitEventCollectContexts, event_buffer, event_buffer_head);
NpKit::StoreGpuEventShm(npKitEventCollectContexts, eventBuffer_, eventBufferHead);
#endif
}
#endif // defined(MSCCLPP_DEVICE_COMPILE)
Expand Down
7 changes: 2 additions & 5 deletions test/execution-files/allreduce_packet.json
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
"reduce_op": "sum"
},
{
"name": "cpkt",
"name": "ucpkt",
"src_buff": [
{
"type": "s",
Expand Down Expand Up @@ -223,10 +223,7 @@
"reduce_op": "sum"
},
{
"name": "nop"
},
{
"name": "cpkt",
"name": "ucpkt",
"src_buff": [
{
"type": "s",
Expand Down