__quickreduce_device_inline__ void set_sync_flag(uint32_t* flag_ptr,
                                                 uint32_t flag) {
  __atomic_store_n(flag_ptr, flag, __ATOMIC_RELEASE);
}

__quickreduce_device_inline__ void wait_sync_flag(uint32_t* flag_ptr,
                                                  uint32_t flag) {
  while (__atomic_load_n(flag_ptr, __ATOMIC_RELAXED) != flag) {
  }
}
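
// The two helpers implement a color-flag handshake: a rank publishes its
// tile, then raises a per-rank flag with a device-scope release store;
// peers spin on a relaxed load until the flag matches the expected color.
// Presumably flag_color changes on every invocation (its management is
// not shown in this excerpt), so flags never need clearing between calls.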
// --------------------------------------------------------
// Phase-1A: Write each segment of this rank's tile into the
// communication buffer of the rank responsible for that segment.
for (int r = 0; r < kWorldSize; r++) {
  int32x4_t* send_buffer =
      reinterpret_cast<int32x4_t*>(buffer_list[r] + comm_data0_offset +
                                   rank * Codec::kRankTransmittedTileSize);
  codec.send(send_buffer, &tA[r * Codec::kRankAtoms]);
}
// All threads in the block must finish their sends before any flag is
// raised.
__syncthreads();
if (thread < kWorldSize) {
  int r = thread;
  uint32_t* flag_ptr = reinterpret_cast<uint32_t*>(
      buffer_list[r] + comm_flags0_offset + rank * sizeof(uint32_t));
  set_sync_flag(flag_ptr, flag_color);  // device-scope release
}
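// Flag layout (as implied by the indexing above): each communication
// buffer holds one 32-bit flag word per rank, so in Phase-1A a rank
// writes only its own slot (rank * sizeof(uint32_t)) in every peer's
// buffer, and in Phase-1B it spins on slot r of its own buffer; no two
// ranks ever store to the same flag word.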
// --------------------------------------------------------
// Phase-1B: Reduce the segment data from the communication buffers.
int32x4_t tR[Codec::kRankAtoms] = {};
{
  // Read the data from the communication buffer.
  int32x4_t* recv_buffer =
      reinterpret_cast<int32x4_t*>(rank_buffer + comm_data0_offset);
  uint32_t* flag_ptr =
      reinterpret_cast<uint32_t*>(rank_buffer + comm_flags0_offset);
  for (int r = 0; r < kWorldSize; r++) {
    // Wait for rank r's flag to be set.
    if (thread == 0) {
      wait_sync_flag(&flag_ptr[r], flag_color);  // device-scope relaxed
    }
    __syncthreads();

    // Note: tA is reused as a temporary decode buffer here.
    codec.recv(&recv_buffer, tA);
    for (int i = 0; i < Codec::kRankAtoms; i++) {
      packed_assign_add<T>(&tR[i], &tA[i]);
    }
  }
}
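
// ------------------------------------------------------------------
// For reference, a minimal sketch of what a packed accumulate such as
// packed_assign_add<T> could look like for T = __half. This is an
// illustrative assumption, not the actual implementation: each 16-byte
// int32x4_t is viewed as four __half2 pairs (eight fp16 lanes), and B
// is accumulated into A with packed fp16 adds.
#include <hip/hip_fp16.h>

template <typename T>
__device__ __forceinline__ void packed_assign_add_sketch(int32x4_t* A,
                                                         int32x4_t* B);

template <>
__device__ __forceinline__ void packed_assign_add_sketch<__half>(
    int32x4_t* A, int32x4_t* B) {
  __half2* a = reinterpret_cast<__half2*>(A);
  __half2* b = reinterpret_cast<__half2*>(B);
#pragma unroll
  for (int i = 0; i < 4; i++) {
    a[i] = __hadd2(a[i], b[i]);  // two fp16 additions per 32-bit word
  }
}
// A bf16 specialization would follow the same shape with packed bf16
// adds; the loop above would be invoked exactly like the accumulate in
// Phase-1B, e.g. packed_assign_add_sketch<__half>(&tR[i], &tA[i]).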