Skip to content

Commit 9f7bce0

Browse files
committed
allow disabling send side zerocopy for rdma
1 parent 76d9390 commit 9f7bce0

5 files changed

Lines changed: 58 additions & 18 deletions

File tree

example/rdma_performance/client.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ DEFINE_int32(expected_qps, 0, "The expected QPS");
3838
DEFINE_int32(max_thread_num, 16, "The max number of threads are used");
3939
DEFINE_int32(attachment_size, -1, "Attachment size is used (in Bytes)");
4040
DEFINE_bool(echo_attachment, false, "Select whether attachment should be echo");
41+
DEFINE_bool(attachment_as_userdata, false, "Append attachment as user_data");
4142
DEFINE_string(connection_type, "single", "Connection type of the channel");
4243
DEFINE_string(protocol, "baidu_std", "Protocol type.");
4344
DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers");
@@ -86,7 +87,12 @@ class PerformanceTest {
8687
if (attachment_size > 0) {
8788
_addr = malloc(attachment_size);
8889
butil::fast_rand_bytes(_addr, attachment_size);
89-
_attachment.append(_addr, attachment_size);
90+
if (FLAGS_attachment_as_userdata) {
91+
brpc::rdma::RegisterMemoryForRdma(_addr, attachment_size);
92+
_attachment.append_user_data(_addr, attachment_size, NULL);
93+
} else {
94+
_attachment.append(_addr, attachment_size);
95+
}
9096
}
9197
_echo_attachment = echo_attachment;
9298
}

src/brpc/rdma/rdma_endpoint.cpp

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ extern bool g_skip_rdma_init;
4949

5050
DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
5151
DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
52+
DEFINE_bool(rdma_send_zerocopy, true, "Enable zerocopy for send side");
5253
DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
5354
DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
5455
DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
@@ -801,29 +802,45 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
801802
wr.sg_list = sglist;
802803
wr.opcode = IBV_WR_SEND_WITH_IMM;
803804

804-
RdmaIOBuf* data = (RdmaIOBuf*)from[current];
805805
size_t sge_index = 0;
806806
while (sge_index < (uint32_t)max_sge &&
807807
this_len < _remote_recv_block_size) {
808-
if (data->size() == 0) {
808+
if (from[current]->size() == 0) {
809809
// The current IOBuf is empty, find next one
810810
++current;
811811
if (current == ndata) {
812812
break;
813813
}
814-
data = (RdmaIOBuf*)from[current];
815814
continue;
816815
}
817816

818-
ssize_t len = data->cut_into_sglist_and_iobuf(
819-
sglist, &sge_index, to, max_sge,
820-
_remote_recv_block_size - this_len);
821-
if (len < 0) {
822-
return -1;
817+
ssize_t len = 0;
818+
if (FLAGS_rdma_send_zerocopy) {
819+
ssize_t len = ((RdmaIOBuf*)from[current])->cut_into_sglist_and_iobuf(
820+
sglist, &sge_index, to, max_sge,
821+
_remote_recv_block_size - this_len);
822+
if (len < 0) {
823+
return -1;
824+
}
825+
this_len += len;
826+
total_len += len;
827+
} else {
828+
len = _remote_recv_block_size - this_len;
829+
void* buf = AllocBlock(len);
830+
if (!buf) {
831+
return -1;
832+
}
833+
len = from[current]->copy_to(buf, len);
834+
from[current]->cutn(to, len);
835+
sglist[sge_index].length = len;
836+
sglist[sge_index].addr = (uint64_t)buf;
837+
sglist[sge_index].lkey = GetLKey(buf);
838+
++sge_index;
839+
this_len += len;
840+
total_len += len;
841+
_sbuf_data[_sq_current] = buf;
842+
break;
823843
}
824-
CHECK(len > 0);
825-
this_len += len;
826-
total_len += len;
827844
}
828845
if (this_len == 0) {
829846
continue;
@@ -951,6 +968,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
951968
uint32_t acks = butil::NetToHost32(wc.imm_data);
952969
uint32_t num = acks;
953970
while (num > 0) {
971+
if (!FLAGS_rdma_send_zerocopy) {
972+
DeallocBlock(_sbuf_data[_sq_sent]);
973+
}
954974
_sbuf[_sq_sent++].clear();
955975
if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
956976
_sq_sent = 0;
@@ -1139,6 +1159,10 @@ int RdmaEndpoint::AllocateResources() {
11391159
if (_rbuf.size() != _rq_size) {
11401160
return -1;
11411161
}
1162+
_sbuf_data.resize(_sq_size, NULL);
1163+
if (_sbuf_data.size() != _sq_size) {
1164+
return -1;
1165+
}
11421166
_rbuf_data.resize(_rq_size, NULL);
11431167
if (_rbuf_data.size() != _rq_size) {
11441168
return -1;

src/brpc/rdma/rdma_endpoint.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ friend class brpc::Socket;
213213
// Act as sendbuf and recvbuf, but requires no memcpy
214214
std::vector<butil::IOBuf> _sbuf;
215215
std::vector<butil::IOBuf> _rbuf;
216+
// Data address of _sbuf
217+
std::vector<void*> _sbuf_data;
216218
// Data address of _rbuf
217219
std::vector<void*> _rbuf_data;
218220
// Remote block size for receiving

src/brpc/rdma/rdma_helper.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -643,12 +643,15 @@ ibv_pd* GetRdmaPd() {
643643
}
644644

645645
uint32_t GetLKey(void* buf) {
646-
BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
647-
ibv_mr** mr_ptr = g_user_mrs->seek(buf);
648-
if (mr_ptr) {
649-
return (*mr_ptr)->lkey;
646+
uint32_t lkey = GetRegionId(buf);
647+
if (lkey == 0) {
648+
BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
649+
ibv_mr** mr_ptr = g_user_mrs->seek(buf);
650+
if (mr_ptr) {
651+
return (*mr_ptr)->lkey;
652+
}
650653
}
651-
return 0;
654+
return lkey;
652655
}
653656

654657
ibv_gid GetRdmaGid() {

test/brpc_rdma_unittest.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ struct HelloMessage {
6868
};
6969

7070
DECLARE_bool(rdma_trace_verbose);
71+
DECLARE_bool(rdma_send_zerocopy);
7172
DECLARE_int32(rdma_memory_pool_max_regions);
7273
extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
7374
extern int (*IbvDestroyCq)(ibv_cq*);
@@ -1873,7 +1874,11 @@ TEST_F(RdmaTest, send_rpcs_with_user_defined_iobuf) {
18731874
google::protobuf::Closure* done = DoNothing();
18741875
::test::EchoService::Stub(&channel).Echo(&cntl[0], &req[0], &res[0], done);
18751876
bthread_id_join(cntl[0].call_id());
1876-
ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
1877+
if (rdma::FLAGS_rdma_send_zerocopy) {
1878+
ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
1879+
} else {
1880+
ASSERT_EQ(0, cntl[0].ErrorCode());
1881+
}
18771882
attach.clear();
18781883
sleep(2); // wait for client recover from EHOSTDOWN
18791884
cntl[0].Reset();

0 commit comments

Comments
 (0)