ceph msg module: RDMA performance testing tool source code analysis (src/test/msgr/perf_msgr_server.cc, perf_msgr_client.cc)

src/test/msgr/perf_msgr_server.cc
server side: establishing connections
MessengerServer
Messenger *Messenger::create type=async + rdma, lname=server -> Messenger *Messenger::create
  new AsyncMessenger -> AsyncMessenger::AsyncMessenger
    dispatch_queue
      DispatchQueue(CephContext *cct, Messenger *msgr, string &name) local fast dispatch and throttling
    lookup_or_create_singleton_object<StackSingleton>
    single->ready(transport_type)
       NetworkStack::create(cct, type) -> std::make_shared<RDMAStack>(c, t)
        RDMAStack::RDMAStack
          NetworkStack(cct, t) constructor; the thread pool has 3 threads by default,
            create_worker(cct, type, worker_id) -> NetworkStack::create_worker -> new RDMAWorker(c, worker_id) -> RDMAWorker::RDMAWorker
              Worker(CephContext *c, unsigned worker_id) Stack is the network IO framework: it encapsulates all the basic network interfaces a backend needs and manages the worker threads. Each backend (posix, dpdk, even RDMA) inherits from the Stack class and implements the required interfaces, which makes it easy to integrate a new network backend into ceph; otherwise every backend would have to re-implement the whole Messenger logic (reconnection, policy handling, session maintenance, ...). A minimal sketch of this pattern follows below.
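
To make the Stack abstraction concrete, here is a minimal sketch of the inheritance pattern it describes (hypothetical, simplified shapes; the real declarations live in src/msg/async/Stack.h):

```cpp
#include <memory>
#include <vector>

struct Worker {                        // owns one event loop (EventCenter)
  virtual ~Worker() = default;
};

struct NetworkStack {                  // shared framework for every backend
  std::vector<std::unique_ptr<Worker>> workers;
  virtual ~NetworkStack() = default;
  virtual Worker *create_worker(unsigned id) = 0;   // backend-specific
};

struct RDMAWorker : Worker { /* RDMA-specific IO */ };

struct RDMAStack : NetworkStack {
  Worker *create_worker(unsigned id) override { return new RDMAWorker; }
};
```
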
            w->center.init -> EventCenter::init
              driver = new EpollDriver(cct)
              driver->init(this, nevent) -> int EpollDriver::init
                events = (struct epoll_event*)calloc
                epfd = epoll_create(1024)
                fcntl(epfd, F_SETFD, FD_CLOEXEC)
              file_events.resize(nevent) 5000
              pipe_cloexec(fds, 0) -> pipe2(pipefd, O_CLOEXEC | flags) creates the notify pipe; both ends are then set non-blocking
              notify_receive_fd = fds[0] read end
              notify_send_fd = fds[1] write end
            workers.push_back(w)
          Infiniband::Infiniband
            device_name(cct->_conf->ms_async_rdma_device_name) Gets the rdma device name from the configuration TODO
            port_num(cct->_conf->ms_async_rdma_port_num) defaults to 1 and the port is also obtained from the configuration file
            verify_prereq -> void Infiniband::verify_prereq
              RDMAV_HUGEPAGES_SAFE enables huge-page-safe mode when huge pages are configured
              ibv_fork_init
              getrlimit(RLIMIT_MEMLOCK, &limit) gets the memlock resource limit
          get_num_worker 3
          for
            w->set_dispatcher(rdma_dispatcher)
            w->set_ib(ib)
    stack->start()
      std::function<void ()> thread = add_thread(i) not executed yet
        w->center.set_owner()
          notify_handler = new C_handle_notify(this, cct)
          create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler) sets epoll listening on the read end of the previous pipeline
            driver->add_event(fd, event->mask, mask)
              epoll_ctl(epfd, op, fd, &ee)
            event->read_cb = ctxt sets read event callback
        w->initialize()
        w->init_done()
          init_cond.notify_all() notifies waiting threads to complete initialization
        while (!w->done)
          w->center.process_events loop processing events -> int EventCenter::process_events
            driver->event_wait(fired_events, &tv) -> int EpollDriver::event_wait
              epoll_wait: writing 'c' on the pipe's write end wakes this call up
              fired_events[event_id].fd = e->data.fd
            event = _get_file_event(fired_events[event_id].fd)
            cb = event->read_cb readable callback
            cb->do_request(fired_events[event_id].fd) handles events
              r = read(fd_or_id, c, sizeof(c)) reads the byte sent on the pipe, e.g. 'c'
            cur_process.swap(external_events)
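
The notify-pipe wakeup plus epoll dispatch above boils down to the following standalone sketch (Linux only; error handling omitted and the file_events table reduced to a single fd):

```cpp
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstdio>

int main() {
  int fds[2];
  pipe2(fds, O_CLOEXEC);                       // notify_receive_fd / notify_send_fd
  int epfd = epoll_create(1024);
  fcntl(epfd, F_SETFD, FD_CLOEXEC);

  epoll_event ee{};                            // create_file_event(notify_receive_fd,
  ee.events = EPOLLIN;                         //                   EVENT_READABLE, ...)
  ee.data.fd = fds[0];
  epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ee);

  char c = 'c';
  write(fds[1], &c, sizeof(c));                // wakeup(): another thread writes 'c'

  epoll_event fired[16];
  int n = epoll_wait(epfd, fired, 16, -1);     // event_wait returns because of the write
  for (int i = 0; i < n; i++) {
    char buf;
    read(fired[i].data.fd, &buf, sizeof(buf)); // C_handle_notify drains the byte
    printf("woken by '%c'\n", buf);
  }
  return 0;
}
```
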
      spawn_worker(i, std::move(thread)) spawns a new thread and keeps a handle for joining it later
      workers[i]->wait_for_init() Wait for all workers to complete initialization
    local_connection = ceph::make_ref<AsyncConnection> -> AsyncConnection::AsyncConnection
      ms_connection_ready_timeout connection establishment timeout
      ms_connection_idle_timeout idle timeout: if a connection sees no reads or writes for more than 15 minutes, it is torn down
      read_handler = new C_handle_read(this) -> conn->process()
        void AsyncConnection::process()
      write_handler = new C_handle_write(this) -> conn->handle_write()
        void AsyncConnection::handle_write
      write_callback_handler = new C_handle_write_callback(this) -> AsyncConnection::handle_write_callback -> AsyncConnection::write passes callback when writing
      wakeup_handler = new C_time_wakeup(this) -> void AsyncConnection::wakeup_from -> void AsyncConnection::process()
      tick_handler = new C_tick_wakeup(this) -> void AsyncConnection::tick() periodic timer
        protocol->fault() handles errors
    init_local_connection
      void ms_deliver_handle_fast_connect
    reap_handler = new C_handle_reap(this)
      void AsyncMessenger::reap_dead reaps dead connections
    processors.push_back(new Processor(this, stack->get_worker(i), cct))
      Processor::Processor
        listen_handler(new C_processor_accept(this))
          void Processor::accept() waits for the event to be triggered (triggered after the client executes connect)
            listen_sockets -> while (true)
              msgr->get_stack()->get_worker()
              listen_socket.accept(&cli_socket, opts, &addr, w)
              msgr->add_accept
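
Processor::accept() is the standard non-blocking accept loop: when the listen fd fires, accept until the backlog is drained. A sketch of that pattern without the ceph wrappers (accept4 assumed available, i.e. Linux):

```cpp
#include <sys/socket.h>
#include <cerrno>

// Called when epoll reports the listening fd readable.
void on_listen_readable(int listen_fd) {
  for (;;) {
    int cli = accept4(listen_fd, nullptr, nullptr,
                      SOCK_NONBLOCK | SOCK_CLOEXEC);
    if (cli < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        break;              // backlog drained, wait for the next event
      break;                // real error: a real server logs and handles it
    }
    // hand cli off to a worker, like msgr->add_accept in the trace above
  }
}
```
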
msgr->set_default_policy
dummy_auth.auth_registry.refresh_config()
msgr->set_auth_server(&dummy_auth) initialization function, called before binding
server.start()
  msgr->bind(addr)
    AsyncMessenger::bind
      bindv -> int r = p->bind
      int Processor::bind
        listen_sockets.resize
        conf->ms_bind_retry_count 3 retries
        worker->center.submit_to submits an anonymous lambda ([]()->void) to the worker's event center
          c->in_thread()
            pthread_equal(pthread_self(), owner) checks whether we are already on the owner thread
          C_submit_event<func> event(std::move(f), false) f=listen
            void do_request -> f() -> listen -> worker->listen(listen_addr, k, opts, &listen_sockets[k]) -> int RDMAWorker::listen runs when the submitted event fires
              ib->init() -> void Infiniband::init
                newDeviceList(cct)
                  ibv_get_device_list e.g. 4 devices/network ports
                  if (cct->_conf->ms_async_rdma_cm)
                  new Device(cct, device_list[i]) -> Device::Device
                    ibv_open_device
                    ibv_get_device_name
                    ibv_query_device queries device attributes into device_attr (full dump at the end)
                get_device queries the device list according to the configured device name. The first one is taken by default, such as: mlx5_0
                binding_port -> void Device::binding_port
                  new Port(cct, ctxt, port_id) Port ID starts from 1 -> Port::Port
                    ibv_query_port(ctxt, port_num, &port_attr)
                    ibv_query_gid(ctxt, port_num, gid_idx, &gid)
                    ib_physical_port = device->active_port->get_port_num() gets the physical port
                new ProtectionDomain(cct, device) -> Infiniband::ProtectionDomain::ProtectionDomain -> ibv_alloc_pd(device->ctxt)
                support_srq = cct->_conf->ms_async_rdma_support_srq shared receive queue srq
                rx_queue_len = device->device_attr.max_srq_wr ends up being 4096
                tx_queue_len = device->device_attr.max_qp_wr - 1 the send queue reserves 1 WR for the beacon; e.g. 1024, written as 1_K via the overloaded literal operator
                device->device_attr.max_cqe the device allows up to 4194303 completion entries
                memory_manager = new MemoryManager(cct, device, pd) -> Infiniband::MemoryManager::MemoryManager 128K -> mem_pool -> boost::pool
                memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len) -> void Infiniband::MemoryManager::create_tx_pool
                  send = new Cluster(*this, size)
                  send->fill(tx_num) -> int Infiniband::MemoryManager::Cluster::fill
                    base = (char*)manager.malloc(bytes) -> void* Infiniband::MemoryManager::malloc -> std::malloc(size) Standard allocation or allocation of huge pages (huge_pages_malloc)
                    ibv_reg_mr register memory
                    new(chunk) Chunk
                    free_chunks.push_back(chunk)
                create_shared_receive_queue
                  ibv_create_srq
                post_chunks_to_rq -> int Infiniband::post_chunks_to_rq
                  chunk = get_memory_manager()->get_rx_buffer() -> return reinterpret_cast<Chunk *>(rxbuf_pool.malloc())
                  ibv_post_srq_recv
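
Stripped of the ceph wrappers, Infiniband::init() is the canonical verbs bring-up. A minimal sketch with the same calls (error handling omitted; the 128K buffer and SRQ depth of 4096 just mirror the numbers in the trace):

```cpp
#include <infiniband/verbs.h>
#include <cstdint>
#include <cstdlib>

int main() {
  int num = 0;
  ibv_device **list = ibv_get_device_list(&num);        // e.g. mlx5_0, ...
  ibv_context *ctxt = ibv_open_device(list[0]);         // get_device: first by default

  ibv_device_attr attr{};
  ibv_query_device(ctxt, &attr);                        // max_srq_wr, max_qp_wr, ...

  ibv_pd *pd = ibv_alloc_pd(ctxt);                      // ProtectionDomain

  void *buf = std::malloc(128 * 1024);                  // one MemoryManager chunk
  ibv_mr *mr = ibv_reg_mr(pd, buf, 128 * 1024,          // ibv_reg_mr: register memory
                          IBV_ACCESS_LOCAL_WRITE);

  ibv_srq_init_attr sia{};                              // create_shared_receive_queue
  sia.attr.max_wr  = 4096;                              // rx_queue_len in the trace
  sia.attr.max_sge = 1;
  ibv_srq *srq = ibv_create_srq(pd, &sia);

  ibv_sge sge{(uintptr_t)buf, 128 * 1024, mr->lkey};    // post_chunks_to_rq
  ibv_recv_wr wr{}, *bad = nullptr;
  wr.wr_id = 1; wr.sg_list = &sge; wr.num_sge = 1;
  ibv_post_srq_recv(srq, &wr, &bad);

  ibv_free_device_list(list);
  return 0;
}
```
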
              dispatcher->polling_start() -> void RDMADispatcher::polling_start
                ib->get_memory_manager()->set_rx_stat_logger(perf_logger) -> void PerfCounters::set
                tx_cc = ib->create_comp_channel(cct) -> Infiniband::CompletionChannel* Infiniband::create_comp_channel -> new Infiniband::CompletionChannel
                tx_cq = ib->create_comp_queue(cct, tx_cc)
                  cq->init() -> int Infiniband::CompletionChannel::init
                    ibv_create_comp_channel creates the channel -> NetHandler(cct).set_nonblock(channel->fd) sets non-blocking
                t = std::thread(&RDMADispatcher::polling, this) starts the polling thread "rdma-polling" -> void RDMADispatcher::polling
                  tx_cq->poll_cq(MAX_COMPLETIONS, wc)
                  handle_tx_event -> tx_chunks.push_back(chunk) -> post_tx_buffer
                    tx -> void RDMAWorker::handle_pending_message()
                  handle_rx_event -> void RDMADispatcher::handle_rx_event
                    conn->post_chunks_to_rq(1) Add a memory block (WR) to the receive queue -> int Infiniband::post_chunks_to_rq
                      ibv_post_srq_recv | ibv_post_recv
                    polled[conn].push_back(*response)
                    qp->remove_rq_wr(chunk)
                    chunk->clear_qp()
                    pass_wc -> void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v) -> notify() -> void RDMAConnectedSocketImpl::notify
                      eventfd_write(notify_fd, event_val) -> eventfd_read(notify_fd, &event_val) <- ssize_t RDMAConnectedSocketImpl::read <- process
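
The polling thread pairs a completion channel with a completion queue. A sketch of the setup and reap loop (the real rdma-polling thread mostly busy-polls; 512 CQ entries and a batch of 32 are arbitrary here):

```cpp
#include <infiniband/verbs.h>

// 'ctxt' comes from a device bring-up like the one sketched earlier.
void poll_completions(ibv_context *ctxt) {
  ibv_comp_channel *cc = ibv_create_comp_channel(ctxt);  // tx_cc / rx_cc
  ibv_cq *cq = ibv_create_cq(ctxt, 512, nullptr, cc, 0); // tx_cq / rx_cq
  ibv_req_notify_cq(cq, 0);                              // arm the channel

  ibv_wc wc[32];                                         // MAX_COMPLETIONS-ish batch
  for (;;) {
    int n = ibv_poll_cq(cq, 32, wc);                     // CompletionQueue::poll_cq
    if (n <= 0)
      break;                                             // real code spins or blocks
    for (int i = 0; i < n; i++) {
      if (wc[i].status != IBV_WC_SUCCESS)
        continue;                                        // error path -> fault handling
      // tx completions recycle chunks (handle_tx_event),
      // rx completions feed pass_wc() (handle_rx_event)
    }
  }
}
```
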
              new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot)
              int r = p->listen(sa, opt) -> int RDMAServerSocketImpl::listen
                server_setup_socket = net.create_socket(sa.get_family(), true) -> socket_cloexec
                net.set_nonblock
                net.set_socket_options
                ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len()) system call
                ::listen backlog=512
              *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p))
            cond.notify_all() notifies waiting threads
          dispatch_event_external -> void EventCenter::dispatch_event_external
            external_events.push_back(e)
            wakeup()
              write(notify_send_fd, &buf, sizeof(buf)) buf='c' -> wakes up epoll_wait on notify_receive_fd
          event.wait()
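
The submit_to / C_submit_event / event.wait() dance amounts to "run this closure on the loop thread and block until it finishes". A self-contained sketch with standard primitives (a condition variable stands in for the pipe wakeup):

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class MiniCenter {
  std::mutex lk;
  std::condition_variable cv;
  std::queue<std::function<void()>> external_events;
  std::thread::id owner;

public:
  void set_owner() { owner = std::this_thread::get_id(); }

  void submit_to(std::function<void()> f) {
    if (std::this_thread::get_id() == owner) {   // in_thread(): run directly
      f();
      return;
    }
    bool done = false;
    {
      std::lock_guard<std::mutex> g(lk);
      external_events.push([&] {                 // C_submit_event::do_request
        f();
        { std::lock_guard<std::mutex> g2(lk); done = true; }
        cv.notify_all();                         // cond.notify_all()
      });
    }
    // the real code also calls wakeup() here to poke epoll_wait
    std::unique_lock<std::mutex> u(lk);
    cv.wait(u, [&] { return done; });            // event.wait()
  }

  void process_one() {                           // loop-thread side, cf.
    std::function<void()> ev;                    // cur_process.swap(external_events)
    {
      std::lock_guard<std::mutex> g(lk);
      if (external_events.empty()) return;
      ev = std::move(external_events.front());
      external_events.pop();
    }
    ev();
  }
};
```
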
  msgr->add_dispatcher_head(&dispatcher)
    ready()
      p->start() -> void Processor::start()
        worker->center.create_file_event listen_handler -> pro->accept() -> void Processor::accept()
  msgr->start() -> int AsyncMessenger::start()
  msgr->wait() -> void AsyncMessenger::wait()
    

The client establishes a connection, src/test/msgr/perf_msgr_client.cc, gdb --args ceph_perf_msgr_client 175.16.53.62:10001 10 1 1 0 4096
perf_msgr_client.cc -> main
  MessengerClient client(public_msgr_type, args[0], think_time)
  client.ready
    Messenger *msgr = Messenger::create
    msgr->set_default_policy -> Policy(bool l, bool s ...
    msgr->start() -> int AsyncMessenger::start()
      if (!did_bind) the client does not need to bind
      set_myaddrs(newaddrs) -> void Messenger::set_endpoint_addr
      _init_local_connection() -> void _init_local_connection()
        ms_deliver_handle_fast_connect(local_connection.get()) -> void ms_deliver_handle_fast_connect notifies each fast dispatcher of the new connection; called whenever a connection is first started or reconnected (fast_dispatchers is empty here?)
    ConnectionRef conn = msgr->connect_to_osd(addrs) Connect to OSD -> ConnectionRef connect_to_osd -> ConnectionRef AsyncMessenger::connect_to
      AsyncConnectionRef conn = _lookup_conn(av) first looks for the connection in the connection pool
      conn = create_connect(av, type, false) Not found, create a new connection -> AsyncConnectionRef AsyncMessenger::create_connect
        Worker *w = stack->get_worker()
        auto conn = ceph::make_ref<AsyncConnection> -> AsyncConnection::AsyncConnection constructs the connection
          recv_buf = new char[2*recv_max_prefetch] prefetch buffer to avoid the overhead of many small reads
          new ProtocolV2(this) ceph v2 protocol, which supports address vectors based on v1. After the banner exchange, peers exchange their address vectors.
        conn->connect(addrs, type, target) -> void AsyncConnection::connect -> _connect -> void AsyncConnection::_connect()
          state = STATE_CONNECTING initial state machine
          protocol->connect() -> void ProtocolV2::connect() -> state = START_CONNECT
          center->dispatch_event_external(read_handler) -> trigger state machine advancement -> process
        conns[addrs] = conn saves the connection -> ceph::unordered_map<entity_addrvec_t, AsyncConnectionRef> conns (see the lookup-or-create sketch below)
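
_lookup_conn plus create_connect form the usual lookup-or-create connection cache. A compact generic sketch of that pattern (assumes Addr is hashable, as entity_addrvec_t is; not the actual AsyncMessenger code):

```cpp
#include <memory>
#include <mutex>
#include <unordered_map>

template <typename Addr, typename Conn>
class ConnCache {
  std::mutex lock;                          // AsyncMessenger holds its own lock here
  std::unordered_map<Addr, std::shared_ptr<Conn>> conns;

public:
  template <typename Factory>
  std::shared_ptr<Conn> connect_to(const Addr &a, Factory create_connect) {
    std::lock_guard<std::mutex> g(lock);
    auto it = conns.find(a);                // _lookup_conn
    if (it != conns.end())
      return it->second;                    // reuse the pooled connection
    auto c = create_connect();              // create_connect(av, type, false)
    conns[a] = c;                           // conns[addrs] = conn
    return c;
  }
};
```
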
    ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us) -> ClientThread(Messenger *m, ... creates a new client thread and builds the payload
      m->add_dispatcher_head(&dispatcher)
      bufferptr ptr(msg_len) allocates the data buffer -> buffer::ptr::ptr(unsigned l) : _off(0), _len(l)
         _raw = buffer::create(l).release() -> ceph::unique_leakable_ptr<buffer::raw> buffer::create; release() gives up ownership of the stored pointer and returns it, replacing it with null. The managed object is not destroyed, so some other owner must eventually delete it (or the unique_ptr must be reset/reassigned first to force destruction)
          buffer::create_aligned(len, sizeof(size_t)) -> ceph::unique_leakable_ptr<buffer::raw> buffer::create_aligned
            create_aligned_in_mempool -> mempool::mempool_buffer_anon Macro: f(buffer_anon) -> ceph::unique_leakable_ptr<buffer::raw> buffer::create_aligned_in_mempool, 1M: create_aligned_in_mempool (len=1048576, align=8, mempool=18)
              len >= CEPH_PAGE_SIZE * 2 if the length to allocate is at least 2 * CEPH_PAGE_SIZE (system page size from sysconf(_SC_PAGESIZE)), i.e. 8K here, use native posix aligned allocation -> ceph::unique_leakable_ptr<buffer::raw>(new raw_posix_aligned(len, align)) -> raw_posix_aligned(unsigned l, unsigned _align) : raw(l)
                r = ::posix_memalign((void**)(void*)&data, align, len);
              return raw_combined::create(len, align, mempool) -> src/common/buffer.cc -> static ceph::unique_leakable_ptr<buffer::raw> -> create(unsigned len,
                align = std::max<unsigned>(align, sizeof(void *)) = 8
                size_t rawlen = round_up_to(sizeof(buffer::raw_combined), alignof(buffer::raw_combined)) 96
                size_t datalen = round_up_to(len, alignof(buffer::raw_combined)) 4096
                int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen); 96 + 4096
                new (ptr + datalen) raw_combined(ptr, len, align, mempool))
         _raw->nref.store(1, std::memory_order_release)
      memset(ptr.c_str(), 0, msg_len) set to 0
      data.append(ptr) fills data with all 0s -> void buffer::list::append -> void push_back(const ptr &bp)
        _buffers.push_back(*ptr_node::create(bp).release())
        _len += bp.length()
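
The raw_combined trick above puts the bookkeeping object and the data into one posix_memalign allocation, placement-new'ing the header behind the data. A standalone sketch of the same layout math (RawCombined is a hypothetical stand-in for buffer::raw_combined):

```cpp
#include <algorithm>
#include <cstdlib>
#include <new>

static size_t round_up_to(size_t v, size_t align) {
  return (v + align - 1) / align * align;
}

struct RawCombined {
  char  *data;
  size_t len;
  RawCombined(char *d, size_t l) : data(d), len(l) {}
};

RawCombined *create_combined(size_t len, size_t align) {
  align = std::max(align, sizeof(void *));                      // >= 8
  size_t rawlen  = round_up_to(sizeof(RawCombined),
                               alignof(RawCombined));           // "96" in the trace
  size_t datalen = round_up_to(len, alignof(RawCombined));      // 4096 for the 4K case
  char *ptr = nullptr;
  if (::posix_memalign((void **)&ptr, align, rawlen + datalen)) // 96 + 4096
    return nullptr;
  return new (ptr + datalen) RawCombined(ptr, len);             // header after the data
}
```
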
    msgrs.push_back(msgr)
    clients.push_back(t)
  Cycles::init() -> void Cycles::init() Calibrate clock frequency
  uint64_t start = Cycles::rdtsc()
  client.start() -> void start() -> clients[i]->create("client") -> void Thread::create
    pthread_create(&thread_id, thread_attr, _entry_func, (void*)this) -> void *Thread::_entry_func
    void *entry() override overrides the thread entry point
      hobject_t hobj(oid, oloc.key -> struct object_t
        void build_hash_cache() crc32c
      MOSDOp *m = new MOSDOp -> MOSDOp(int inc, long tid,
      bufferlist msg_data(data) copy constructor: copies the ptr list; the underlying raw buffers are shared by refcount, not deep-copied
      m->write(0, msg_len, msg_data) -> void write writes data to the peer through message msg, offset=0, len=4096, buffer_list=bl(msg_data)
        add_simple_op(CEPH_OSD_OP_WRITE, off, len) -> ops.push_back(osd_op)
           osd_op.op.extent.offset = off
           osd_op.op.extent.length = len
           ops.push_back(osd_op)
        data.claim(bl)
          clear()
          claim_append(bl) -> void buffer::list::claim_append appends without copying (see the toy sketch below)
            _buffers.splice_back(bl._buffers) Splice back
            bl._buffers.clear_and_dispose()
        header.data_off = off
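
claim_append moves list nodes instead of copying bytes, which is why bl comes back empty. A toy model with std::list::splice:

```cpp
#include <cstddef>
#include <list>
#include <string>

// Toy model of bufferlist::claim_append: steal the nodes, copy nothing.
struct ToyBufferList {
  std::list<std::string> buffers;   // stand-ins for the ptr_node entries
  std::size_t len = 0;

  void claim_append(ToyBufferList &bl) {
    len += bl.len;
    buffers.splice(buffers.end(), bl.buffers);  // _buffers.splice_back: O(1)
    bl.len = 0;                                 // bl ends up empty, like after
  }                                             // clear_and_dispose() in the trace
};
```
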
      conn->send_message(m) -> void ProtocolV2::send_message(Message *m) ssize_t RDMAConnectedSocketImpl::send
        out_queue[m->get_priority()].emplace_back
        connection->center->dispatch_event_external(connection->write_handler) -> void AsyncConnection::handle_write
          const auto out_entry = _get_next_outgoing()
          more = !out_queue.empty() If the sending queue is not empty, more is true, indicating that there is more data to be sent.
          write_message(out_entry.m, more)
            ssize_t total_send_size = connection->outgoing_bl.length() 4406 = 310 (header) + 4096 (payload)
            connection->_try_send(more) -> cs.send(outgoing_bl, more) -> ssize_t RDMAConnectedSocketImpl::send
              size_t bytes = bl.length() 4 KB case: 4406 B = 4096 + 310; 1 MB case: 1048886 B = 1048576 + 310
              pending_bl.claim_append(bl) moves the data into pending_bl without copying; bl is left empty (recycled by the caller?)
              ssize_t r = submit(more) ssize_t -> RDMAConnectedSocketImpl::submit
                pending_bl.length() 4406
                auto it = std::cbegin(pending_bl.buffers()) cbegin()/cend() (C++11) return const iterators that cannot modify the elements
                while (it != pending_bl.buffers().end()) loops over the buffer list, slicing it into chunks
                if (ib->is_tx_buffer(it->raw_c_str())) this branch is not taken here
                (quoting the commit message) msg/async/rdma: Use shared_ptr to manage Infiniband obj
                1. Do not use raw pointers to manage the Infiniband obj
                2. Access the Infiniband obj directly instead of through the RDMA stack; this avoids caching the RDMAStack obj in RDMAWorker and RDMADispatcher
                wait_copy_len += it->length(), e.g. 32
                tx_buffers.push_back(ib->get_tx_chunk_by_buffer(it->raw_c_str()))
                size_t copied = tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it);
                total_copied += tx_copy_chunk(tx_buffers, wait_copy_len, copy_start, it) -> size_t RDMAConnectedSocketImpl::tx_copy_chunk
                  int RDMAWorker::get_reged_mem gets registered memory -> int Infiniband::get_tx_buffers -> get_send_buffers -> Infiniband::MemoryManager::Cluster::get_buffers
                    size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r chunk size 131072 > 4406, so the obtained memory covers the request; for the 1 MB case, 9 chunks: 131072 * 9 = 1179648
                  auto chunk_idx = tx_buffers.size() 9 chunks
                  Chunk *current_chunk = tx_buffers[chunk_idx]
                  size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len) -> uint32_t Infiniband::MemoryManager::Chunk::write
                    memcpy(buffer + offset, buf, write_len) copy memory (loop copy)
                  write_len 4406
                pending_bl.clear() releases pending_bl after copying
                post_work_request(tx_buffers)
                  tx_buffers.size() = 1
                  while (current_buffer != tx_buffers.end())
                  ibv_post_send posts the WRs; completions are later reaped by ibv_poll_cq -> int Infiniband::CompletionQueue::poll_cq <- void RDMADispatcher::polling() (see the sketch below)
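
post_work_request builds one send WR per chunk and posts the chain. A single-chunk sketch (qp/mr come from a bring-up like the one earlier; IBV_WR_SEND with IBV_SEND_SIGNALED is the common pattern, not necessarily ceph's exact flags):

```cpp
#include <infiniband/verbs.h>
#include <cstdint>

bool post_send_chunk(ibv_qp *qp, ibv_mr *mr, void *chunk, uint32_t len) {
  ibv_sge sge{};
  sge.addr   = reinterpret_cast<uintptr_t>(chunk);
  sge.length = len;                   // e.g. 4406 = header + payload
  sge.lkey   = mr->lkey;              // chunk memory was ibv_reg_mr'ed at init

  ibv_send_wr wr{}, *bad = nullptr;
  wr.wr_id      = reinterpret_cast<uintptr_t>(chunk);  // find the chunk on completion
  wr.sg_list    = &sge;
  wr.num_sge    = 1;
  wr.opcode     = IBV_WR_SEND;
  wr.send_flags = IBV_SEND_SIGNALED;  // completion lands in tx_cq for the poller

  return ibv_post_send(qp, &wr, &bad) == 0;
}
```
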
      msgr->shutdown()
  stop = Cycles::rdtsc()
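
Cycles::rdtsc() reads the x86 timestamp counter and Cycles::init() calibrates cycles-per-second once against a wall clock. A simplified sketch of both halves (x86_64 only; the real calibration is more careful):

```cpp
#include <chrono>
#include <cstdint>

static inline uint64_t rdtsc() {
  uint32_t lo, hi;
  asm volatile("rdtsc" : "=a"(lo), "=d"(hi));   // x86 timestamp counter
  return (uint64_t(hi) << 32) | lo;
}

// One-shot calibration: count cycles across a known wall-clock interval.
static double cycles_per_sec() {
  auto t0 = std::chrono::steady_clock::now();
  uint64_t c0 = rdtsc();
  while (std::chrono::steady_clock::now() - t0 < std::chrono::milliseconds(10)) {}
  uint64_t c1 = rdtsc();
  auto t1 = std::chrono::steady_clock::now();
  return double(c1 - c0) / std::chrono::duration<double>(t1 - t0).count();
}
// elapsed seconds = (stop - start) / cycles_per_sec
```
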
...
NetworkStack::add_thread
  w->center.process_events -> C_handle_read -> conn->process() -> void AsyncConnection::process()
    worker->connect(target_addr, opts, &cs) -> int RDMAWorker::connect
      ib->init()
      dispatcher->polling_start()
      new RDMAConnectedSocketImpl -> RDMAConnectedSocketImpl::RDMAConnectedSocketImpl
        read_handler(new C_handle_connection_read(this))
        established_handler(new C_handle_connection_established(this))
      p->try_connect(addr, opts) -> int RDMAConnectedSocketImpl::try_connect
        tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr) -> generic_connect -> int NetHandler::generic_connect
          create_socket
          ::connect(s, addr.get_sockaddr(), addr.get_sockaddr_len()) syscall: the client connects to the server's listening socket -> the server's listen fd fires (C_processor_accept) -> void Processor::accept()
          worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE, established_handler) -> established_handler -> int RDMAConnectedSocketImpl::handle_connection_established

      *socket = ConnectedSocket(std::move(csi))
    center->create_file_event(cs.fd(), EVENT_READABLE, read_handler) -> state = STATE_CONNECTING_RE -> void AsyncConnection::process() (return to process)
    ...
    case STATE_CONNECTING_RE
      cs.is_connected()
      center->create_file_event EVENT_WRITABLE read_handler -> process
      logger->tinc -> void PerfCounters::tinc performance statistics (latency statistics)
    ...
    protocol->read_event() -> switch (state) -> determine status -> START_ACCEPT -> run_continuation(CONTINUATION(start_server_banner_exchange)) -> message state machine -> class ProtocolV2: public Protocol
      CONTINUATION_RUN(continuation)
      CtPtr ProtocolV2::read -> ssize_t AsyncConnection::read -> read_until
        read_bulk -> nread = cs.read(buf, len) -> ssize_t RDMAConnectedSocketImpl::read -> eventfd_read(notify_fd, &event_val)
          read = read_buffers(buf,len) -> ssize_t RDMAConnectedSocketImpl::read_buffers
            buffer_prefetch() read-ahead -> void RDMAConnectedSocketImpl::buffer_prefetch
              ibv_wc* response = &cqe[i]
              chunk->prepare_read(response->byte_len)
              buffers.push_back(chunk)
            tmp = (*pchunk)->read(buf + read_size, len - read_size) -> uint32_t Infiniband::MemoryManager::Chunk::read
              memcpy(buf, buffer + offset, read_len);
            (*pchunk)->reset_read_chunk() sets both offset and bounds to 0
            dispatcher->post_chunk_to_pool(*pchunk) -> void RDMADispatcher::post_chunk_to_pool
              ib->post_chunk_to_pool(chunk)
            update_post_backlog -> void RDMAConnectedSocketImpl::update_post_backlog
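
The RDMA socket bridges rdma-polling and the epoll worker with an eventfd: pass_wc()/notify() raise it, and the worker's read path drains it before consuming the buffered chunks. A minimal sketch of that handshake (not a full program):

```cpp
#include <sys/eventfd.h>
#include <unistd.h>

int notify_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

void notify() {                  // dispatcher side: handle_rx_event -> pass_wc -> notify
  eventfd_write(notify_fd, 1);   // bump the counter, wakes epoll on notify_fd
}

void on_readable() {             // worker side: RDMAConnectedSocketImpl::read
  eventfd_t v = 0;
  eventfd_read(notify_fd, &v);   // reset the counter, then drain buffered chunks
}
```
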
            
Timeout processing:
new C_handle_reap(this)
  local_worker->create_time_event( ReapDeadConnectionMaxPeriod...
    reap_dead


Device attributes: device_attr
(gdb) p device_attr
$17 = {
  fw_ver = "16.33.1048", '\000' <repeats 53 times>,
  node_guid = 8550064101420093112,
  sys_image_guid = 8550064101420093112,
  max_mr_size = 18446744073709551615,
  page_size_cap = 18446744073709547520,
  vendor_id = 713,
  vendor_part_id = 4119,
  hw_ver = 0,
  max_qp = 131072,
  max_qp_wr = 32768,
  device_cap_flags = 3983678518,
  max_sge = 30,
  max_sge_rd = 30,
  max_cq = 16777216,
  max_cqe = 4194303,
  max_mr = 16777216,
  max_pd = 8388608,
  max_qp_rd_atom = 16,
  max_ee_rd_atom = 0,
  max_res_rd_atom = 2097152,
  max_qp_init_rd_atom = 16,
  max_ee_init_rd_atom = 0,
  atomic_cap = IBV_ATOMIC_HCA,
  max_ee = 0,
  max_rdd = 0,
  max_mw = 16777216,
  max_raw_ipv6_qp = 0,
  max_raw_ethy_qp = 0,
  max_mcast_grp = 2097152,
  max_mcast_qp_attach = 240,
  max_total_mcast_qp_attach = 503316480,
  max_ah = 2147483647,
  max_fmr = 0,
  max_map_per_fmr = 0,
  max_srq = 8388608,
  max_srq_wr = 32767,
  max_srq_sge = 31,
  max_pkeys = 128,
  local_ca_ack_delay = 16 '\020',
  phys_port_cnt = 1 '\001'
}