Subscriber of Fast DDS

Directory

  • Subscriber
    • SubscriberQos
    • SubscriberListener
    • Create Subscriber
  • DataReader
  • SampleInfo
  • Read data
    • Get data through callback
    • Processed by starting a waiting thread


Subscriber plays the role of a container, and there can be many DataReaders in it. They use the same SubscriberQos configuration of Subscriber. Subscriber can carry DataReader objects of different Topics and data types.

Subscriber

SubscriberQos

The default Qos configuration can be obtained through the get_default_subscriber_qos() function of the DomainParticipant instance.

SubscriberListener

The change of Subscriber’s status will trigger the callback function call of SubscriberListener. Users can implement custom callbacks through inheritance. Added callback: on_data_on_readers()

Create Subscriber

Created through DomainParticipant’s create_subscriber() function, the SubscriberQos parameter is required, and the SubscriberListener and StatusMask parameters are optional.

//Create DomainParticipant instance participant
Subscriber* subscriber_with_default_qos = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
if (nullptr == subscriber_with_default_qos) {<!-- -->
    // Error
    return;
}

Create a Subscriber based on Profile: the string name parameter used to identify the subscriber is required, listener and StatusMask are optional

Subscriber* subscriber_with_profile = participant->create_subscriber_with_profile("subscriber_profile");
if (nullptr == subscriber_with_profile) {<!-- -->
    // Error
    return;
}

Delete Subscriber: You need to delete all entities (DataReaders) in Subscriber first, and then call delete_subscriber() to delete Subscriber

//Delete the entities the subscriber created
if (subscriber->delete_contained_entities() != ReturnCode_t::RETCODE_OK) {<!-- -->
    // Subscriber failed to delete the entities it created
    return;
}
// Delete the Subscriber
if (participant->delete_subscriber(subscriber) != ReturnCode_t::RETCODE_OK) {<!-- -->
    // Error
    return;
}

DataReader

DataReader only belongs to the Subscriber that created it. Each DataReader supports binding to a single Topic when it is created, so the Topic must be created before the DataReader is created. The data written on the pub side can be obtained through the DataReader::read_next_sample() or DataReader::take_next_sample() function.
DataReaderQos
DataReaderListener is used to monitor changes in DataReader status. There are the following callback member functions:

  • on_data_available
  • on_subscription_matched
    Create a DataReader: Bind to the Topic to transmit data and DataReaderQos are required parameters, DataReaderListener and StatusMask are optional
DataReader* data_reader_with_default_qos = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader_with_default_qos) {<!-- -->
    // Error
    return;
}

Create DataReader based on Profile
Create a DataReader using a custom PayloadPool (Why use a custom PayloadPool?)

// A DataReaderQos must be provided to the creation method
DataReaderQos qos;

// Create PayloadPool
std::shared_ptr<CustomPayloadPool> payload_pool = std::make_shared<CustomPayloadPool>();
DataReader* data_reader = subscriber->create_datareader(topic, qos, nullptr, StatusMask::all(), payload_pool);
if (nullptr == data_reader) {<!-- -->
    // Error
    return;
}

DeleteDataReader
In the same way, you need to delete all entities belonging to DataReader (QueryConditions) before deleting DataReader.

// Delete the entities the DataReader created
if (data_reader->delete_contained_entities() != ReturnCode_t::RETCODE_OK) {<!-- -->
    // DataReader failed to delete the entities it created.
    return;
}

// Delete the DataReader
if (subscriber->delete_datareader(data_reader) != ReturnCode_t::RETCODE_OK) {<!-- -->
    // Error
    return;
}

SampleInfo

An important data structure that provides metadata information about related data for each DataReader, including:

  • The status of the data sample, such as whether it has been modified or read
  • The source of the data sample, such as the publisher’s instance handle or a public instance handle
  • The sequence number of the data sample, this is important to ensure the sequential receipt of data samples
  • The timestamp of the data sample, i.e. when the sample was written or modified
  • Whether the data sample is valid. Invalid data samples usually indicate that the life cycle of the instance has ended.

Read data

Receive and consume the data read by DataReader through reading or taking. The implementations of these functions for reading and taking are the same:

  • DataReader::read_next_sample / DataReader::take_next_sample
  • DataReader::read(), DataReader::read_instance(), DataReader::read_next_instance() / DataReader::take(), DataReader::take_instance(), DataReader::take_next_instance(): Get a collection of samples that meet specific conditions

Get data through callback

Two Listener callbacks, you can customize the listener to inherit from DataReaderListener:

  • on_data_available()
  • on_data_readers()

Processing by starting a waiting thread

  1. asynchronous
// Create a DataReader
DataReader* data_reader =
        subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader)
{<!-- -->
    // Error
    return;
}

// Prepare a wait-set to wait for data on the DataReader
WaitSet wait_set;
StatusCondition & amp; condition = data_reader->get_statuscondition();
condition.set_enabled_statuses(StatusMask::data_available());
wait_set.attach_condition(condition);

// Create a data and SampleInfo instance
Foo data;
SampleInfo info;

//Define a timeout of 5 seconds
eprosima::fastrtps::Duration_t timeout (5, 0);

// Loop reading data as it arrives
// This will make the current thread to be dedicated exclusively to
// waiting and reading data until the remote DataWriter dies
while (true) {<!-- -->
    ConditionSeq active_conditions;
    if (ReturnCode_t::RETCODE_OK == wait_set.wait(active_conditions, timeout)) {<!-- -->
        while (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample( & amp;data, & amp;info)) {<!-- -->
            if (info.valid_data) {<!-- -->
                // Do something with the data
                std::cout << "Received new data value for topic "
                          << topic->get_name() << std::endl;
            } else {<!-- -->
                // If the remote writer is not alive, we exit the reading loop
                std::cout << "Remote writer for topic "
                          << topic->get_name() << " is dead" << std::endl;
                break;
            }
        }
    } else {<!-- -->
        std::cout << "No data this time" << std::endl;
    }
}
  1. Synchronously, wait until data arrives or time times out through the DataReader::wait_for_unread_message() function
// Create a DataReader
DataReader* data_reader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader) {<!-- -->
    // Error
    return;
}
// Create a data and SampleInfo instance
Foo data;
SampleInfo info;
//Define a timeout of 5 seconds
eprosima::fastrtps::Duration_t timeout (5, 0);

// Loop reading data as it arrives
// This will make the current thread to be dedicated exclusively to
// waiting and reading data until the remote DataWriter dies
while (true) {<!-- -->
    if (data_reader->wait_for_unread_message(timeout)) {<!-- -->
        if (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample( & amp;data, & amp;info)) {<!-- -->
            if (info.valid_data) {<!-- -->
                // Do something with the data
                std::cout << "Received new data value for topic "
                          << topic->get_name() << std::endl;
            } else {<!-- -->
                // If the remote writer is not alive, we exit the reading loop
                std::cout << "Remote writer for topic "
                          << topic->get_name() << " is dead" << std::endl;
                break;
            }
        }
    } else {<!-- -->
        std::cout << "No data this time" << std::endl;
    }
}