Server side: establishing a connection (src/test/msgr/perf_msgr_server.cc)

MessengerServer
  Messenger *Messenger::create (type=async+rdma, lname=server)
    -> new AsyncMessenger -> AsyncMessenger::AsyncMessenger
      dispatch_queue: DispatchQueue(CephContext *cct, Messenger *msgr, string &name) (local and fast dispatch, throttling)
      lookup_or_create_singleton_object<StackSingleton>
      single->ready(transport_type)
        NetworkStack::create(cct, type) -> std::make_shared<RDMAStack>(c, t)
          RDMAStack::RDMAStack
            NetworkStack(cct, t) (base constructor; the worker thread pool has 3 threads by default)
              create_worker(cct, type, worker_id) -> NetworkStack::create_worker
                -> new RDMAWorker(c, worker_id) -> RDMAWorker::RDMAWorker
                  -> Worker(CephContext *c, unsigned worker_id)

The Stack is a network I/O framework. It encapsulates the basic network interfaces the messenger needs and manages the worker threads. Each network backend (POSIX, DPDK, even RDMA) inherits from the Stack class and implements only those interfaces, which makes it easy to integrate a new backend into Ceph. Without this layer, every backend would have to implement the entire Messenger logic itself: reconnection, policy handling, session maintenance, and so on.
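The sketch below is a heavily simplified illustration of the backend split described above. The class names echo the trace (NetworkStack, create_worker, RDMA/POSIX workers), but the code is hypothetical. It models only two things: how a stack type string selects a backend, and how the stack owns its workers. It performs no real I/O.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, minimal model of the Stack/Worker split; not Ceph's real classes.
struct Worker {
  explicit Worker(unsigned id) : id(id) {}
  virtual ~Worker() = default;
  virtual std::string backend() const = 0;  // each backend supplies its own I/O primitives
  unsigned id;
};

struct PosixWorker : Worker {
  using Worker::Worker;
  std::string backend() const override { return "posix"; }
};

struct RdmaWorker : Worker {
  using Worker::Worker;
  std::string backend() const override { return "rdma"; }
};

// The stack owns the workers; messenger logic (reconnect, policy, sessions) would sit
// above this interface and stay backend-independent.
class NetworkStack {
 public:
  virtual ~NetworkStack() = default;
  Worker* get_worker(unsigned i) { return workers_.at(i).get(); }
  std::size_t num_workers() const { return workers_.size(); }
  static std::shared_ptr<NetworkStack> create(const std::string& type, unsigned n);

 protected:
  virtual std::unique_ptr<Worker> create_worker(unsigned id) = 0;

 private:
  std::vector<std::unique_ptr<Worker>> workers_;
};

struct PosixStack : NetworkStack {
 protected:
  std::unique_ptr<Worker> create_worker(unsigned id) override {
    return std::make_unique<PosixWorker>(id);
  }
};

struct RdmaStack : NetworkStack {
 protected:
  std::unique_ptr<Worker> create_worker(unsigned id) override {
    return std::make_unique<RdmaWorker>(id);
  }
};

std::shared_ptr<NetworkStack> NetworkStack::create(const std::string& type, unsigned n) {
  std::shared_ptr<NetworkStack> s;
  if (type == "posix") s = std::make_shared<PosixStack>();
  else if (type == "rdma") s = std::make_shared<RdmaStack>();
  else throw std::invalid_argument("unknown stack type: " + type);
  // Workers are created after construction, so the virtual call reaches the backend.
  for (unsigned i = 0; i < n; ++i) s->workers_.push_back(s->create_worker(i));
  return s;
}
```

Creating the workers after construction keeps the virtual create_worker call dispatching to the backend. Calling it from the base constructor would not.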
              w->center.init -> EventCenter::init
                driver = new EpollDriver(cct)
                driver->init(this, nevent) -> int EpollDriver::init
                  events = (struct epoll_event*)calloc
                  epfd = epoll_create(1024)
                  fcntl(epfd, F_SETFD, FD_CLOEXEC)
                file_events.resize(nevent) (nevent = 5000)
                pipe_cloexec(fds, 0) -> pipe2(pipefd, O_CLOEXEC | flags) (creates the notification pipe; both ends are then set to non-blocking)
                notify_receive_fd = fds[0] (receiving end, read end)
                notify_send_fd = fds[1] (sending end, write end)
              workers.push_back(w)
            Infiniband::Infiniband
              device_name(cct->_conf->ms_async_rdma_device_name) (RDMA device name, taken from the configuration) TODO
              port_num(cct->_conf->ms_async_rdma_port_num) (port, also taken from the configuration; defaults to 1)
              verify_prereq -> void Infiniband::verify_prereq
                RDMAV_HUGEPAGES_SAFE (mark huge pages as safe)
                ibv_fork_init
                getrlimit(RLIMIT_MEMLOCK, &limit) (reads the locked-memory resource limit)
            get_num_worker (3); for each worker:
              w->set_dispatcher(rdma_dispatcher)
              w->set_ib(ib)
        stack->start()
          std::function<void ()> thread = add_thread(i) (builds the thread body; not executed yet)
            w->center.set_owner()
              notify_handler = new C_handle_notify(this, cct)
              create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler) (epoll watches the read end of the pipe created above)
                driver->add_event(fd, event->mask, mask) -> epoll_ctl(epfd, op, fd, &ee)
                event->read_cb = ctxt (sets the read-event callback)
            w->initialize()
            w->init_done() -> init_cond.notify_all() (tells waiting threads that initialization is complete)
            while (!w->done)
              w->center.process_events (event loop) -> int EventCenter::process_events
                driver->event_wait(fired_events, &tv) -> int EpollDriver::event_wait -> epoll_wait (returns once a character such as 'c' is written to the write end of the pipe)
                fired_events[event_id].fd = e->data.fd
                event = _get_file_event(fired_events[event_id].fd)
                cb = event->read_cb (readable callback)
                cb->do_request(fired_events[event_id].fd) (handles the event)
                  r = read(fd_or_id, c, sizeof(c)) (reads the characters written by the other end of the pipe, e.g. 'c')
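The wakeup mechanism set up by EventCenter::init and C_handle_notify can be reproduced with plain Linux calls. This is a sketch, not Ceph code. One hypothetical function creates the epoll instance and the pipe, makes both pipe ends non-blocking, and watches the read end. It then writes 'c' to the write end, as wakeup() does, and reads the byte back after epoll_wait returns.

```cpp
#include <cassert>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>

// Returns the byte the "event loop" received after a wakeup, or -1 on any error.
// Linux-only sketch of the EventCenter notify pipe.
int wakeup_roundtrip() {
  int result = -1;
  int epfd = epoll_create(1024);  // the size argument is only a hint, but must be > 0
  int fds[2] = {-1, -1};
  if (epfd >= 0 && fcntl(epfd, F_SETFD, FD_CLOEXEC) == 0 && pipe2(fds, O_CLOEXEC) == 0) {
    int notify_receive_fd = fds[0];  // read end, watched by epoll
    int notify_send_fd = fds[1];     // write end, used to wake the loop
    // pipe2 with only O_CLOEXEC leaves both ends blocking; switch them to non-blocking.
    fcntl(notify_receive_fd, F_SETFL, fcntl(notify_receive_fd, F_GETFL) | O_NONBLOCK);
    fcntl(notify_send_fd, F_SETFL, fcntl(notify_send_fd, F_GETFL) | O_NONBLOCK);

    epoll_event ee{};
    ee.events = EPOLLIN;  // corresponds to EVENT_READABLE
    ee.data.fd = notify_receive_fd;
    char c = 'c';
    epoll_event fired[8];
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, notify_receive_fd, &ee) == 0 &&
        write(notify_send_fd, &c, 1) == 1 &&     // wakeup(): write one byte
        epoll_wait(epfd, fired, 8, 1000) == 1 &&  // process_events(): wait up to 1 s
        fired[0].data.fd == notify_receive_fd) {
      char got = 0;
      if (read(notify_receive_fd, &got, 1) == 1) result = got;  // notify handler drains it
    }
  }
  if (fds[0] >= 0) close(fds[0]);
  if (fds[1] >= 0) close(fds[1]);
  if (epfd >= 0) close(epfd);
  return result;
}
```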
                cur_process.swap(external_events) (takes over the queued external events)
          spawn_worker(i, std::move(thread)) (starts a new thread and keeps it so it can be joined later)
          workers[i]->wait_for_init() (waits until every worker has finished initializing)
    local_connection = ceph::make_ref<AsyncConnection> -> AsyncConnection::AsyncConnection
      ms_connection_ready_timeout (connection-establishment timeout)
      ms_connection_idle_timeout (idle timeout: if the connection stays idle, with no reads or writes, for more than 15 minutes, it is destroyed)
      read_handler = new C_handle_read(this) -> conn->process() -> void AsyncConnection::process()
      write_handler = new C_handle_write(this) -> conn->handle_write() -> void AsyncConnection::handle_write
      write_callback_handler = new C_handle_write_callback(this) -> AsyncConnection::handle_write_callback (passed as the callback by AsyncConnection::write)
      wakeup_handler = new C_time_wakeup(this) -> void AsyncConnection::wakeup_from -> void AsyncConnection::process()
      tick_handler = new C_tick_wakeup(this) -> void AsyncConnection::tick (timer; calls protocol->fault() to handle errors)
    init_local_connection -> void ms_deliver_handle_fast_connect
    reap_handler = new C_handle_reap(this) -> void AsyncMessenger::reap_dead (reaps dead connections)
    processors.push_back(new Processor(this, stack->get_worker(i), cct)) -> Processor::Processor
      listen_handler(new C_processor_accept(this)) -> void Processor::accept() (waits for the event, which fires after a client calls connect)
        for each of listen_sockets: while (true)
          msgr->get_stack()->get_worker()
          listen_socket.accept(&cli_socket, opts, &addr, w)
          msgr->add_accept
  msgr->set_default_policy
  dummy_auth.auth_registry.refresh_config()
  msgr->set_auth_server(&dummy_auth) (initialization; must be called before binding)
server.start()
  msgr->bind(addr) -> AsyncMessenger::bind -> bindv -> int r = p->bind -> int Processor::bind
    listen_sockets.resize
    conf->ms_bind_retry_count (3 retries)
    worker->center.submit_to (with a lambda, []()->void anonymous function)
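Processor::bind does not call listen directly. It hands a lambda to the worker's event loop with submit_to, and the trace then shows the in_thread() check, C_submit_event, and the external-event wakeup. The sketch below is a hypothetical simplification. The EventLoop class is invented for illustration, and a std::condition_variable stands in for the notify pipe plus epoll wakeup. It keeps the two behaviors from the trace. If the caller is already on the owner thread, the function runs directly. Otherwise the function is queued as an external event, the loop is woken, and the caller blocks until the function has run (event.wait()).

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical model of EventCenter::submit_to(); not Ceph code.
class EventLoop {
 public:
  EventLoop() : thread_([this] { run(); }) {}
  ~EventLoop() {
    {
      std::lock_guard<std::mutex> l(m_);
      stop_ = true;
    }
    cv_.notify_all();
    thread_.join();
  }

  bool in_thread() const { return std::this_thread::get_id() == thread_.get_id(); }

  // Run f on the loop thread and block until it has finished.
  void submit_to(std::function<void()> f) {
    if (in_thread()) { f(); return; }  // already on the owner thread: call directly
    bool done = false;
    std::condition_variable done_cv;
    {
      std::lock_guard<std::mutex> l(m_);
      external_.push_back([&] {  // like dispatch_event_external()
        f();
        std::lock_guard<std::mutex> l2(m_);
        done = true;
        done_cv.notify_one();
      });
    }
    cv_.notify_one();  // like wakeup()
    std::unique_lock<std::mutex> l(m_);
    done_cv.wait(l, [&] { return done; });  // like event.wait()
  }

 private:
  void run() {
    std::unique_lock<std::mutex> l(m_);
    while (true) {
      cv_.wait(l, [&] { return stop_ || !external_.empty(); });
      if (external_.empty() && stop_) return;
      std::deque<std::function<void()>> cur;
      cur.swap(external_);  // like cur_process.swap(external_events)
      l.unlock();
      for (auto& cb : cur) cb();
      l.lock();
    }
  }

  std::mutex m_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> external_;
  bool stop_ = false;
  std::thread thread_;  // declared last so the members above exist before run() starts
};
```

A nested submit_to issued from inside the loop takes the direct path. Without the in_thread() check, it would deadlock waiting on itself.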
      c->in_thread() -> pthread_equal(pthread_self(), owner) (is the caller already the owner thread?)
      C_submit_event<func> event(std::move(f), false) (f = listen)
        void do_request -> f() -> listen
          -> worker->listen(listen_addr, k, opts, &listen_sockets[k]) -> int RDMAWorker::listen (runs when the event fires)
            ib->init() -> void Infiniband::init
              newDeviceList(cct) -> ibv_get_device_list (4 devices here)
                if (cct->_conf->ms_async_rdma_cm)
                new Device(cct, device_list[i]) -> Device::Device
                  ibv_open_device
                  ibv_get_device_name
                  ibv_query_device (fills the device attributes, device_attr)
              get_device (looks up the device list by the configured device name; by default the first one is taken, e.g. mlx5_0)
              binding_port -> void Device::binding_port
                new Port(cct, ctxt, port_id) (port IDs start at 1) -> Port::Port
                  ibv_query_port(ctxt, port_num, &port_attr)
                  ibv_query_gid(ctxt, port_num, gid_idx, &gid)
              ib_physical_port = device->active_port->get_port_num() (the physical port)
              new ProtectionDomain(cct, device) -> Infiniband::ProtectionDomain::ProtectionDomain -> ibv_alloc_pd(device->ctxt)
              support_srq = cct->_conf->ms_async_rdma_support_srq (shared receive queue, SRQ)
              rx_queue_len = device->device_attr.max_srq_wr (capped by the configuration; ends up as 4096)
              tx_queue_len = device->device_attr.max_qp_wr - 1 (the send queue reserves 1 WR for the beacon; capped by the configuration, e.g. 1024, written 1_K via an overloaded literal operator)
              device->device_attr.max_cqe (the device allows up to 4194303 completion-queue entries)
              memory_manager = new MemoryManager(cct, device, pd) -> Infiniband::MemoryManager::MemoryManager (128K -> mem_pool -> boost::pool)
              memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len) -> void Infiniband::MemoryManager::create_tx_pool
                send = new Cluster(*this, size)
                send->fill(tx_num) -> int Infiniband::MemoryManager::Cluster::fill
                  base = (char*)manager.malloc(bytes) -> void* Infiniband::MemoryManager::malloc -> std::malloc(size) (standard allocation, or huge pages via huge_pages_malloc)
                  ibv_reg_mr (registers the memory)
                  new(chunk) Chunk
                  free_chunks.push_back(chunk)
              create_shared_receive_queue -> ibv_create_srq
              post_chunks_to_rq -> int Infiniband::post_chunks_to_rq
                chunk = get_memory_manager()->get_rx_buffer() -> return reinterpret_cast<Chunk *>(rxbuf_pool.malloc())
                ibv_post_srq_recv
            dispatcher->polling_start() -> void RDMADispatcher::polling_start
              ib->get_memory_manager()->set_rx_stat_logger(perf_logger) -> void PerfCounters::set
              tx_cc = ib->create_comp_channel(cct) -> Infiniband::CompletionChannel* Infiniband::create_comp_channel -> new Infiniband::CompletionChannel
              tx_cq = ib->create_comp_queue(cct, tx_cc)
                cq->init() -> int Infiniband::CompletionChannel::init
                  ibv_create_comp_channel (creates the completion channel)
                  NetHandler(cct).set_nonblock(channel->fd) (makes its fd non-blocking)
              t = std::thread(&RDMADispatcher::polling, this) (starts the polling thread, rdma-polling) -> void RDMADispatcher::polling
                tx_cq->poll_cq(MAX_COMPLETIONS, wc)
                handle_tx_event -> tx_chunks.push_back(chunk) -> post_tx_buffer -> tx -> void RDMAWorker::handle_pending_message()
                handle_rx_event -> void RDMADispatcher::handle_rx_event
                  conn->post_chunks_to_rq(1) (adds one memory block, one WR, back to the receive queue) -> int Infiniband::post_chunks_to_rq -> ibv_post_srq_recv | ibv_post_recv
                  polled[conn].push_back(*response)
                  qp->remove_rq_wr(chunk)
                  chunk->clear_qp()
                  pass_wc -> void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
                    -> notify() -> void RDMAConnectedSocketImpl::notify -> eventfd_write(notify_fd, event_val)
                    -> picked up by eventfd_read(notify_fd, &event_val) <- ssize_t RDMAConnectedSocketImpl::read <- process
            new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot)
            int r = p->listen(sa, opt) -> int RDMAServerSocketImpl::listen
              server_setup_socket = net.create_socket(sa.get_family(), true) -> socket_cloexec
              net.set_nonblock
              net.set_socket_options
              ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len()) (system call)
              ::listen (backlog = 512)
            *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p))
      cond.notify_all() (notifies the waiting threads)
      dispatch_event_external -> void EventCenter::dispatch_event_external
        external_events.push_back(e)
        wakeup() -> write(notify_send_fd, &buf, sizeof(buf)) (buf = 'c'; notify_receive_fd becomes readable and epoll_wait wakes up)
      event.wait()
  msgr->add_dispatcher_head(&dispatcher)
  ready()
    p->start() -> void Processor::start()
      worker->center.create_file_event(listen_handler) -> pro->accept() -> void Processor::accept()
    msgr->start() -> int AsyncMessenger::start()
    msgr->wait() -> void AsyncMessenger::wait()

Client side: establishing a connection (src/test/msgr/perf_msgr_client.cc)
  gdb --args ceph_perf_msgr_client 175.16.53.62:10001 10 1 1 0 4096

perf_msgr_client.cc -> main
  MessengerClient client(public_msgr_type, args[0], think_time)
  client.ready
    Messenger *msgr = Messenger::create
    msgr->set_default_policy -> Policy(bool l, bool s ...
    msgr->start() -> int AsyncMessenger::start()
      if (!did_bind) (the client does not need to bind)
        set_myaddrs(newaddrs) -> void Messenger::set_endpoint_addr
      _init_local_connection() -> void _init_local_connection()
        ms_deliver_handle_fast_connect(local_connection.get()) -> void ms_deliver_handle_fast_connect (notifies each fast dispatcher of the new connection; called whenever a connection is started or reconnected. fast_dispatchers is still empty at this point, because the dispatcher is only added later in ClientThread.)
    ConnectionRef conn = msgr->connect_to_osd(addrs) (connects to the OSD) -> ConnectionRef connect_to_osd -> ConnectionRef AsyncMessenger::connect_to
      AsyncConnectionRef conn = _lookup_conn(av) (first looks for the connection in the connection map)
      conn = create_connect(av, type, false) (not found: create a new connection) -> AsyncConnectionRef AsyncMessenger::create_connect
        Worker *w = stack->get_worker()
        auto conn = ceph::make_ref<AsyncConnection> -> AsyncConnection::AsyncConnection (constructs the connection)
          recv_buf = new char[2*recv_max_prefetch] (buffered reading avoids the overhead of many small reads)
          new ProtocolV2(this): Ceph's v2 protocol, which extends v1 with support for address vectors.
          After the banner exchange, peers exchange their address vectors.
      conn->connect(addrs, type, target) -> void AsyncConnection::connect -> _connect -> void AsyncConnection::_connect()
        state = STATE_CONNECTING (initial state of the state machine)
        protocol->connect() -> void ProtocolV2::connect() -> state = START_CONNECT
        center->dispatch_event_external(read_handler) (advances the state machine) -> process
      conns[addrs] = conn (saves the connection in ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns)
    ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us) -> ClientThread(Messenger *m (creates a client thread and builds the payload)
      m->add_dispatcher_head(&dispatcher)
      bufferptr ptr(msg_len) (allocates the data buffer) -> buffer::ptr::ptr(unsigned l) : _off(0), _len(l)
        _raw = buffer::create(l).release() -> ceph::unique_leakable_ptr<buffer::raw> buffer::create
          release() gives up ownership of the stored pointer: it returns the pointer's value and replaces it with a null pointer. The managed object is not destroyed, and the unique_ptr is no longer responsible for deleting it. Some other entity must delete the object at some point.
          To force the destruction of the pointed-to object instead, call the member function reset() or assign to the pointer.
          buffer::create_aligned(len, sizeof(size_t)) -> ceph::unique_leakable_ptr<buffer::raw> buffer::create_aligned
            create_aligned_in_mempool -> mempool::mempool_buffer_anon (macro: f(buffer_anon)) -> ceph::unique_leakable_ptr<buffer::raw> buffer::create_aligned_in_mempool
              for 1M: create_aligned_in_mempool(len=1048576, align=8, mempool=18)
              if (len >= CEPH_PAGE_SIZE * 2): if the requested length is at least twice CEPH_PAGE_SIZE (the system page size, sysconf(_SC_PAGESIZE)), i.e. 8K, use native POSIX aligned allocation
                -> ceph::unique_leakable_ptr<buffer::raw>(new raw_posix_aligned(len, align)) -> raw_posix_aligned(unsigned l, unsigned _align) : raw(l)
                  r = ::posix_memalign((void**)(void*)&data, align, len);
              otherwise: return raw_combined::create(len, align, mempool) -> src/common/buffer.cc -> static ceph::unique_leakable_ptr<buffer::raw> create(unsigned len, ...)
                align = std::max<unsigned>(align, sizeof(void *)) (= 8)
                size_t rawlen = round_up_to(sizeof(buffer::raw_combined), ...) (= 96)
                size_t datalen = round_up_to(len, alignof(buffer::raw_combined)) (= 4096)
                int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen); (96 + 4096 bytes)
                new (ptr + datalen) raw_combined(ptr, len, align, mempool) (the raw_combined header sits right after the data, in the same allocation)
          _raw->nref.store(1, std::memory_order_release)
      memset(ptr.c_str(), 0, msg_len) (zeroes the buffer)
      data.append(ptr) (data is filled with zeros) -> void buffer::list::append -> void push_back(const ptr &bp)
        _buffers.push_back(*ptr_node::create(bp).release())
        _len += bp.length()
    msgrs.push_back(msgr)
    clients.push_back(t)
  Cycles::init() -> void Cycles::init() (calibrates the clock frequency)
  uint64_t start = Cycles::rdtsc()
  client.start() -> void start() -> clients[i]->create("client") -> void Thread::create
    pthread_create(&thread_id, thread_attr, _entry_func, (void*)this) -> void *Thread::_entry_func
      void *entry() override (ClientThread overrides entry)
        hobject_t hobj(oid, oloc.key -> struct object_t
          void build_hash_cache() (crc32c)
        MOSDOp *m = new MOSDOp -> MOSDOp(int inc, long tid, ...
        bufferlist msg_data(data) (copy constructor: copies the list of buffer pointers, which share the underlying raw buffers through reference counting; the payload bytes are not copied)
        m->write(0, msg_len, msg_data) -> void write (adds a write op to the message: offset=0, len=4096, data=msg_data; the message is sent later by send_message)
          add_simple_op(CEPH_OSD_OP_WRITE, off, len) -> ops.push_back(osd_op); osd_op.op.extent.offset = off; osd_op.op.extent.length = len
          data.claim(bl) -> clear(); claim_append(bl) -> void buffer::list::claim_append (appends without copying)
            _buffers.splice_back(bl._buffers) (splices bl's buffers onto the end)
            bl._buffers.clear_and_dispose()
          header.data_off = off
        conn->send_message(m) -> void ProtocolV2::send_message(Message *m) (eventually reaches ssize_t RDMAConnectedSocketImpl::send)
          out_queue[m->get_priority()].emplace_back
          connection->center->dispatch_event_external(connection->write_handler) -> void AsyncConnection::handle_write
            const auto out_entry = _get_next_outgoing()
            more = !out_queue.empty() (if the send queue is not empty, more is true: there is more data to send)
            write_message(out_entry.m, more)
              ssize_t total_send_size = connection->outgoing_bl.length() (4406 = 310 + 4096)
              connection->_try_send(more) -> cs.send(outgoing_bl, more) -> ssize_t RDMAConnectedSocketImpl::send
                size_t bytes = bl.length() (4 KB message: 4406 B = 4096 + 310; 1 MB message: 1048886 B = 1048576 + 310)
                pending_bl.claim_append(bl) (moves bl's buffers into pending_bl without copying; bl is left empty)
                ssize_t r = submit(more) -> ssize_t RDMAConnectedSocketImpl::submit
                  pending_bl.length() (4406)
                  auto it = std::cbegin(pending_bl.buffers()) (const iterator, cannot modify elements; member cbegin()/cend() arrived in C++11, the free function std::cbegin in C++14)
                  while (it != pending_bl.buffers().end()) (loops over the segments)
                    if (ib->is_tx_buffer(it->raw_c_str())) (this branch is not taken here)
    Related commit:
      msg/async/rdma: Use shared_ptr to manage Infiniband obj
      1. Do not use raw pointers to manage Infiniband obj
      2. Access Infiniband obj directly instead of from RDMA stack.
      This avoids caching RDMAStack obj in RDMAWorker and RDMADispatcher.
                    else: wait_copy_len += it->length() (= 32)
                    (inside the is_tx_buffer branch, not taken here: tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str())) and size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);)
                  total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it) -> size_t RDMAConnectedSocketImpl::tx_copy_chunk
                    int RDMAWorker::get_reged_mem (gets registered memory) -> int Infiniband::get_tx_buffers -> get_send_buffers -> Infiniband::MemoryManager::Cluster::get_buffers
                      size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r (131072 > 4406, so the chunks obtained cover the request; for 1 MB, 131072 * 9 = 1179648 >= 1048886)
                    auto chunk_idx = tx_buffers.size() (9 chunks for 1 MB)
                    Chunk *current_chunk = tx_buffers[chunk_idx]
                    size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len) -> uint32_t Infiniband::MemoryManager::Chunk::write
                      memcpy(buffer + offset, buf, write_len) (copies the data, looping over chunks; write_len = 4406)
                  pending_bl.clear() (releases pending_bl after copying)
                  post_work_request(tx_buffers) (tx_buffers.size() = 1)
                    while (current_buffer != tx_buffers.end()) -> ibv_post_send
                    -> completions on both the sender and the receiver are reaped with ibv_poll_cq -> int Infiniband::CompletionQueue::poll_cq <- void RDMADispatcher::polling()
  msgr->shutdown()
  stop = Cycles::rdtsc()
  ...
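The chunk arithmetic in the send path can be checked with a small sketch. Chunk, chunks_needed, and copy_into_chunks are hypothetical helpers. They omit memory registration and the free-chunk pool, and only mimic how tx_copy_chunk and Chunk::write spread a payload over fixed 131072-byte tx chunks.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for a tx chunk (no ibv_reg_mr here).
struct Chunk {
  explicit Chunk(std::size_t cap) : buffer(cap), offset(0) {}
  // Like Chunk::write: copy as much as fits and return the number of bytes copied.
  std::size_t write(const char* src, std::size_t len) {
    std::size_t n = std::min(len, buffer.size() - offset);
    std::memcpy(buffer.data() + offset, src, n);
    offset += n;
    return n;
  }
  std::vector<char> buffer;
  std::size_t offset;
};

// Number of fixed-size chunks needed to hold `bytes` (ceiling division).
std::size_t chunks_needed(std::size_t bytes, std::size_t chunk_size) {
  return (bytes + chunk_size - 1) / chunk_size;
}

// Copy one contiguous payload into fresh chunks, filling each one before starting the next.
std::vector<Chunk> copy_into_chunks(const std::vector<char>& payload, std::size_t chunk_size) {
  std::vector<Chunk> chunks;
  std::size_t copied = 0;
  while (copied < payload.size()) {
    if (chunks.empty() || chunks.back().offset == chunk_size) chunks.emplace_back(chunk_size);
    copied += chunks.back().write(payload.data() + copied, payload.size() - copied);
  }
  return chunks;
}
```

For the 1 MB message (1048886 bytes), eight chunks hold 1048576 bytes and the ninth holds the remaining 310. A 4406-byte message fits in a single chunk.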
Client connection setup in the worker thread:
NetworkStack::add_thread -> w->center.process_events -> C_handle_read -> conn->process() -> void AsyncConnection::process()
  worker->connect(target_addr, opts, &cs) -> int RDMAWorker::connect
    ib->init()
    dispatcher->polling_start()
    new RDMAConnectedSocketImpl -> RDMAConnectedSocketImpl::RDMAConnectedSocketImpl
      read_handler(new C_handle_connection_read(this))
      established_handler(new C_handle_connection_established(this))
    p->try_connect(addr, opts) -> int RDMAConnectedSocketImpl::try_connect
      tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr) -> generic_connect -> int NetHandler::generic_connect
        create_socket
        ::connect(s, addr.get_sockaddr(), addr.get_sockaddr_len()) (system call: the client connects to the server's socket, which fires C_processor_accept on the server -> void Processor::accept())
      worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE, established_handler)
        -> established_handler -> int RDMAConnectedSocketImpl::handle_connection_established
    *socket = ConnectedSocket(std::move(csi))
  center->create_file_event(cs.fd(), EVENT_READABLE, read_handler)
  state = STATE_CONNECTING_RE
  -> void AsyncConnection::process() (back in process)
  ...
  case STATE_CONNECTING_RE:
    cs.is_connected()
    center->create_file_event(EVENT_WRITABLE, read_handler) -> process
    logger->tinc -> void PerfCounters::tinc (performance statistics: latency)
  ...
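try_connect first connects an ordinary TCP socket (tcp_fd) without blocking. It then relies on a file event to learn when the handshake completes. Below is a sketch of that pattern with POSIX sockets, assuming Linux. It is not Ceph code: nonblock_connect_and_wait and make_listener are hypothetical names, and the loopback listener exists only so the example can run. The connected socket becomes writable when the handshake finishes, and SO_ERROR then says whether it succeeded.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Non-blocking connect followed by the "is it connected yet?" check.
// Returns the connected fd, or -1 on failure.
int nonblock_connect_and_wait(const sockaddr_in& addr, int timeout_ms) {
  int s = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
  if (s < 0) return -1;
  fcntl(s, F_SETFL, fcntl(s, F_GETFL) | O_NONBLOCK);
  int r = connect(s, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr));
  if (r < 0 && errno != EINPROGRESS) { close(s); return -1; }
  if (r < 0) {
    // Handshake in progress: wait for writability, as the EVENT_WRITABLE file event does.
    pollfd p{s, POLLOUT, 0};
    if (poll(&p, 1, timeout_ms) != 1) { close(s); return -1; }
    int err = 0;
    socklen_t len = sizeof(err);
    if (getsockopt(s, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err != 0) { close(s); return -1; }
  }
  return s;
}

// Demo helper: a loopback listener on a kernel-chosen port (backlog 512, as in the trace).
int make_listener(sockaddr_in* bound) {
  int l = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
  if (l < 0) return -1;
  sockaddr_in a{};
  a.sin_family = AF_INET;
  a.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  a.sin_port = 0;
  socklen_t len = sizeof(a);
  if (bind(l, reinterpret_cast<sockaddr*>(&a), sizeof(a)) < 0 || listen(l, 512) < 0 ||
      getsockname(l, reinterpret_cast<sockaddr*>(&a), &len) < 0) {
    close(l);
    return -1;
  }
  *bound = a;
  return l;
}
```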
  protocol->read_event() -> switch (state) (dispatches on the current state)
    START_ACCEPT -> run_continuation(CONTINUATION(start_server_banner_exchange)) (message state machine; class ProtocolV2 : public Protocol)
      CONTINUATION_RUN(continuation)
      CtPtr ProtocolV2::read -> ssize_t AsyncConnection::read -> read_until -> read_bulk
        nread = cs.read(buf, len) -> ssize_t RDMAConnectedSocketImpl::read
          eventfd_read(notify_fd, &event_val)
          read = read_buffers(buf, len) -> ssize_t RDMAConnectedSocketImpl::read_buffers
            buffer_prefetch() (read-ahead) -> void RDMAConnectedSocketImpl::buffer_prefetch
              ibv_wc* response = &cqe[i]
              chunk->prepare_read(response->byte_len)
              buffers.push_back(chunk)
            tmp = (*pchunk)->read(buf + read_size, len - read_size) -> uint32_t Infiniband::MemoryManager::Chunk::read
              memcpy(buf, buffer + offset, read_len);
            (*pchunk)->reset_read_chunk() (sets both offset and bound to 0)
            dispatcher->post_chunk_to_pool(*pchunk) -> void RDMADispatcher::post_chunk_to_pool -> ib->post_chunk_to_pool(chunk)
            update_post_backlog -> void RDMAConnectedSocketImpl::update_post_backlog

Timeout processing: new C_handle_reap(this) -> local_worker->create_time_event(ReapDeadConnectionMaxPeriod...
  -> reap_dead

Device attributes (device_attr), as printed in gdb:
(gdb) p device_attr
$17 = {
  fw_ver = "16.33.1048", '\000' <repeats 53 times>,
  node_guid = 8550064101420093112,
  sys_image_guid = 8550064101420093112,
  max_mr_size = 18446744073709551615,
  page_size_cap = 18446744073709547520,
  vendor_id = 713,
  vendor_part_id = 4119,
  hw_ver = 0,
  max_qp = 131072,
  max_qp_wr = 32768,
  device_cap_flags = 3983678518,
  max_sge = 30,
  max_sge_rd = 30,
  max_cq = 16777216,
  max_cqe = 4194303,
  max_mr = 16777216,
  max_pd = 8388608,
  max_qp_rd_atom = 16,
  max_ee_rd_atom = 0,
  max_res_rd_atom = 2097152,
  max_qp_init_rd_atom = 16,
  max_ee_init_rd_atom = 0,
  atomic_cap = IBV_ATOMIC_HCA,
  max_ee = 0,
  max_rdd = 0,
  max_mw = 16777216,
  max_raw_ipv6_qp = 0,
  max_raw_ethy_qp = 0,
  max_mcast_grp = 2097152,
  max_mcast_qp_attach = 240,
  max_total_mcast_qp_attach = 503316480,
  max_ah = 2147483647,
  max_fmr = 0,
  max_map_per_fmr = 0,
  max_srq = 8388608,
  max_srq_wr = 32767,
  max_srq_sge = 31,
  max_pkeys = 128,
  local_ca_ack_delay = 16 '\020',
  phys_port_cnt = 1 '\001'
}
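The rdma-polling path and the socket read path meet at an eventfd. On the polling side, pass_wc -> notify() -> eventfd_write records new completions. On the read side, RDMAConnectedSocketImpl::read -> eventfd_read consumes the signal before buffer_prefetch. The sketch below shows the counter semantics with glibc's eventfd helpers. Notifier is a hypothetical name. This version uses a blocking eventfd for simplicity, whereas in the messenger the read is driven by the event loop.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/eventfd.h>
#include <thread>
#include <unistd.h>

// Linux-only sketch: the producer adds to an eventfd counter and the consumer drains it.
struct Notifier {
  Notifier() : fd(eventfd(0, EFD_CLOEXEC)) {}
  ~Notifier() { if (fd >= 0) close(fd); }
  Notifier(const Notifier&) = delete;
  Notifier& operator=(const Notifier&) = delete;

  void notify(uint64_t n) { eventfd_write(fd, n); }  // adds n to the counter
  uint64_t wait() {                                  // blocks until counter > 0, then resets it to 0
    eventfd_t v = 0;
    eventfd_read(fd, &v);
    return v;
  }
  int fd;
};
```

Several notifications that arrive before the reader runs collapse into one read that returns their sum. The reader therefore wakes once per batch rather than once per completion.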