Directory
- Subscriber
-
- SubscriberQos
- SubscriberListener
- Create Subscriber
- DataReader
- SampleInfo
- Read data
-
- Get data through callback
- Processed by starting a waiting thread
Subscriber plays the role of a container, and there can be many DataReaders in it. They use the same SubscriberQos configuration of Subscriber. Subscriber can carry DataReader objects of different Topics and data types.
Subscriber
SubscriberQos
The default Qos configuration can be obtained through the get_default_subscriber_qos() function of the DomainParticipant instance.
SubscriberListener
The change of Subscriber’s status will trigger the callback function call of SubscriberListener. Users can implement custom callbacks through inheritance. Added callback: on_data_on_readers()
Create Subscriber
Created through DomainParticipant’s create_subscriber() function, the SubscriberQos parameter is required, and the SubscriberListener and StatusMask parameters are optional.
//Create DomainParticipant instance participant Subscriber* subscriber_with_default_qos = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); if (nullptr == subscriber_with_default_qos) {<!-- --> // Error return; }
Create a Subscriber based on Profile: the string name parameter used to identify the subscriber is required, listener and StatusMask are optional
Subscriber* subscriber_with_profile = participant->create_subscriber_with_profile("subscriber_profile"); if (nullptr == subscriber_with_profile) {<!-- --> // Error return; }
Delete Subscriber: You need to delete all entities (DataReaders) in Subscriber first, and then call delete_subscriber()
to delete Subscriber
//Delete the entities the subscriber created if (subscriber->delete_contained_entities() != ReturnCode_t::RETCODE_OK) {<!-- --> // Subscriber failed to delete the entities it created return; } // Delete the Subscriber if (participant->delete_subscriber(subscriber) != ReturnCode_t::RETCODE_OK) {<!-- --> // Error return; }
DataReader
DataReader only belongs to the Subscriber that created it. Each DataReader supports binding to a single Topic when it is created, so the Topic must be created before the DataReader is created. The data written on the pub side can be obtained through the DataReader::read_next_sample() or DataReader::take_next_sample() function.
DataReaderQos
DataReaderListener is used to monitor changes in DataReader status. There are the following callback member functions:
- on_data_available
- on_subscription_matched
Create a DataReader: Bind to the Topic to transmit data and DataReaderQos are required parameters, DataReaderListener and StatusMask are optional
DataReader* data_reader_with_default_qos = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT); if (nullptr == data_reader_with_default_qos) {<!-- --> // Error return; }
Create DataReader based on Profile
Create a DataReader using a custom PayloadPool (Why use a custom PayloadPool?)
// A DataReaderQos must be provided to the creation method DataReaderQos qos; // Create PayloadPool std::shared_ptr<CustomPayloadPool> payload_pool = std::make_shared<CustomPayloadPool>(); DataReader* data_reader = subscriber->create_datareader(topic, qos, nullptr, StatusMask::all(), payload_pool); if (nullptr == data_reader) {<!-- --> // Error return; }
DeleteDataReader
In the same way, you need to delete all entities belonging to DataReader (QueryConditions) before deleting DataReader.
// Delete the entities the DataReader created if (data_reader->delete_contained_entities() != ReturnCode_t::RETCODE_OK) {<!-- --> // DataReader failed to delete the entities it created. return; } // Delete the DataReader if (subscriber->delete_datareader(data_reader) != ReturnCode_t::RETCODE_OK) {<!-- --> // Error return; }
SampleInfo
An important data structure that provides metadata information about related data for each DataReader, including:
- The status of the data sample, such as whether it has been modified or read
- The source of the data sample, such as the publisher’s instance handle or a public instance handle
- The sequence number of the data sample, this is important to ensure the sequential receipt of data samples
- The timestamp of the data sample, i.e. when the sample was written or modified
- Whether the data sample is valid. Invalid data samples usually indicate that the life cycle of the instance has ended.
Read data
Receive and consume the data read by DataReader through reading or taking. The implementations of these functions for reading and taking are the same:
- DataReader::read_next_sample / DataReader::take_next_sample
- DataReader::read(), DataReader::read_instance(), DataReader::read_next_instance() / DataReader::take(), DataReader::take_instance(), DataReader::take_next_instance(): Get a collection of samples that meet specific conditions
Get data through callback
Two Listener callbacks, you can customize the listener to inherit from DataReaderListener:
- on_data_available()
- on_data_readers()
Processing by starting a waiting thread
- asynchronous
// Create a DataReader DataReader* data_reader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT); if (nullptr == data_reader) {<!-- --> // Error return; } // Prepare a wait-set to wait for data on the DataReader WaitSet wait_set; StatusCondition & amp; condition = data_reader->get_statuscondition(); condition.set_enabled_statuses(StatusMask::data_available()); wait_set.attach_condition(condition); // Create a data and SampleInfo instance Foo data; SampleInfo info; //Define a timeout of 5 seconds eprosima::fastrtps::Duration_t timeout (5, 0); // Loop reading data as it arrives // This will make the current thread to be dedicated exclusively to // waiting and reading data until the remote DataWriter dies while (true) {<!-- --> ConditionSeq active_conditions; if (ReturnCode_t::RETCODE_OK == wait_set.wait(active_conditions, timeout)) {<!-- --> while (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample( & amp;data, & amp;info)) {<!-- --> if (info.valid_data) {<!-- --> // Do something with the data std::cout << "Received new data value for topic " << topic->get_name() << std::endl; } else {<!-- --> // If the remote writer is not alive, we exit the reading loop std::cout << "Remote writer for topic " << topic->get_name() << " is dead" << std::endl; break; } } } else {<!-- --> std::cout << "No data this time" << std::endl; } }
- Synchronously, wait until data arrives or time times out through the DataReader::wait_for_unread_message() function
// Create a DataReader DataReader* data_reader = subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT); if (nullptr == data_reader) {<!-- --> // Error return; } // Create a data and SampleInfo instance Foo data; SampleInfo info; //Define a timeout of 5 seconds eprosima::fastrtps::Duration_t timeout (5, 0); // Loop reading data as it arrives // This will make the current thread to be dedicated exclusively to // waiting and reading data until the remote DataWriter dies while (true) {<!-- --> if (data_reader->wait_for_unread_message(timeout)) {<!-- --> if (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample( & amp;data, & amp;info)) {<!-- --> if (info.valid_data) {<!-- --> // Do something with the data std::cout << "Received new data value for topic " << topic->get_name() << std::endl; } else {<!-- --> // If the remote writer is not alive, we exit the reading loop std::cout << "Remote writer for topic " << topic->get_name() << " is dead" << std::endl; break; } } } else {<!-- --> std::cout << "No data this time" << std::endl; } }