3. Reading and Writing Data¶
The previous chapter covered the definition and semantics of DDS topics, topic-instances and samples. It also described domains and partitions and the roles they play in organizing application data flows. This chapter examines the mechanisms provided by DDS for reading and writing data.
3.1. Writing Data¶
As already illustrated, writing data with DDS is as simple
as calling the write
method on the DataWriter
. Yet to
be able to take full advantage of DDS it is necessary to understand
the relationship between writers and topic-instances life-cycles.
To explain the difference between topics and the instances of a topic’s datatype, this Tutorial made the analogy between topics/topic datatypes and classes/objects in an Object-Oriented Programming language, such as Java or C++. Like objects, the instances of the topic’s datatype have:
an identity provided by their unique key value, and
a life-cycle.
The instance life-cycle of a topic’s datatype can be implicitly managed through
the semantics implied by the DataWriter
, or it can
be explicitly controlled via the DataWriter
API. The
instance life-cycle transition can have implications for
local and remote resource usage, thus it is important to
understand this aspect.
3.1.1. Topic-Instances Life-cycle¶
Before getting into the details of how the life-cycle is managed, let’s see which are the possible states.
An instance of a topic’s datatype is
ALIVE
if there is at least oneDataWriter
that has explicitly or implicitly (through a write) registered it. ADataWriter
that has registered an instance declares that it is committed to publishing potential updates for that instance as soon as they occur. For that reason, theDataWriter
has reserved resources to hold the administration for the instances and at least one of its samples.DataReaders
for this topic will also maintain a similar resource reservation for each registered instance. As long as an instance is registered by at least oneDataWriter
, it will be consideredALIVE
.An instance is in the
NOT_ALIVE_NO_WRITERS
state when there are no moreDataWriters
that have registered the instance. That means no moreDataWriters
have an intent to update the instance state and all of them released the resources they had previously claimed for it. In this stateDataReaders
no longer expect any incoming updates and so they may release their resources for the instance as well. Be aware that when a Writer forgets to unregister an instance it no longer intends to update, it does not only leak away the resources it had locally reserved for it, but it also leaks away the resources that all subscribingDataReaders
still have reserved for it in the expectation of future updates.Finally, the instance is
NOT_ALIVE_DISPOSED
if it was disposed either implicitly, due to some default QoS settings, or explicitly by means of a specificDataWriter
API call. TheNOT_ALIVE_DISPOSED
state indicates that the instance is no more relevant for the system and should basically be wiped from all storage. The big difference with theNOT_ALIVE_NO_WRITERS
state is that the latter only indicates that nobody intends to update the instance and does not say anything about the validity of the last known state.
As an example, when a publishing application crashes it might want to restart
on another node and obtain its last known state from the domain in which it
resides. In the mean time it has no intention to invalidate the last known
state for each of its instances or to wipe them from all storage in its domain.
Quite the opposite, it wants the last known state to remain available for
late-joiners, so that it can pick back up where it left off as soon as it is
restarted. So in this case the Writer needs to make sure its instances go from
ALIVE
to NOT_ALIVE_NO_WRITERS
after the crash, which may then go back
to ALIVE
after the publishing application has been restarted.
On the other hand, if the application gracefully terminates and wants to indicate
that its instances are no longer a concern to the DDS global data space, it may
want the state of its instances to go to NOT_ALIVE_DISPOSED
so that the rest of
the domain knows it can safely wipe away all of its samples in all of its storages.
3.1.2. Automatic Life-cycle Management¶
We will illustrate the instances life-cycle management with an example.
If we look at the code in Automatic management of Instance life-cycle
and assume this is the only application writing data, the result of the
three write
operations is to create three new topic instances in the
system for the key values associated with the id = 1
, 2
, 3
(the TempSensorType
was defined in the
first chapter
as having a single attribute key named id
). These instances will be
in the ALIVE
state as long as this application is running, and will be
automatically registered (we could say ‘associated’) with the writer.
The default behavior for DDS is to then dispose the topic instances once
the DataWriter
object is destroyed, thus leading those instances to the
NOT_ALIVE_DISPOSED
state. The default settings can be overridden to
simply induce instances’ unregistration, causing in this case a transition from
ALIVE
to NOT_ALIVE_NO_WRITERS
.
#include <thread>
#include <chrono>
#include <TempControl_DCPS.hpp>
int main(int, char**) {
dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());
dds::topic::Topic<tutorial::TempSensorType> topic(dp, "TempSensorTopic");
dds::pub::Publisher pub(dp);
dds::pub::DataWriter<tutorial::TempSensorType> dw(pub, topic);
//[NOTE #1]: Instances implicitly registered as part
// of the write.
// {id, temp hum scale}
dw << tutorial::TempSensorType(1, 25.0F, 65.0F, tutorial::CELSIUS);
dw << tutorial::TempSensorType(2, 26.0F, 70.0F, tutorial::CELSIUS);
dw << tutorial::TempSensorType(3, 27.0F, 75.0F, tutorial::CELSIUS);
std::this_thread::sleep_for(std::chrono::seconds(10));
//[NOTE #2]: Instances automatically unregistered and
// disposed as result of the destruction of the dw object
return 0;
}
3.1.3. Explicit Life-cycle Management¶
Topic-instances life-cycle can also be managed explicitly via the API
defined on the DataWriter
.
In this case the application programmer has full control of when instances are registered, unregistered and disposed.
Topic-instance registration is a good practice to follow when an application writes an instance very often and requires the lowest-latency write. In essence the act of explicitly registering an instance allows the middleware to reserve resources as well as optimize the instance lookup. Topic-instance unregistration provides a means for telling DDS that an application is done with writing a specific topic-instance, thus all the resources locally associated with can be safely released. Finally, disposing topic-instances gives a way of communicating to DDS that the instance is no longer relevant for the distributed system, thus whenever possible resources allocated with the specific instances should be released both locally and remotely. Explicit management of topic-instances life-cycle shows an example of how the DataWriter API can be used to register, unregister and dispose topic-instances.
In order to show the full life-cycle management, the default DataWriter
behavior has been changed so that instances are not automatically
disposed when unregistered. In addition, to keep the code compact it
takes advantage of the new C++11 auto
feature which leaves it to
the the compiler to infer the left-hand-side types from the right-hand-side
return-type.
Explicit management of topic-instances life-cycle shows an
application that writes four samples belonging to four different
topic-instances, respectively those with id = 1, 2, 3
. The
instances with id = 1, 2, 3
are explicitly registered by calling the
DataWriter::register_instance
method, while the instance with
id=0
is automatically registered as result of the write on the
DataWriter
.
To show the different possible state transitions, the
topic-instance with id=1
is explicitly unregistered, thus causing
it to transition to the NOT_ALIVE_NO_WRITER
state; the
topic-instance with id=2
is explicitly disposed, thus causing it
to transition to the NOT_ALIVE_DISPOSED
state. Finally, the
topic-instance with id=0,3
will be automatically unregistered,
as a result of the destruction of the objects dw
and
dwi3
respectively, thus transitioning to the state
NOT_ALIVE_NO_WRITER
.
Once again, as mentioned above, in this example the writer has been configured to ensure that topic-instances are not automatically disposed upon unregistration.
#include <iostream>
#include <TempControl_DCPS.hpp>
int main(int, char**) {
dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());
dds::topic::Topic<tutorial::TempSensorType> topic(dp, "TempSensorTopic");
dds::pub::Publisher pub(dp);
//[NOTE #1]: Avoid topic-instance dispose on unregister
dds::pub::qos::DataWriterQos dwqos = pub.default_datawriter_qos()
<< dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();
//[NOTE #2]: Creating DataWriter with custom QoS.
// QoS will be covered in detail in article #4.
dds::pub::DataWriter<tutorial::TempSensorType> dw(pub, topic, dwqos);
tutorial::TempSensorType data(0, 24.3F, 0.5F, tutorial::CELSIUS);
dw.write(data);
tutorial::TempSensorType key;
short id = 1;
key.id(id);
//[NOTE #3] Registering topic-instance explicitly
dds::core::InstanceHandle h1 = dw.register_instance(key);
id = 2;
key.id(id);
dds::core::InstanceHandle h2 = dw.register_instance(key);
id = 3;
key.id(id);
dds::core::InstanceHandle h3 = dw.register_instance(key);
dw << tutorial::TempSensorType(1, 24.3F, 0.5F, tutorial::CELSIUS);
dw << tutorial::TempSensorType(2, 23.5F, 0.6F, tutorial::CELSIUS);
dw << tutorial::TempSensorType(3, 21.7F, 0.5F, tutorial::CELSIUS);
// [NOTE #4]: unregister topic-instance with id=1
dw.unregister_instance(h1);
// [NOTE #5]: dispose topic-instance with id=2
dw.dispose_instance(h2);
//[NOTE #6]:topic-instance with id=3 will be unregistered as
// result of the dw object destruction
return 0;
}
3.1.4. Keyless Topics¶
Most of the discussion above has focused on keyed topics, but what about keyless topics? As explained in Topics, Domains and Partitions keyless topics are like singletons, in the sense that there is only one instance. As a result for keyless topics the state transitions are tied to the lifecycle of the data-writer.
3.1.5. Blocking or Non-Blocking Write?¶
One question that might arise at this point is whether the write is blocking or not. The short answer is that the write is non-blocking; however, as will be seen later on, there are cases in which, depending on settings, the write might block. In these cases, the blocking behaviour is necessary to avoid data-loss.
3.2. Accessing Data¶
DDS provides a mechanism to select the samples based on their content and state, and another to control whether samples have to be read or taken (removed from the cache).
3.2.1. Read vs. Take¶
The DDS provides data access through the DataReader
class which exposes
two semantics for data access: read and take.
The read semantics, implemented by the DataReader::read
method, gives
access to the data received by the DataReader
without removing it from
its cache. This means that the data will remain readable via an appropriate
read call.
The take semantics, implemented by the DataReader::take
method,
allows DDS to access the data received by the DataReader
by removing it
from its local cache. This means that once the data is taken, it is no longer
available for subsequent read or take operations.
The semantics provided by the read
and take
operations enable
you to use DDS as either a distributed cache or like a queuing system, or
both. This is a powerful combination that is rarely found in the same
middleware platform. This is one of the reasons why DDS is used in a variety
of systems sometimes as a high-performance distributed cache, or like
a high-performance messaging technology, and at yet other times as a
combination of the two. In addition, the read semantics is useful when
using topics to model distributed states, and the take semantics when
modeling distributed events.
3.2.2. Data and Meta-Data¶
The first part of this chapter showed how the DataWriter
can be
used to control the life-cycle of topic-instances. The topic-instance
life-cycle along with other information describing properties of
received data samples are made available to DataReader
and can be
used to select the data access via either a read
or take
.
Specifically, each data sample received by a DataWriter
has an associated
SampleInfo
describing the property of that sample. These properties
includes information on:
Sample State. The sample state can be
READ
orNOT_READ
depending on whether the sample has already been read or not.Instance State. As explained above, this indicates the status of the instance as being either
ALIVE
,NOT_ALIVE_NO_WRITERS
, orNOT_ALIVE_DISPOSED
.View State. The view state can be
NEW
orNOT_NEW
depending on whether this is the first sample ever received for the given topic-instance or not.
The SampleInfo
also contains a set of counters that allow you to
determine the number of times that a topic-instance has performed
certain status transitions, such as becoming alive after being disposed.
Finally, the SampleInfo
contains a timestamp
for the data and a
flag that tells wether the associated data sample is valid or not. This
latter flag is important since DDS might generate valid samples info
with invalid data to inform about state transitions such as an instance
being disposed.
3.2.3. Selecting Samples¶
Regardless of whether data are read or taken from DDS, the same mechanism
is used to express the sample selection. Thus, for brevity, the following
examples use the read
operation; to use the take
operation,
simply replace each occurrence of a read
with a take
.
DDS allows the selection of data based on state and content.
State-based selection is based on the values of the view state, instance state and sample state.
Content-based selection is based on the content of the sample.
3.2.3.1. State-based Selection¶
For instance, to get all of the data received, no
matter what the view, instance and sample state, issue a read
(or a take
) as follows:
dds::sub::LoanedSamples<tutorial::TempSensorType> samples;
samples = dr.select().state(dds::sub::status::DataState::any()).read();
On the other hand, to read (or take) only samples that have not been
read yet, issue a read
(or a take
) as follows:
samples = dr.select().state(dds::sub::status::SampleState::not_read()).read();
To read new valid data, meaning no samples with only a valid
SampleInfo
, issue a read
(or a take
) as follows:
samples = dr.select().state(dds::sub::status::DataState::new_data()).read();
Finally, to only read data associated to instances that are
making their appearance in the system for the first time, issue
a read
(or a take
) as follows:
dds::sub::status::DataState ds;
ds << dds::sub::status::SampleState::not_read()
<< dds::sub::status::ViewState::new_view()
<< dds::sub::status::InstanceState::alive();
samples = dr.select().state(ds).read();
Notice that this kind of read only and always gets the first sample written for each instance.
Although it might seem a strange use case, this is quite useful for all those applications that need to do something special whenever a new instance makes its first appearance in the system. An example could be a new airplane entering a new region of control; in this case the system would have to do quite a few things that are unique to this specific state transition.
It is also worth mentioning that if the status is omitted,
a read
(or a take
) can be used like this:
auto samples2 = dr.read();
This is equivalent to selecting samples with the
NOT_READ_SAMPLE_STATE
, ALIVE_INSTANCE_STATE
and
ANY_VIEW_STATE
.
finally, it should be noted that statuses enable data to be selected based on its meta-information.
3.2.3.2. Content-based Selection¶
Content-based selection is supported through queries. Although the concept of a query might seem to overlap with that of content filtering, the underlying idea is different.
Filtering is about controlling the data received by the data reader: the data that does not match the filter is not inserted into the data reader cache. On the other hand, queries are about selecting the data that is (already) in the data reader cache.
// Define the query expression
std::string expression =
"(temp NOT BETWEEN (%0 AND %1)) \
OR \
(hum NOT BETWEEN (%2 and %3))";
// Define the query parameters
std::vector<std::string> params = {"20.5", "21.5", "30", "50"};
dds::sub::Query query(dr, expression, params);
auto samples = dr.select().content(query).read();
The syntax supported by query expressions is identical to that used to define filter expressions; for convenience this is summarized in the table.
Constructed Type |
Example |
---|---|
= |
equal |
<> |
not equal |
> |
greater than |
< |
less than |
>= |
greater than or equal |
<= |
less than or equal |
BETWEEN |
between and inclusive range |
LIKE |
matches a string pattern |
The execution of the query is completely under user control and is
performed in the context of a read
or take
operation as shown in
ListingB [Listing:DDS:Query].
3.2.3.3. Instance-based Selection¶
In some instances you may want to only look at the data coming from a specific topic instance. As instances are identified by the values of their key attributes you may be tempted to use content filtering to discriminate between them. Although this would work perfectly well, it is not the most efficient way of selecting an instance. DDS provides another mechanism that allows you to pinpoint the instance you are interested in more efficiently than content filtering. In essence, each instance has an associated instance handle; this can be used to access the data from a given instance in a very efficient manner.
The listing Instance-based selection shows how this can be done.
tutorial::TempSensorType key;
key.id() = 123;
auto handle = dr.lookup_instance(key);
auto samples = dr.select().instance(handle).read();
3.2.4. Iterators or Containers?¶
The examples shown so far were ‘loaning’ the data from DDS: in other words, you did not have to provide the storage for the samples. The advantage of this style of read is that it allows ‘zero copy’ reads. However, if you want to store the data in a container of your choice you can use iterator-based read and take operations.
The iterator-based read/take API supports both forward iterators as well as back-inserting iterators. The API allows you to read (or take) data into whatever structure you’d like, so long as you can get a forward or a back-inserting iterator for it. Here we will focus on the forward-iterator-based API; back-inserting is pretty similar. you should be able to read data as follows:
// Forward iterator using array.
dds::sub::Sample<tutorial::TempSensorType> samples[MAXSAMPLES];
unsigned int readSamples = dr.read(&samples, MAXSAMPLES);
// Forward iterator using vector.
std::vector<dds::sub::Sample<tutorial::TempSensorType> > fSamples(MAXSAMPLES);
readSamples = dr.read(fSamples.begin(), MAXSAMPLES);
// Back-inserting iterator using vector.
std::vector<dds::sub::Sample<tutorial::TempSensorType> > biSamples;
uint32_t readBiSamples = dr.read(std::back_inserter(biSamples));
3.2.5. Blocking or Non-Blocking Read/Take?¶
The DDS read and take are always non-blocking. If no data is available to read then the call will return immediately. Likewise if there is less data than requested the call will gather what is available and return right away. The non-blocking nature of read/take operations ensures that these can be safely used by applications that poll for data.
3.3. Waiting and being Notified¶
One way of coordinating with DDS is to have the application poll for data by performing either a read or a take every so often. Polling might be the best approach for some classes of applications, the most common example being control applications that execute a control loop or a cyclic executive. In general, however, applications might want to be notified of the availability of data or perhaps be able to wait for its availability, as opposed to polling for it. DDS supports both synchronous and asynchronous coordination by means of wait-sets and listeners.
3.3.1. Waitsets¶
DDS provides a generic mechanism for waiting on conditions. One of the
supported kind of conditions are ReadConditions
which can be used to
wait for the availability data on one or more DataReaders
. This
functionality is provided by the Waitset
class, which can be
regarded as an object-oriented version of the Unix select
.
// Create the WaitSet
dds::core::cond::WaitSet ws;
// Create a ReadCondition for our DataReader and configure it for new data
dds::sub::cond::ReadCondition rc(dr, dds::sub::status::DataState::new_data());
// Attach the condition
ws += rc;
// Wait for new data to be available
ws.wait();
// Read the data
auto samples = dr.read();
std::for_each(samples.begin(),
samples.end(),
[](const dds::sub::Sample<tutorial::TempSensorType>& s) {
std::cout << s.data() << std::endl;
});
If we wanted to wait for temperature samples to be available we could
create a ReadCondition
on our DataReader
and make it wait for new
data by creating a WaitSet
and attaching the ReadCondition
to it
as shown in Using WaitSet to wait for data availability.
At this point, we can synchronize on the
availability of data, and there are two ways of doing it. One approach
is to invoke the Waitset::wait
method, which returns the list of
active conditions. These active conditions can then be iterated upon and
their associated datareaders can be accessed. The other approach is to
invoke the Waitset::dispatch
, which is demonstrated in a separate example.
As an alternative to iterating through the conditions yourself, DDS conditions
can be associated with functor objects which are then used to execute
application-specific logic when the condition is triggered. The DDS event-handling
mechanism allows you to bind anything you want to an event, meaning that you
could bind a function, a class method, or even a lambda-function as a functor
to the condition. You then attach the condition to the waitset in the same way,
but in this case you would invoke the Waitset::dispatch
function, that causes
the infrastructure to automatically invoke the functor associated with each triggered
conditions before unblocking, as is shown in Using WaitSet to dispatch to incoming data.
Notice that the execution of the functor happens in the context of the application
thread, prior to returning from the Waitset::dispatch
function.
// Create the WaitSet
dds::core::cond::WaitSet ws;
// Create a ReadCondition for our DataReader and configure it for new data
dds::sub::cond::ReadCondition rc(dr,
dds::sub::status::DataState::new_data(),
[](const dds::sub::ReadCondition& srcCond) {
dds::sub::DataReader<tutorial::TempSensorType> srcReader = srcCond.data_reader();
// Read the data
auto samples = srcReader.read();
std::for_each(samples.begin(),
samples.end(),
[](const dds::sub::Sample<tutorial::TempSensorType>& s) {
std::cout << s.data() << std::endl;
});
});
// Attach the condition
ws += rc;
// Wait for new data to be available
ws.dispatch();
3.3.2. Listeners¶
Another way of finding out when there is data to be read is to take
advantage of the events raised by DDS and notified asynchronously to
registered handlers. Thus, if we wanted a handler to be notified of the
availability of data, we would connect the appropriate handler with the
on_data_available
event raised by the DataReader
.
class TempSensorListener :
public dds::sub::NoOpDataReaderListener<tutorial::TempSensorType>
{
public:
virtual void on_data_available(
dds::sub::DataReader<tutorial::TempSensorType>& dr) {
auto samples = dr.read();
std::for_each(samples.begin(), samples.end(),
[](const dds::sub::Sample<tutorial::TempSensorType>& s) {
std::cout << s.data().id() << std::endl;
});
}
};
TempSensorListener listener;
dr.listener(&listener, dds::core::status::StatusMask::data_available());
The listing Using a listener to receive notification of data availability
shows how this can be done. The NoOpDataReaderListener
is a utility
class provided by the API that provides a trivial implementation for all
of the operations defined as part of the listener. This way, you can override
only those that are relevant for your application.
Something worth pointing out is that the handler code will execute in a middleware thread. As a result, when using listeners you should try to minimize the time spent in the listener itself.
3.4. Summary¶
This chapter has presented the various aspects involved in writing
and reading data with DDS. It described the topic-instance life-cycle,
explained how that can be managed via the DataWriter
and showcased
all the meta-information available to DataReader
. It explained
wait-sets and listeners and how these can be used to receive
indication of when data is available.
It is recommended again that the reader compiles and runs the examples and experiments with the programs developed so far.