OpenSplice ISO C++ 2 DCPS  v6.x
ISO C++ 2 OpenSplice Data Distribution Service Data-Centric Publish-Subscribe API
DataReaderImpl.hpp
Go to the documentation of this file.
1 /*
2  * Vortex OpenSplice
3  *
4  * This software and documentation are Copyright 2006 to 2021 ADLINK
5  * Technology Limited, its affiliated companies and licensors. All rights
6  * reserved.
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 #ifndef OSPL_DDS_SUB_TDATAREADER_IMPL_HPP_
22 #define OSPL_DDS_SUB_TDATAREADER_IMPL_HPP_
23 
28 /*
29  * OMG PSM class declaration
30  */
32 #include <dds/sub/Query.hpp>
34 
35 
36 
37 
38 /***************************************************************************
39  *
40  * dds/sub/DataReader<> WRAPPER implementation.
41  * Declaration can be found in dds/sub/DataReader.hpp
42  *
43  ***************************************************************************/
44 
45 // Implementation
46 
47 namespace dds
48 {
49 namespace sub
50 {
51 
52 //--------------------------------------------------------------------------------
53 // DATAREADER
54 //--------------------------------------------------------------------------------
55 
56 template <typename T>
57 DataReader<T>::Selector::Selector(DataReader& dr) : impl_(dr.delegate())
58 {
59 }
60 
61 template <typename T>
64 {
65  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
66 
67  impl_.instance(h);
68  return *this;
69 }
70 
71 
72 template <typename T>
75 {
76  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
77 
78  impl_.next_instance(h);
79  return *this;
80 }
81 
82 
83 template <typename T>
86 {
87  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
88 
89  impl_.filter_state(s);
90  return *this;
91 }
92 
93 template <typename T>
96 {
97  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
98 
99  impl_.filter_content(query);
100  return *this;
101 }
102 
103 template <typename T>
104 typename DataReader<T>::Selector&
106 {
107  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
108 
109  impl_.max_samples(n);
110  return *this;
111 }
112 
113 template <typename T>
116 {
117  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
118 
119  return impl_.read();
120 }
121 
122 template <typename T>
125 {
126  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
127 
128  return impl_.take();
129 }
130 
131 template <typename T>
132 template <typename SamplesFWIterator>
133 uint32_t
134 DataReader<T>::Selector::read(SamplesFWIterator sfit, uint32_t max_samples)
135 {
136  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
137 
138  return impl_.read(sfit, max_samples);
139 }
140 
141 template <typename T>
142 template <typename SamplesFWIterator>
143 uint32_t
144 DataReader<T>::Selector::take(SamplesFWIterator sfit, uint32_t max_samples)
145 {
146  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
147 
148  return impl_.take(sfit, max_samples);
149 }
150 
151 template <typename T>
152 template <typename SamplesBIIterator>
153 uint32_t
154 DataReader<T>::Selector::read(SamplesBIIterator sbit)
155 {
156  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
157 
158  return impl_.read(sbit);
159 }
160 
161 template <typename T>
162 template <typename SamplesBIIterator>
163 uint32_t
164 DataReader<T>::Selector::take(SamplesBIIterator sbit)
165 {
166  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
167 
168  return impl_.take(sbit);
169 }
170 
171 //--------------------------------------------------------------------------------
172 // DATAREADER::MANIPULATORSELECTOR
173 //--------------------------------------------------------------------------------
174 template <typename T>
176 ManipulatorSelector(DataReader& dr) : impl_(dr.delegate()) {}
177 
178 template <typename T>
179 bool
181 {
182  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
183 
184  return impl_.read_mode();
185 }
186 
187 template <typename T>
188 void
190 {
191  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
192 
193  impl_.read_mode(b);
194 }
195 
196 template <typename T>
199 {
200  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
201 
202  impl_.instance(h);
203  return *this;
204 }
205 
206 template <typename T>
209 {
210  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
211 
212  impl_.next_instance(h);
213  return *this;
214 }
215 
216 template <typename T>
219 {
220  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
221 
222  impl_ >> samples;
223  return *this;
224 }
225 
226 template <typename T>
229 {
230  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
231 
232  manipulator(*this);
233  return *this;
234 }
235 
237 template <typename T>
238 template <typename Functor>
241 {
242  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
243 
244  f(*this);
245  return *this;
246 }
249 template <typename T>
252 {
253  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
254 
255  impl_.filter_state(s);
256  return *this;
257 }
258 
259 template <typename T>
262 {
263  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
264 
265  impl_.filter_content(query);
266  return *this;
267 }
268 
269 template <typename T>
272 {
273  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this->impl_.get_reader());
274 
275  impl_.max_samples(n);
276  return *this;
277 }
278 
279 template <typename T>
281  const dds::sub::Subscriber& sub,
282  const dds::topic::Topic<T>& topic):
283  ::dds::core::Reference(new DELEGATE<T>(sub, topic, sub.is_nil() ? dds::sub::qos::DataReaderQos() : sub->default_datareader_qos()))
284 {
285  ISOCPP_REPORT_STACK_DDS_BEGIN(sub);
286 
287  this->delegate()->init(this->impl_);
288 }
289 
290 template <typename T>
292  const dds::sub::Subscriber& sub,
293  const ::dds::topic::Topic<T>& topic,
296  const dds::core::status::StatusMask& mask) :
297  ::dds::core::Reference(new DELEGATE<T>(sub, topic, qos, listener, mask))
298 {
299  ISOCPP_REPORT_STACK_DDS_BEGIN(sub);
300 
301  this->delegate()->init(this->impl_);
302 }
303 
304 #ifdef OMG_DDS_CONTENT_SUBSCRIPTION_SUPPORT
305 template <typename T>
307  const dds::sub::Subscriber& sub,
309  ::dds::core::Reference(new DELEGATE<T>(sub, topic, sub.default_datareader_qos()))
310 {
311  ISOCPP_REPORT_STACK_DDS_BEGIN(sub);
312 
313  this->delegate()->init(this->impl_);
314 }
315 
316 template <typename T>
318  const dds::sub::Subscriber& sub,
319  const ::dds::topic::ContentFilteredTopic<T>& topic,
322  const dds::core::status::StatusMask& mask) :
323  ::dds::core::Reference(new DELEGATE<T>(sub, topic, qos, listener, mask))
324 {
325  ISOCPP_REPORT_STACK_DDS_BEGIN(sub);
326 
327  this->delegate()->init(this->impl_);
328 }
329 #endif /* OMG_DDS_CONTENT_SUBSCRIPTION_SUPPORT */
330 
331 #ifdef OMG_DDS_MULTI_TOPIC_SUPPORT
332 template <typename T>
334  const dds::sub::Subscriber& sub,
335  const dds::topic::MultiTopic<T>& topic) :
336  ::dds::core::Reference(new DELEGATE<T>(sub, topic))
337 {
338  this->delegate()->init(this->impl_);
339 }
340 
341 template <typename T>
343  const dds::sub::Subscriber& sub,
344  const ::dds::topic::MultiTopic<T>& topic,
347  const dds::core::status::StatusMask& mask) :
348  ::dds::core::Reference(new DELEGATE<T>(sub, topic, qos, listener, mask))
349 {
350  this->delegate()->init(this->impl_);
351 }
352 #endif /* OMG_DDS_MULTI_TOPIC_SUPPORT */
353 
354 template <typename T>
356 
357 template <typename T>
360 {
361  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
362 
363  return this->delegate()->default_filter_state();
364 }
365 
366 template <typename T>
368 {
369  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
370 
371  this->delegate()->default_filter_state(status);
372  return *this;
373 }
374 
375 template <typename T>
377 {
378  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
379 
380  ls = this->read();
381  return *this;
382 }
383 
384 template <typename T>
387 {
388  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
389 
390  ManipulatorSelector selector(*this);
391  manipulator(selector);
392  return selector;
393 }
394 
396 template <typename T>
397 template <typename Functor>
400 {
401  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
402 
403  ManipulatorSelector selector(*this);
404  f(selector);
405  return selector;
406 }
409 template <typename T>
412 {
413  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
414 
415  return this->delegate()->read();
416 }
417 
418 template <typename T>
421 {
422  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
423 
424  return this->delegate()->take();
425 }
426 
427 
428 template <typename T>
429 template <typename SamplesFWIterator>
430 uint32_t
431 DataReader<T>::read(SamplesFWIterator sfit, uint32_t max_samples)
432 {
433  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
434 
435  return this->delegate()->read(sfit, max_samples);
436 }
437 
438 template <typename T>
439 template <typename SamplesFWIterator>
440 uint32_t
441 DataReader<T>::take(SamplesFWIterator sfit, uint32_t max_samples)
442 {
443  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
444 
445  return this->delegate()->take(sfit, max_samples);
446 }
447 
448 template <typename T>
449 template <typename SamplesBIIterator>
450 uint32_t
451 DataReader<T>::read(SamplesBIIterator sbit)
452 {
453  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
454 
455  return this->delegate()->read(sbit);
456 }
457 
458 template <typename T>
459 template <typename SamplesBIIterator>
460 uint32_t
461 DataReader<T>::take(SamplesBIIterator sbit)
462 {
463  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
464 
465  return this->delegate()->take(sbit);
466 }
467 
468 template <typename T>
471 {
472  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
473 
474  Selector selector(*this);
475  return selector;
476 }
477 
478 template <typename T>
481 {
482  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
483 
484  return this->delegate()->key_value(h);
485 }
486 
487 template <typename T>
488 T&
490 {
491  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
492 
493  return this->delegate()->key_value(sample, h);
494 }
495 
496 template <typename T>
499 {
500  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
501 
502  return this->delegate()->lookup_instance(key);
503 }
504 
505 template <typename T>
506 void
508  Listener* listener,
509  const dds::core::status::StatusMask& event_mask)
510 {
511  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
512 
513  this->delegate()->listener(listener, event_mask);
514 }
515 
516 template <typename T>
517 typename DataReader<T>::Listener*
519 {
520  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
521 
522  return this->delegate()->listener();
523 }
524 
525 }
526 }
527 
528 
529 
530 
531 /***************************************************************************
532  *
533  * dds/sub/detail/DataReader<> DELEGATE implementation.
534  * Declaration can be found in dds/sub/detail/DataReader.hpp
535  *
536  * Implementation and declaration have been separated because of some circular
537  * dependencies, such as those with DataReaderListener and AnyDataReader.
538  *
539  ***************************************************************************/
540 
541 #include <dds/sub/AnyDataReader.hpp>
543 #include <dds/topic/Topic.hpp>
545 #include <org/opensplice/sub/AnyDataReaderDelegate.hpp>
546 
547 
548 template <typename T>
549 dds::sub::detail::DataReader<T>::DataReader(const dds::sub::Subscriber& sub,
550  const dds::topic::Topic<T>& topic,
551  const dds::sub::qos::DataReaderQos& qos,
553  const dds::core::status::StatusMask& mask)
554  : ::org::opensplice::sub::AnyDataReaderDelegate(qos, topic), sub_(sub)
555 {
556  ISOCPP_REPORT_STACK_DDS_BEGIN(topic);
557 
558  /* Create an implicit subscriber with the topic participant when needed. */
559  if (sub_.is_nil()) {
560  sub_ = dds::sub::Subscriber(topic->domain_participant());
561  }
562 
563  /* Merge the topic QoS implicitly when needed. */
564  if (topic.qos()->force_merge()) {
565  qos_ = topic.qos();
566  }
567 
568  common_constructor(listener, mask);
569 }
570 
571 template <typename T>
572 dds::sub::detail::DataReader<T>::DataReader(const dds::sub::Subscriber& sub,
574  const dds::sub::qos::DataReaderQos& qos,
576  const dds::core::status::StatusMask& mask)
577  : ::org::opensplice::sub::AnyDataReaderDelegate(qos, topic), sub_(sub)
578 {
579  ISOCPP_REPORT_STACK_DDS_BEGIN(topic);
580 
581  /* Create an implicit subscriber with the topic participant when needed. */
582  if (sub_.is_nil()) {
583  sub_ = dds::sub::Subscriber(topic->domain_participant());
584  }
585 
586  /* Merge the topic QoS implicitly when needed. */
587  if (topic.topic().qos()->force_merge()) {
588  qos_ = topic.topic().qos();
589  }
590 
591  common_constructor(listener, mask);
592 }
593 
594 template <typename T>
595 void
596 dds::sub::detail::DataReader<T>::common_constructor(
598  const dds::core::status::StatusMask& mask)
599 {
601  ISOCPP_THROW_EXCEPTION(ISOCPP_PRECONDITION_NOT_MET_ERROR, "DataReader cannot be created, topic information not found");
602  }
603 
604  org::opensplice::sub::qos::DataReaderQosDelegate drQos = qos_.delegate();
605 
606  // get and validate the kernel qos
607  drQos.check();
608  u_readerQos uQos = drQos.u_qos();
609 
610  u_subscriber uSubscriber = (u_subscriber)(sub_.delegate()->get_user_handle());
611 
612  std::string expression = this->AnyDataReaderDelegate::td_.delegate()->reader_expression();
613  std::vector<c_value> params = this->AnyDataReaderDelegate::td_.delegate()->reader_parameters();
614 
615  std::string name = "reader <" + this->AnyDataReaderDelegate::td_.name() + ">";
616  u_dataReader uReader = u_dataReaderNew(uSubscriber, name.c_str(), expression.c_str(),
617  params.empty() ? NULL : &params[0], (os_uint32)params.size(), uQos);
618  u_readerQosFree(uQos);
619 
620  if (!uReader) {
621  ISOCPP_THROW_EXCEPTION(ISOCPP_ERROR, "Failed to create DataReader");
622  } else {
623  this->AnyDataReaderDelegate::td_.delegate()->incrNrDependents();
624  }
625 
626  this->AnyDataReaderDelegate::setCopyOut(org::opensplice::topic::TopicTraits<T>::getCopyOut());
627  this->AnyDataReaderDelegate::setCopyIn(org::opensplice::topic::TopicTraits<T>::getCopyIn());
628 
629  this->userHandle = u_object(uReader);
630  this->listener_set((void*)listener, mask);
631  this->set_domain_id(this->sub_.delegate()->get_domain_id());
632 }
633 
634 template <typename T>
635 dds::sub::detail::DataReader<T>::~DataReader()
636 {
637  if (!this->closed) {
638  try {
639  close();
640  } catch (...) {
641 
642  }
643  }
644 }
645 
646 template <typename T>
647 void
648 dds::sub::detail::DataReader<T>::init(ObjectDelegate::weak_ref_type weak_ref)
649 {
650  /* Set weak_ref before passing ourselves to other isocpp objects. */
651  this->set_weak_ref(weak_ref);
652  /* Register writer at publisher. */
653  this->sub_.delegate()->add_datareader(*this);
654  /* Use listener dispatcher from the publisher. */
655  this->listener_dispatcher_set(this->sub_.delegate()->listener_dispatcher_get());
656  /* This only starts listening when the status mask shows interest. */
657  this->listener_enable();
658  /* Enable when needed. */
659  if (this->sub_.delegate()->is_enabled() && this->sub_.delegate()->is_auto_enable()) {
660  this->enable();
661  }
662 }
663 
664 template <typename T>
666 dds::sub::detail::DataReader<T>::default_filter_state()
667 {
668  org::opensplice::core::ScopedObjectLock scopedLock(*this);
669 
670  dds::sub::status::DataState state = this->status_filter_;
671 
672  scopedLock.unlock();
673 
674  return state;
675 }
676 
677 template <typename T>
678 void
679 dds::sub::detail::DataReader<T>::default_filter_state(const dds::sub::status::DataState& state)
680 {
681  org::opensplice::core::ScopedObjectLock scopedLock(*this);
682 
683  this->status_filter_ = state;
684 
685  scopedLock.unlock();
686 }
687 
688 
689 template <typename T>
692 {
694  dds::sub::detail::LoanedSamplesHolder<T> holder(samples);
695 
696  this->AnyDataReaderDelegate::read((u_dataReader)(this->userHandle), this->status_filter_, holder, -1);
697 
698  return samples;
699 }
700 
701 template <typename T>
704 {
706  dds::sub::detail::LoanedSamplesHolder<T> holder(samples);
707 
708  this->AnyDataReaderDelegate::take((u_dataReader)(this->userHandle), this->status_filter_, holder, -1);
709 
710  return samples;
711 }
712 
713 template <typename T>
714 template<typename SamplesFWIterator>
715 uint32_t
716 dds::sub::detail::DataReader<T>::read(SamplesFWIterator samples, uint32_t max_samples)
717 {
718  dds::sub::detail::SamplesFWInteratorHolder<T, SamplesFWIterator> holder(samples);
719 
720  this->AnyDataReaderDelegate::read((u_dataReader)(this->userHandle), this->status_filter_, holder, max_samples);
721 
722  return holder.get_length();
723 }
724 
725 template <typename T>
726 template<typename SamplesFWIterator>
727 uint32_t
728 dds::sub::detail::DataReader<T>::take(SamplesFWIterator samples, uint32_t max_samples)
729 {
730  dds::sub::detail::SamplesFWInteratorHolder<T, SamplesFWIterator> holder(samples);
731 
732  this->AnyDataReaderDelegate::take((u_dataReader)(this->userHandle), this->status_filter_, holder, max_samples);
733 
734  return holder.get_length();
735 }
736 
737 template <typename T>
738 template<typename SamplesBIIterator>
739 uint32_t
740 dds::sub::detail::DataReader<T>::read(SamplesBIIterator samples)
741 {
742  dds::sub::detail::SamplesBIIteratorHolder<T, SamplesBIIterator> holder(samples);
743 
744  this->AnyDataReaderDelegate::read((u_dataReader)(this->userHandle), this->status_filter_, holder, -1);
745 
746  return holder.get_length();
747 }
748 
749 template <typename T>
750 template<typename SamplesBIIterator>
751 uint32_t
752 dds::sub::detail::DataReader<T>::take(SamplesBIIterator samples)
753 {
754  dds::sub::detail::SamplesBIIteratorHolder<T, SamplesBIIterator> holder(samples);
755 
756  this->AnyDataReaderDelegate::take((u_dataReader)(this->userHandle), this->status_filter_, holder, -1);
757 
758  return holder.get_length();
759 }
760 
761 template <typename T>
763 dds::sub::detail::DataReader<T>::key_value(const dds::core::InstanceHandle& h)
764 {
765  T key_holder;
766 
767  this->AnyDataReaderDelegate::get_key_value((u_dataReader)(this->userHandle), h, &key_holder);
768 
769  return dds::topic::TopicInstance<T>(h, key_holder);
770 }
771 
772 template <typename T>
773 T&
774 dds::sub::detail::DataReader<T>::key_value(T& key, const dds::core::InstanceHandle& h)
775 {
776  this->AnyDataReaderDelegate::get_key_value((u_dataReader)(this->userHandle), h, &key);
777 
778  return key;
779 }
780 
781 template <typename T>
783 dds::sub::detail::DataReader<T>::lookup_instance(const T& key) const
784 {
785  dds::core::InstanceHandle handle(this->AnyDataReaderDelegate::lookup_instance((u_dataReader)(this->userHandle), &key));
786 
787  return handle;
788 }
789 
790 template <typename T>
792 dds::sub::detail::DataReader<T>::subscriber() const
793 {
794  this->check();
795 
796  return sub_;
797 }
798 
799 template <typename T>
800 void
801 dds::sub::detail::DataReader<T>::close()
802 {
804  this->listener_dispatcher_reset();
805 
806  org::opensplice::core::ScopedObjectLock scopedLock(*this);
807 
808  this->sub_.delegate()->remove_datareader(*this);
809 
810  // Remove our dependency on the topicdescription, and drop our reference to it,
811  // so that it can become garbage collected.
812  // It is important that we also drop our reference to the topicdescription, since
813  // subsequent dependencies between for example ContentFilteredTopic to Topic can
814  // only be dropped by the destructor of the ContentFilteredTopic.
815  this->AnyDataReaderDelegate::td_.delegate()->decrNrDependents();
816  this->AnyDataReaderDelegate::td_ = dds::topic::TopicDescription(dds::core::null);
817 
818  org::opensplice::sub::AnyDataReaderDelegate::close();
819 
820  scopedLock.unlock();
821 }
822 
823 
824 template <typename T>
826 dds::sub::detail::DataReader<T>::listener()
827 {
828  return reinterpret_cast<dds::sub::DataReaderListener<T>*>(this->listener_get());
829 }
830 
831 template <typename T>
832 void
833 dds::sub::detail::DataReader<T>::listener(
835  const dds::core::status::StatusMask& event_mask)
836 {
837  /* EntityDelegate takes care of thread safety. */
838  this->listener_set((void*)l, event_mask);
839  this->listener_enable();
840 }
841 
842 template <typename T>
844 dds::sub::detail::DataReader<T>::wrapper()
845 {
846  typename DataReader::ref_type ref =
847  OSPL_CXX11_STD_MODULE::dynamic_pointer_cast<DataReader<T> >(this->get_strong_ref());
849 
850  return reader;
851 }
852 
853 template <typename T>
854 void
855 dds::sub::detail::DataReader<T>::listener_notify(
856  ObjectDelegate::ref_type source,
857  uint32_t triggerMask,
858  void *eventData,
859  void *l)
860 {
861  /* The EntityDelegate takes care of the thread safety and always
862  * provides a listener and source. */
864  reinterpret_cast<dds::sub::DataReaderListener<T>*>(l);
865  assert(listener);
866 
867  /* Get DataReader wrapper from given source EntityDelegate. */
868  typename DataReader::ref_type ref =
869  OSPL_CXX11_STD_MODULE::dynamic_pointer_cast<DataReader<T> >(source);
871 
872 
873  if (triggerMask & V_EVENT_DATA_AVAILABLE) {
874  ref->reset_data_available();
875  listener->on_data_available(reader);
876  }
877 
878  if (triggerMask & V_EVENT_SAMPLE_REJECTED) {
880  status.delegate().v_status(((v_readerStatus)eventData)->sampleRejected);
881  listener->on_sample_rejected(reader, status);
882  }
883 
884  if (triggerMask & V_EVENT_LIVELINESS_CHANGED) {
886  status.delegate().v_status(((v_readerStatus)eventData)->livelinessChanged);
887  listener->on_liveliness_changed(reader, status);
888  }
889 
890  if (triggerMask & V_EVENT_REQUESTED_DEADLINE_MISSED) {
892  status.delegate().v_status(((v_readerStatus)eventData)->deadlineMissed);
893  listener->on_requested_deadline_missed(reader, status);
894  }
895 
896  if (triggerMask & V_EVENT_REQUESTED_INCOMPATIBLE_QOS) {
898  status.delegate().v_status(((v_readerStatus)eventData)->incompatibleQos);
899  listener->on_requested_incompatible_qos(reader, status);
900  }
901 
902  if (triggerMask & V_EVENT_SAMPLE_LOST) {
904  status.delegate().v_status(((v_readerStatus)eventData)->sampleLost);
905  listener->on_sample_lost(reader, status);
906  }
907 
908  if (triggerMask & V_EVENT_SUBSCRIPTION_MATCHED) {
910  status.delegate().v_status(((v_readerStatus)eventData)->subscriptionMatch);
911  listener->on_subscription_matched(reader, status);
912  }
913 }
914 
915 template <typename T>
916 dds::sub::detail::DataReader<T>::Selector::Selector(typename DataReader<T>::ref_type dr)
917  : mode(SELECT_MODE_READ), reader(dr), state_filter_is_set_(false),
918  max_samples_((uint32_t)-1), query_(dds::core::null)
919 {
920 }
921 
922 template <typename T>
923 typename dds::sub::detail::DataReader<T>::Selector&
925 {
926  this->handle = h;
927  switch (this->mode) {
928  case SELECT_MODE_READ:
929  case SELECT_MODE_READ_INSTANCE:
930  case SELECT_MODE_READ_NEXT_INSTANCE:
931  this->mode = SELECT_MODE_READ_INSTANCE;
932  break;
933  case SELECT_MODE_READ_WITH_CONDITION:
934  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
935  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
936  this->mode = SELECT_MODE_READ_INSTANCE_WITH_CONDITION;
937  break;
938  }
939 
940  return *this;
941 }
942 
943 template <typename T>
944 typename dds::sub::detail::DataReader<T>::Selector&
946 {
947  this->handle = h;
948  switch (this->mode) {
949  case SELECT_MODE_READ:
950  case SELECT_MODE_READ_INSTANCE:
951  case SELECT_MODE_READ_NEXT_INSTANCE:
952  this->mode = SELECT_MODE_READ_NEXT_INSTANCE;
953  break;
954  case SELECT_MODE_READ_WITH_CONDITION:
955  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
956  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
957  this->mode = SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION;
958  break;
959  }
960 
961  return *this;
962 }
963 
964 template <typename T>
965 typename dds::sub::detail::DataReader<T>::Selector&
966 dds::sub::detail::DataReader<T>::Selector::filter_state(const dds::sub::status::DataState& s)
967 {
968  this->state_filter_ = s;
969  this->state_filter_is_set_ = true;
970  if ((this->mode == SELECT_MODE_READ_WITH_CONDITION) ||
971  (this->mode == SELECT_MODE_READ_INSTANCE_WITH_CONDITION) ||
972  (this->mode == SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION)) {
973  if (!this->query_.delegate()->modify_state_filter(this->state_filter_)) {
974  dds::sub::Query q(this->query_.data_reader(), this->query_.expression(), this->query_.delegate()->parameters());
975  q.delegate()->state_filter(this->state_filter_);
976  this->query_ = q;
977  }
978  }
979  return *this;
980 }
981 
982 template <typename T>
983 typename dds::sub::detail::DataReader<T>::Selector&
985 {
986  this->max_samples_ = n;
987  return *this;
988 }
989 
990 template <typename T>
991 typename dds::sub::detail::DataReader<T>::Selector&
992 dds::sub::detail::DataReader<T>::Selector::filter_content(
993  const dds::sub::Query& query)
994 {
995  if (this->state_filter_is_set_) {
996  if (!query.delegate()->modify_state_filter(this->state_filter_)) {
997  dds::sub::Query q(query.data_reader(), query.expression(), query.delegate()->parameters());
998  q.delegate()->state_filter(this->state_filter_);
999  this->query_ = q;
1000  } else {
1001  this->query_ = query;
1002  }
1003  } else {
1004  this->query_ = query;
1005  }
1006 
1007  switch (mode) {
1008  case SELECT_MODE_READ:
1009  mode = SELECT_MODE_READ_WITH_CONDITION;
1010  break;
1011  case SELECT_MODE_READ_INSTANCE:
1012  mode = SELECT_MODE_READ_INSTANCE_WITH_CONDITION;
1013  break;
1014  case SELECT_MODE_READ_NEXT_INSTANCE:
1015  mode = SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION;
1016  break;
1017  case SELECT_MODE_READ_WITH_CONDITION:
1018  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
1019  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
1020  break;
1021  default:
1022  break;
1023  }
1024 
1025  return *this;
1026 }
1027 
1028 template <typename T>
1031 {
1032  return this->reader->read(*this);
1033 }
1034 
1035 template <typename T>
1038 {
1039  return this->reader->take(*this);
1040 }
1041 
1042 // --- Forward Iterators: --- //
1043 
1044 template <typename T>
1045 template<typename SamplesFWIterator>
1046 uint32_t
1047 dds::sub::detail::DataReader<T>::Selector::read(SamplesFWIterator sfit, uint32_t max_samples)
1048 {
1049  return this->reader->read(sfit, max_samples, *this);
1050 }
1051 
1052 template <typename T>
1053 template<typename SamplesFWIterator>
1054 uint32_t
1055 dds::sub::detail::DataReader<T>::Selector::take(SamplesFWIterator sfit, uint32_t max_samples)
1056 {
1057  return this->reader->take(sfit, max_samples, *this);
1058 }
1059 
1060 // --- Back-Inserting Iterators: --- //
1061 
1062 template <typename T>
1063 template<typename SamplesBIIterator>
1064 uint32_t
1066 {
1067  return this->reader->read(sbit, *this);
1068 }
1069 
1070 template <typename T>
1071 template<typename SamplesBIIterator>
1072 uint32_t
1074 {
1075  return this->reader->take(sbit, *this);
1076 }
1077 
1078 template <typename T>
1079 typename dds::sub::detail::DataReader<T>::SelectMode
1080 dds::sub::detail::DataReader<T>::Selector::get_mode() const
1081 {
1082  return this->mode;
1083 }
1084 
1085 template <typename T>
1086 dds::sub::detail::DataReader<T>::ManipulatorSelector::ManipulatorSelector(typename DataReader<T>::ref_type dr) :
1087  Selector(dr), read_mode_(true)
1088 {
1089 }
1090 
1091 template <typename T>
1092 bool
1093 dds::sub::detail::DataReader<T>::ManipulatorSelector::read_mode()
1094 {
1095  return read_mode_;
1096 }
1097 
1098 template <typename T>
1099 void
1100 dds::sub::detail::DataReader<T>::ManipulatorSelector::read_mode(bool b)
1101 {
1102  read_mode_ = b;
1103 }
1104 
1105 template <typename T>
1106 typename dds::sub::detail::DataReader<T>::ManipulatorSelector&
1107 dds::sub::detail::DataReader<T>::ManipulatorSelector::operator >>(dds::sub::LoanedSamples<T>& samples)
1108 {
1109  if(read_mode_)
1110  {
1111  samples = this->Selector::read();
1112  }
1113  else
1114  {
1115  samples = this->Selector::take();
1116  }
1117  return *this;
1118 }
1119 
1120 
1121 template <typename T>
1124 {
1125  u_query uQuery;
1126 
1128  dds::sub::detail::LoanedSamplesHolder<T> holder(samples);
1129 
1130  switch(selector.mode) {
1131  case SELECT_MODE_READ:
1132  this->AnyDataReaderDelegate::read((u_dataReader)(userHandle), selector.state_filter_, holder, selector.max_samples_);
1133  break;
1134  case SELECT_MODE_READ_INSTANCE:
1135  this->AnyDataReaderDelegate::read_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, selector.max_samples_);
1136  break;
1137  case SELECT_MODE_READ_NEXT_INSTANCE:
1138  this->AnyDataReaderDelegate::read_next_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, selector.max_samples_);
1139  break;
1140  case SELECT_MODE_READ_WITH_CONDITION:
1141  uQuery = selector.query_.delegate()->get_user_query();
1142  this->AnyDataReaderDelegate::read_w_condition(uQuery, holder, selector.max_samples_);
1143  break;
1144  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
1145  uQuery = selector.query_.delegate()->get_user_query();
1146  this->AnyDataReaderDelegate::read_instance_w_condition(uQuery, selector.handle, holder, selector.max_samples_);
1147  break;
1148  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
1149  uQuery = selector.query_.delegate()->get_user_query();
1150  this->AnyDataReaderDelegate::read_next_instance_w_condition(uQuery, selector.handle, holder, selector.max_samples_);
1151  break;
1152  }
1153 
1154  return samples;
1155 }
1156 
1157 template <typename T>
1160 {
1161  u_query uQuery;
1162 
1164  dds::sub::detail::LoanedSamplesHolder<T> holder(samples);
1165 
1166  switch(selector.mode) {
1167  case SELECT_MODE_READ:
1168  this->AnyDataReaderDelegate::take((u_dataReader)(userHandle), selector.state_filter_, holder, selector.max_samples_);
1169  break;
1170  case SELECT_MODE_READ_INSTANCE:
1171  this->AnyDataReaderDelegate::take_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, selector.max_samples_);
1172  break;
1173  case SELECT_MODE_READ_NEXT_INSTANCE:
1174  this->AnyDataReaderDelegate::take_next_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, selector.max_samples_);
1175  break;
1176  case SELECT_MODE_READ_WITH_CONDITION:
1177  uQuery = selector.query_.delegate()->get_user_query();
1178  this->AnyDataReaderDelegate::take_w_condition(uQuery, holder, selector.max_samples_);
1179  break;
1180  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
1181  uQuery = selector.query_.delegate()->get_user_query();
1182  this->AnyDataReaderDelegate::take_instance_w_condition(uQuery, selector.handle, holder, selector.max_samples_);
1183  break;
1184  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
1185  uQuery = selector.query_.delegate()->get_user_query();
1186  this->AnyDataReaderDelegate::take_next_instance_w_condition(uQuery, selector.handle, holder, selector.max_samples_);
1187  break;
1188  }
1189 
1190  return samples;
1191 }
1192 
1193 // --- Forward Iterators: --- //
1194 
1195 template <typename T>
1196 template<typename SamplesFWIterator>
1197 uint32_t
1198 dds::sub::detail::DataReader<T>::read(SamplesFWIterator samples,
1199  uint32_t max_samples, const Selector& selector)
1200 {
1201  u_query uQuery;
1202 
1203  dds::sub::detail::SamplesFWInteratorHolder<T, SamplesFWIterator> holder(samples);
1204 
1205  switch(selector.mode) {
1206  case SELECT_MODE_READ:
1207  this->AnyDataReaderDelegate::read((u_dataReader)(userHandle), selector.state_filter_, holder, max_samples);
1208  break;
1209  case SELECT_MODE_READ_INSTANCE:
1210  this->AnyDataReaderDelegate::read_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, max_samples);
1211  break;
1212  case SELECT_MODE_READ_NEXT_INSTANCE:
1213  this->AnyDataReaderDelegate::read_next_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, max_samples);
1214  break;
1215  case SELECT_MODE_READ_WITH_CONDITION:
1216  uQuery = selector.query_.delegate()->get_user_query();
1217  this->AnyDataReaderDelegate::read_w_condition(uQuery, holder, max_samples);
1218  break;
1219  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
1220  uQuery = selector.query_.delegate()->get_user_query();
1221  this->AnyDataReaderDelegate::read_instance_w_condition(uQuery, selector.handle, holder, max_samples);
1222  break;
1223  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
1224  uQuery = selector.query_.delegate()->get_user_query();
1225  this->AnyDataReaderDelegate::read_next_instance_w_condition(uQuery, selector.handle, holder, max_samples);
1226  break;
1227  }
1228 
1229  return holder.get_length();
1230 }
1231 
1232 template <typename T>
1233 template<typename SamplesFWIterator>
1234 uint32_t
1235 dds::sub::detail::DataReader<T>::take(SamplesFWIterator samples,
1236  uint32_t max_samples, const Selector& selector)
1237 {
1238  u_query uQuery;
1239 
1240  dds::sub::detail::SamplesFWInteratorHolder<T, SamplesFWIterator> holder(samples);
1241 
1242  switch(selector.mode) {
1243  case SELECT_MODE_READ:
1244  this->AnyDataReaderDelegate::take((u_dataReader)(userHandle), selector.state_filter_, holder, max_samples);
1245  break;
1246  case SELECT_MODE_READ_INSTANCE:
1247  this->AnyDataReaderDelegate::take_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, max_samples);
1248  break;
1249  case SELECT_MODE_READ_NEXT_INSTANCE:
1250  this->AnyDataReaderDelegate::take_next_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, max_samples);
1251  break;
1252  case SELECT_MODE_READ_WITH_CONDITION:
1253  uQuery = selector.query_.delegate()->get_user_query();
1254  this->AnyDataReaderDelegate::take_w_condition(uQuery, holder, max_samples);
1255  break;
1256  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
1257  uQuery = selector.query_.delegate()->get_user_query();
1258  this->AnyDataReaderDelegate::take_instance_w_condition(uQuery, selector.handle, holder, max_samples);
1259  break;
1260  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
1261  uQuery = selector.query_.delegate()->get_user_query();
1262  this->AnyDataReaderDelegate::take_next_instance_w_condition(uQuery, selector.handle, holder, max_samples);
1263  break;
1264  }
1265 
1266  return holder.get_length();
1267 }
1268 
1269 // --- Back-Inserting Iterators: --- //
1270 
1271 template <typename T>
1272 template<typename SamplesBIIterator>
1273 uint32_t
1274 dds::sub::detail::DataReader<T>::read(SamplesBIIterator samples, const Selector& selector)
1275 {
1276  u_query uQuery;
1277 
1278  dds::sub::detail::SamplesBIIteratorHolder<T, SamplesBIIterator> holder(samples);
1279 
1280  switch(selector.mode) {
1281  case SELECT_MODE_READ:
1282  this->AnyDataReaderDelegate::read((u_dataReader)(userHandle), selector.state_filter_, holder, selector.max_samples_);
1283  break;
1284  case SELECT_MODE_READ_INSTANCE:
1285  this->AnyDataReaderDelegate::read_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, selector.max_samples_);
1286  break;
1287  case SELECT_MODE_READ_NEXT_INSTANCE:
1288  this->AnyDataReaderDelegate::read_next_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, selector.max_samples_);
1289  break;
1290  case SELECT_MODE_READ_WITH_CONDITION:
1291  uQuery = selector.query_.delegate()->get_user_query();
1292  this->AnyDataReaderDelegate::read_w_condition(uQuery, holder, selector.max_samples_);
1293  break;
1294  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
1295  uQuery = selector.query_.delegate()->get_user_query();
1296  this->AnyDataReaderDelegate::read_instance_w_condition(uQuery, selector.handle, holder, selector.max_samples_);
1297  break;
1298  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
1299  uQuery = selector.query_.delegate()->get_user_query();
1300  this->AnyDataReaderDelegate::read_next_instance_w_condition(uQuery, selector.handle, holder, selector.max_samples_);
1301  break;
1302  }
1303 
1304  return holder.get_length();
1305 }
1306 
1307 template <typename T>
1308 template<typename SamplesBIIterator>
1309 uint32_t
1310 dds::sub::detail::DataReader<T>::take(SamplesBIIterator samples, const Selector& selector)
1311 {
1312  u_query uQuery;
1313 
1314  dds::sub::detail::SamplesBIIteratorHolder<T, SamplesBIIterator> holder(samples);
1315 
1316  switch(selector.mode) {
1317  case SELECT_MODE_READ:
1318  this->AnyDataReaderDelegate::take((u_dataReader)(userHandle), selector.state_filter_, holder, selector.max_samples_);
1319  break;
1320  case SELECT_MODE_READ_INSTANCE:
1321  this->AnyDataReaderDelegate::take_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, selector.max_samples_);
1322  break;
1323  case SELECT_MODE_READ_NEXT_INSTANCE:
1324  this->AnyDataReaderDelegate::take_next_instance((u_dataReader)(userHandle), selector.handle, selector.state_filter_, holder, selector.max_samples_);
1325  break;
1326  case SELECT_MODE_READ_WITH_CONDITION:
1327  uQuery = selector.query_.delegate()->get_user_query();
1328  this->AnyDataReaderDelegate::take_w_condition(uQuery, holder, selector.max_samples_);
1329  break;
1330  case SELECT_MODE_READ_INSTANCE_WITH_CONDITION:
1331  uQuery = selector.query_.delegate()->get_user_query();
1332  this->AnyDataReaderDelegate::take_instance_w_condition(uQuery, selector.handle, holder, selector.max_samples_);
1333  break;
1334  case SELECT_MODE_READ_NEXT_INSTANCE_WITH_CONDITION:
1335  uQuery = selector.query_.delegate()->get_user_query();
1336  this->AnyDataReaderDelegate::take_next_instance_w_condition(uQuery, selector.handle, holder, selector.max_samples_);
1337  break;
1338  }
1339 
1340  return holder.get_length();
1341 }
1342 
1343 
1344 namespace dds
1345 {
1346 namespace sub
1347 {
1348 
/**
 * Stream-style manipulator: calls selector.read_mode(true) and hands the
 * same selector back so further manipulators can be chained onto it.
 *
 * @tparam SELECTOR  any type exposing read_mode(bool)
 * @param selector   selector to update in place
 * @return reference to @p selector
 */
template <typename SELECTOR>
SELECTOR& read(SELECTOR& selector)
{
    selector.read_mode(true);
    return selector;
}
1376 
/**
 * Stream-style manipulator: calls selector.read_mode(false) and hands the
 * same selector back so further manipulators can be chained onto it.
 *
 * @tparam SELECTOR  any type exposing read_mode(bool)
 * @param selector   selector to update in place
 * @return reference to @p selector
 */
template <typename SELECTOR>
SELECTOR& take(SELECTOR& selector)
{
    selector.read_mode(false);
    return selector;
}
1404 
1424 inline dds::sub::functors::MaxSamplesManipulatorFunctor
1425 max_samples(uint32_t n)
1426 {
1427  return dds::sub::functors::MaxSamplesManipulatorFunctor(n);
1428 }
1429 
1454 inline dds::sub::functors::ContentFilterManipulatorFunctor
1456 {
1457  return dds::sub::functors::ContentFilterManipulatorFunctor(query);
1458 }
1459 
1460 
1486 inline dds::sub::functors::StateFilterManipulatorFunctor
1488 {
1489  return dds::sub::functors::StateFilterManipulatorFunctor(s);
1490 }
1491 
1515 inline dds::sub::functors::InstanceManipulatorFunctor
1517 {
1518  return dds::sub::functors::InstanceManipulatorFunctor(h);
1519 }
1520 
1555 inline dds::sub::functors::NextInstanceManipulatorFunctor
1557 {
1558  return dds::sub::functors::NextInstanceManipulatorFunctor(h);
1559 }
1560 
1561 }
1562 }
1563 
1564 // End of implementation
1565 
1566 #endif /* OSPL_DDS_SUB_TDATAREADER_IMPL_HPP_ */
DataReader events Listener.
Definition: DataReader.hpp:37
dds::sub::status::DataState default_filter_state()
Listener * listener() const
ManipulatorSelector & next_instance(const dds::core::InstanceHandle &handle)
A Subscriber is the object responsible for the actual reception of the data resulting from its subscr...
Definition: Subscriber.hpp:53
dds::sub::functors::MaxSamplesManipulatorFunctor max_samples(uint32_t n)
static StatusMask none()
Definition: State.hpp:205
virtual void on_data_available(DataReader< T > &reader)=0
DataReader & operator>>(dds::sub::LoanedSamples< T > &ls)
dds::topic::TopicInstance< T > key_value(const dds::core::InstanceHandle &h)
Reference(dds::core::null_type &)
A TopicInstance encapsulates a dds::sub::Sample and its associated dds::core::InstanceHandle.
Topic is the most basic description of the data to be published and subscribed.
Definition: Topic.hpp:36
This class provides the basic mechanism for an application to specify Quality of Service attributes f...
Selector & content(const dds::sub::Query &query)
ContentFilteredTopic is a specialization of TopicDescription that allows for content-based subscripti...
ManipulatorSelector & operator>>(dds::sub::LoanedSamples< T > &samples)
Support functionality to check if a given object type is a Topic.
Definition: TopicTraits.hpp:30
virtual void on_sample_rejected(DataReader< T > &reader, const dds::core::status::SampleRejectedStatus &status)=0
Selector & max_samples(uint32_t maxsamples)
This class is the base for Topic, ContentFilteredTopic and MultiTopic.
Query objects contain expressions that allow the application to specify a filter on the locally avail...
Definition: Query.hpp:61
dds::sub::LoanedSamples< T > take()
dds::sub::qos::DataReaderQos qos() const
virtual void on_requested_deadline_missed(DataReader< T > &reader, const dds::core::status::RequestedDeadlineMissedStatus &status)=0
DataReader allows the application to access published sample data.
Definition: DataReader.hpp:34
dds::sub::qos::DataReaderQos default_datareader_qos() const
virtual void on_requested_incompatible_qos(DataReader< T > &reader, const dds::core::status::RequestedIncompatibleQosStatus &status)=0
virtual void on_liveliness_changed(DataReader< T > &reader, const dds::core::status::LivelinessChangedStatus &status)=0
void parameters(const FWIterator &begin, const FWIterator end)
Definition: QueryImpl.hpp:131
Class to hold sample DataState information.
Definition: DataState.hpp:371
Selector & state(const dds::sub::status::DataState &state)
ManipulatorSelector & state(const dds::sub::status::DataState &state)
DataReader(const dds::sub::Subscriber &sub, const ::dds::topic::Topic< T > &topic)
ManipulatorSelector & max_samples(uint32_t n)
SELECTOR & take(SELECTOR &selector)
const AnyDataReader & data_reader() const
Definition: QueryImpl.hpp:165
dds::sub::functors::NextInstanceManipulatorFunctor next_instance(const dds::core::InstanceHandle &h)
Class to hold the handle associated with a sample instance.
dds::sub::functors::ContentFilterManipulatorFunctor content(const dds::sub::Query &query)
dds::sub::LoanedSamples< T > read()
Selector & next_instance(const dds::core::InstanceHandle &handle)
ManipulatorSelector & content(const dds::sub::Query &query)
virtual void on_sample_lost(DataReader< T > &reader, const dds::core::status::SampleLostStatus &status)=0
SELECTOR & read(SELECTOR &selector)
Definition: array.hpp:23
const std::string & expression() const
Definition: QueryImpl.hpp:70
LoanedSamples< T > read()
dds::sub::functors::StateFilterManipulatorFunctor state(const dds::sub::status::DataState &s)
Selector & instance(const dds::core::InstanceHandle &handle)
const dds::domain::DomainParticipant & domain_participant() const
const dds::topic::Topic< T > & topic() const
StatusMask is a bitmap or bitset field.
Definition: State.hpp:144
Base class for reference-counted objects.
Definition: Reference.hpp:94
This class encapsulates and automates the management of loaned samples.
LoanedSamples< T > take()
dds::topic::qos::TopicQos qos() const
ManipulatorSelector & instance(const dds::core::InstanceHandle &handle)
virtual void on_subscription_matched(DataReader< T > &reader, const dds::core::status::SubscriptionMatchedStatus &status)=0
dds::sub::functors::InstanceManipulatorFunctor instance(const dds::core::InstanceHandle &h)
const dds::core::InstanceHandle lookup_instance(const T &key) const