OpenSplice ISO C++ 2 DCPS  v6.x
ISO C++ 2 OpenSplice Data Distribution Service Data-Centric Publish-Subscribe API
TopicImpl.hpp
Go to the documentation of this file.
1 /*
2  * Vortex OpenSplice
3  *
4  * This software and documentation are Copyright 2006 to 2021 ADLINK
5  * Technology Limited, its affiliated companies and licensors. All rights
6  * reserved.
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 #ifndef OSPL_DDS_TOPIC_TTOPIC_HPP_
22 #define OSPL_DDS_TOPIC_TTOPIC_HPP_
23 
28 /*
29  * OMG PSM class declaration
30  */
31 #include <dds/topic/Topic.hpp>
32 #include "org/opensplice/topic/TopicTraits.hpp"
33 #include "org/opensplice/topic/TopicListener.hpp"
34 
35 // Implementation
36 
37 namespace dds
38 {
39 namespace topic
40 {
41 
42 
43 /***************************************************************************
44  *
45  * dds/topic/Topic<> WRAPPER implementation.
46  * Declaration can be found in dds/topic/Topic.hpp
47  *
48  ***************************************************************************/
49 
50 
/*
 * Wrapper constructor: creates a Topic from a participant and a topic name.
 *
 * A new DELEGATE<T> is built with an empty type name, a NULL listener and
 * StatusMask::none(). The QoS passed on is dp.default_topic_qos(), or a
 * default-constructed TopicQos when dp is nil.
 *
 * NOTE(review): the line holding the constructor name and the `dp` parameter
 * (listing line 52) is not present in this listing.
 * NOTE(review): unlike the other three wrapper constructors in this file, no
 * ::dds::topic::AnyTopic base initializer is visible here - confirm whether
 * that is intentional or an artefact of the listing.
 */
51 template <typename T>
53  const std::string& topic_name) :
54  ::dds::core::Reference(new DELEGATE<T>(
55  dp,
56  topic_name,
57  "",
58  dp.is_nil() ? dds::topic::qos::TopicQos() : dp.default_topic_qos(),
59  NULL,
60  dds::core::status::StatusMask::none()))
61 {
62  ISOCPP_REPORT_STACK_DDS_BEGIN(dp);
63 
 /* Second construction step: let the delegate initialise itself, handing it
  * this wrapper's impl_ reference. */
64  this->delegate()->init(this->impl_);
65 }
66 
/*
 * Wrapper constructor: creates a Topic from a participant, a topic name and
 * an explicit type name.
 *
 * A new DELEGATE<T> is built with a NULL listener and StatusMask::none().
 * The QoS passed on is dp.default_topic_qos(), or a default-constructed
 * TopicQos when dp is nil. The AnyTopic base is initialised with the same
 * delegate reference.
 *
 * NOTE(review): the line holding the constructor name and the `dp` parameter
 * (listing line 68) is not present in this listing.
 */
67 template <typename T>
69  const std::string& topic_name,
70  const std::string& type_name) :
71  ::dds::core::Reference(new DELEGATE<T>(
72  dp,
73  topic_name,
74  type_name,
75  dp.is_nil() ? dds::topic::qos::TopicQos() : dp.default_topic_qos(),
76  NULL,
77  dds::core::status::StatusMask::none())),
78  ::dds::topic::AnyTopic(::dds::core::Reference< DELEGATE<T> >::delegate())
79 {
80  ISOCPP_REPORT_STACK_DDS_BEGIN(dp);
81 
 /* Second construction step: let the delegate initialise itself, handing it
  * this wrapper's impl_ reference. */
82  this->delegate()->init(this->impl_);
83 }
84 
/*
 * Wrapper constructor: creates a Topic with caller-supplied QoS, listener and
 * status mask, and an empty type name.
 *
 * dp, topic_name, qos, listener and mask are forwarded unchanged to a new
 * DELEGATE<T>; the AnyTopic base is initialised with the same delegate
 * reference.
 *
 * NOTE(review): the lines declaring the constructor name and the `dp`, `qos`
 * and `listener` parameters (listing lines 86, 88 and 89) are not present in
 * this listing.
 */
85 template <typename T>
87  const std::string& topic_name,
90  const dds::core::status::StatusMask& mask) :
91  ::dds::core::Reference(new DELEGATE<T>(
92  dp,
93  topic_name,
94  "",
95  qos,
96  listener,
97  mask)),
98  ::dds::topic::AnyTopic(::dds::core::Reference< DELEGATE<T> >::delegate())
99 {
100  ISOCPP_REPORT_STACK_DDS_BEGIN(dp);
101 
 /* Second construction step: let the delegate initialise itself, handing it
  * this wrapper's impl_ reference. */
102  this->delegate()->init(this->impl_);
103 }
104 
/*
 * Wrapper constructor: creates a Topic with an explicit type name and
 * caller-supplied QoS, listener and status mask.
 *
 * All arguments are forwarded unchanged to a new DELEGATE<T>; the AnyTopic
 * base is initialised with the same delegate reference.
 *
 * NOTE(review): the lines declaring the constructor name and the `dp`, `qos`
 * and `listener` parameters (listing lines 106, 109 and 110) are not present
 * in this listing.
 */
105 template <typename T>
107  const std::string& topic_name,
108  const std::string& type_name,
111  const dds::core::status::StatusMask& mask) :
112  ::dds::core::Reference(new DELEGATE<T>(
113  dp,
114  topic_name,
115  type_name,
116  qos,
117  listener,
118  mask)),
119  ::dds::topic::AnyTopic(::dds::core::Reference< DELEGATE<T> >::delegate())
120 {
121  ISOCPP_REPORT_STACK_DDS_BEGIN(dp);
122 
 /* Second construction step: let the delegate initialise itself, handing it
  * this wrapper's impl_ reference. */
123  this->delegate()->init(this->impl_);
124 }
125 
/* Wrapper destructor. The body is empty: no explicit cleanup is done here. */
126 template <typename T>
127 Topic<T>::~Topic() { }
128 
/*
 * Wrapper listener setter: forwards `listener` and `event_mask` to the
 * delegate's listener() setter.
 *
 * NOTE(review): the line holding the function name and the `listener`
 * parameter (listing line 130) is not present in this listing.
 */
129 template <typename T>
131  const ::dds::core::status::StatusMask& event_mask)
132 {
133  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
134 
135  this->delegate()->listener(listener, event_mask);
136 }
137 
/*
 * Wrapper listener getter: returns whatever the delegate's listener()
 * getter returns.
 *
 * NOTE(review): the signature line (listing line 139) is not present in this
 * listing.
 */
138 template <typename T>
140 {
141  ISOCPP_REPORT_STACK_DDS_BEGIN(*this);
142 
143  return this->delegate()->listener();
144 }
145 
146 
147 }
148 }
149 
150 
151 
152 
153 /***************************************************************************
154  *
155  * dds/topic/detail/Topic<> DELEGATE implementation.
156  * Declaration can be found in dds/topic/detail/Topic.hpp
157  *
158  * Implementation and declaration have been separated because of some circular
159  * dependencies, such as those with TopicListener and AnyTopic.
160  *
161  ***************************************************************************/
162 
164 #include <dds/topic/AnyTopic.hpp>
166 #include <org/opensplice/core/ScopedLock.hpp>
167 
/*
 * Delegate constructor that creates a new user-layer topic.
 *
 * Steps, in order:
 *   1. initialise the TopicDescriptionDelegate and AnyTopicDelegate bases;
 *   2. fetch the participant through domain_participant(), because `dp`
 *      itself may be nil;
 *   3. overwrite myTypeName with TopicTraits<T>::getTypeName();
 *   4. check the QoS and obtain its user-layer form;
 *   5. register the type with the participant and call u_topicNew();
 *   6. free the user-layer QoS, then fail via ISOCPP_THROW_EXCEPTION when no
 *      topic was returned;
 *   7. store the handle in userHandle and record the listener and mask.
 *
 * NOTE(review): the lines declaring the `qos` and `listener` parameters
 * (listing lines 172 and 173) are not present in this listing.
 */
168 template <typename T>
169 dds::topic::detail::Topic<T>::Topic(const dds::domain::DomainParticipant& dp,
170  const std::string& name,
171  const std::string& type_name,
174  const dds::core::status::StatusMask& mask)
175  : org::opensplice::topic::TopicDescriptionDelegate(dp, name, type_name),
176  org::opensplice::topic::AnyTopicDelegate(qos, dp, name, type_name)
177 {
178  ISOCPP_REPORT_STACK_NC_BEGIN();
179 
180  dds::domain::DomainParticipant participant = dds::core::null;
181 
182  /* The dp argument can be nil. Use the participant we know isn't nil because
183  * the TopicDescriptionDelegate would have created it when needed. */
184  participant = this->domain_participant();
185 
186  // Set the correct (IDL) type_name in the TopicDescription.
 // This replaces whatever the type_name argument put there via the base.
187  org::opensplice::topic::TopicDescriptionDelegate::myTypeName = org::opensplice::topic::TopicTraits<T>::getTypeName();
188 
189  // get and validate the kernel qos
190  org::opensplice::topic::qos::TopicQosDelegate tQos = qos.delegate();
191  tQos.check();
192  u_topicQos uTopicQos = tQos.u_qos();
 // Register T's type information; the returned u_participant is the one
 // passed to u_topicNew() below.
193  u_participant uParticipant = participant->registerType(
194  org::opensplice::topic::TopicTraits<T>::getTypeName(),
195  org::opensplice::topic::TopicTraits<T>::getDescriptor(),
196  org::opensplice::topic::TopicTraits<T>::getDataRepresentationId(),
197  org::opensplice::topic::TopicTraits<T>::getTypeHash(),
198  org::opensplice::topic::TopicTraits<T>::getMetaData(),
199  org::opensplice::topic::TopicTraits<T>::getExtentions());
200 
201  u_topic uTopic = u_topicNew(
202  uParticipant,
203  name.c_str(),
204  org::opensplice::topic::TopicDescriptionDelegate::myTypeName.c_str(),
205  org::opensplice::topic::TopicTraits<T>::getKeyList(),
206  uTopicQos);
207 
 // Free the user-layer QoS before the failure check so it is released on
 // both the success and the error path.
208  u_topicQosFree(uTopicQos);
209 
210  if (!uTopic) {
211  ISOCPP_THROW_EXCEPTION(ISOCPP_ERROR, "Failed to create Topic");
212  }
213 
214  this->userHandle = (u_object)uTopic;
215  this->listener_set((void*)listener, mask);
216 }
217 
/*
 * Delegate constructor that wraps an already existing user-layer topic.
 *
 * No topic is created here: the given uTopic is stored in userHandle as-is.
 * The listener is set to NULL with StatusMask::none(). This constructor is
 * the one used by discover_topic() and discover_topics() in this file.
 */
218 template <typename T>
219 dds::topic::detail::Topic<T>::Topic(const dds::domain::DomainParticipant& dp,
220  const std::string& name,
221  const std::string& type_name,
222  const dds::topic::qos::TopicQos& qos,
223  u_topic uTopic)
224  : org::opensplice::topic::TopicDescriptionDelegate(dp, name, type_name),
225  org::opensplice::topic::AnyTopicDelegate(qos, dp, name, type_name)
226 {
227  ISOCPP_REPORT_STACK_DDS_BEGIN(dp);
228  this->userHandle = (u_object)uTopic;
229  this->listener_set((void*)NULL, dds::core::status::StatusMask::none());
230 }
231 
232 
/*
 * Delegate destructor: calls close() when the `closed` flag is not set.
 */
233 template <typename T>
234 dds::topic::detail::Topic<T>::~Topic()
235 {
236  if (!closed) {
237  try {
238  close();
239  } catch (...) {
 /* Deliberately ignored: close() can throw (for example when the topic
  * still has dependents) and nothing may escape from a destructor. */
240 
241  }
242  }
243 }
244 
/*
 * Closes this topic delegate.
 *
 * Order of operations: reset the listener dispatcher, take the object lock,
 * refuse to continue (ISOCPP_PRECONDITION_NOT_MET_ERROR) while dependents
 * exist, unregister from the participant, then run EntityDelegate::close().
 *
 * NOTE(review): listing line 249 (the first statement of the body) is not
 * present in this listing.
 */
245 template <typename T>
246 void
247 dds::topic::detail::Topic<T>::close()
248 {
250  this->listener_dispatcher_reset();
251 
 /* NOTE(review): presumably held until scopedLock goes out of scope at the
  * end of this function - confirm against ScopedLock.hpp. */
252  org::opensplice::core::ScopedObjectLock scopedLock(*this);
253 
254  if (this->hasDependents()) {
255  ISOCPP_THROW_EXCEPTION(ISOCPP_PRECONDITION_NOT_MET_ERROR, "Topic still has unclosed dependencies (e.g. Readers/Writers/ContentFilteredTopics)");
256  }
257 
 /* Counterpart of the add_topic() call made in init(). */
258  this->myParticipant.delegate()->remove_topic(*this);
259 
260  org::opensplice::core::EntityDelegate::close();
261 }
262 
/*
 * Second construction step of the delegate, called by the wrapper
 * constructors and by discover_topic()/discover_topics() after the delegate
 * object exists.
 *
 * weak_ref: reference to this delegate; stored first so that the later steps
 *           can hand this object to other isocpp objects.
 */
263 template <typename T>
264 void
265 dds::topic::detail::Topic<T>::init(ObjectDelegate::weak_ref_type weak_ref)
266 {
267  /* Set weak_ref before passing ourselves to other isocpp objects. */
268  this->set_weak_ref(weak_ref);
269  /* Register topic at participant. */
270  this->myParticipant.delegate()->add_topic(*this);
271  /* Use listener dispatcher from the participant. */
272  this->listener_dispatcher_set(this->myParticipant.delegate()->listener_dispatcher_get());
273  /* This only starts listening when the status mask shows interest. */
274  this->listener_enable();
275  /* Enable when needed. */
276  if (this->myParticipant.delegate()->is_auto_enable()) {
277  this->enable();
278  }
279 }
280 
/*
 * Delegate listener setter.
 *
 * listener: listener to install; stored as an untyped pointer.
 * mask:     status mask stored together with the listener.
 *
 * After storing both, listener_enable() is called again so that the new
 * listener/mask combination is applied.
 */
281 template <typename T>
282 void
283 dds::topic::detail::Topic<T>::listener(TopicListener<T>* listener,
284  const ::dds::core::status::StatusMask& mask)
285 {
286  /* EntityDelegate takes care of thread safety. */
287  this->listener_set((void*)listener, mask);
288  this->listener_enable();
289 }
290 
/*
 * Delegate listener getter: returns the pointer from listener_get(), cast
 * back to dds::topic::TopicListener<T>*. The setter in this file stores the
 * listener as void*, which is why a cast is needed here.
 *
 * NOTE(review): the return-type line (listing line 292) is not present in
 * this listing.
 */
291 template <typename T>
293 dds::topic::detail::Topic<T>::listener()
294 {
295  return reinterpret_cast<dds::topic::TopicListener<T>*>(this->listener_get());
296 }
297 
/*
 * Returns a Topic wrapper for this delegate.
 *
 * The strong reference of this object is down-cast to the typed Topic<T>
 * reference type with dynamic_pointer_cast.
 *
 * NOTE(review): the return-type line (listing line 299) and the line that
 * declares `topic` (listing line 305, presumably built from `ref`) are not
 * present in this listing.
 */
298 template <typename T>
300 dds::topic::detail::Topic<T>::wrapper()
301 {
302 
303  typename Topic::ref_type ref =
304  OSPL_CXX11_STD_MODULE::dynamic_pointer_cast<Topic<T> >(this->get_strong_ref());
306 
307  return topic;
308 }
309 
/*
 * Dispatches a triggered topic event to the user's listener.
 *
 * source:      delegate the event originates from; down-cast to Topic<T>.
 * triggerMask: bit mask of V_EVENT_* flags; both flags handled here are
 *              tested independently, so one call can fire both callbacks.
 * eventData:   cast to v_topicStatus to read the status payload.
 * l:           the listener, passed untyped; cast to TopicListener<T>*.
 *
 * NOTE(review): the lines declaring `topic`, `status` and `extListener`
 * (listing lines 327, 330, 336, 337 and 339) are not present in this listing.
 */
310 template <typename T>
311 void
312 dds::topic::detail::Topic<T>::listener_notify(
313  ObjectDelegate::ref_type source,
314  uint32_t triggerMask,
315  void *eventData,
316  void *l)
317 {
318  /* The EntityDelegate takes care of the thread safety and always
319  * provides a listener and source. */
320  dds::topic::TopicListener<T>* listener =
321  reinterpret_cast<dds::topic::TopicListener<T>*>(l);
322  assert(listener);
323 
324  /* Get Topic wrapper from given source EntityDelegate. */
325  typename Topic::ref_type ref =
326  OSPL_CXX11_STD_MODULE::dynamic_pointer_cast<Topic<T> >(source);
328 
 /* Inconsistent-topic event: delivered through the standard listener. */
329  if (triggerMask & V_EVENT_INCONSISTENT_TOPIC) {
331  status.delegate().v_status(((v_topicStatus)eventData)->inconsistentTopic);
332  listener->on_inconsistent_topic(topic, status);
333  }
334 
 /* All-data-disposed event: only delivered when extListener is non-null. */
335  if (triggerMask & V_EVENT_ALL_DATA_DISPOSED ) {
338  if (extListener) {
340  status.delegate().v_status(((v_topicStatus)eventData)->allDataDisposed);
341  extListener->on_all_data_disposed(topic, status);
342  }
343  }
344 }
345 
/*
 * Looks up an existing topic by name and wraps it in a Topic<T>.
 *
 * name:    name handed to the participant's lookup_topic().
 * timeout: timeout handed to the participant's lookup_topic().
 *
 * Returns dds::core::null when the lookup yields no topic. Otherwise a new
 * delegate is built around the found u_topic, using the type name and QoS
 * read back from it, and returned as a Topic<T>.
 * A failing u_topicGetQos() is reported through
 * ISOCPP_U_RESULT_CHECK_AND_THROW.
 *
 * NOTE(review): the return-type line, the `dp` parameter line and the line
 * declaring `qos` (listing lines 347, 349 and 367) are not present in this
 * listing.
 */
346 template <typename T>
348 dds::topic::detail::Topic<T>::discover_topic(
350  const std::string& name,
351  const dds::core::Duration& timeout)
352 {
353  u_topic uTopic = dp.delegate()->lookup_topic(name, timeout);
354 
355  if (uTopic == NULL) {
356  return dds::core::null;
357  }
358 
 /* Copy the C string into a std::string and free it straight away, before
  * anything below can throw.
  * NOTE(review): uTypename is not checked for NULL before the std::string
  * is built from it - confirm u_topicTypeName() cannot return NULL here. */
359  os_char *uTypename = u_topicTypeName(uTopic);
360  std::string type_name = uTypename;
361  os_free(uTypename);
362 
363  u_topicQos uQos;
364  u_result uResult = u_topicGetQos(uTopic, &uQos);
365  ISOCPP_U_RESULT_CHECK_AND_THROW(uResult, "Failed to get user layer topic qos");
366 
368  qos.delegate().u_qos(uQos);
369  u_topicQosFree(uQos);
370 
 /* Build the delegate around the existing u_topic, then run its init()
  * step with the reference that owns it. */
371  typename dds::topic::Topic<T, dds::topic::detail::Topic>::DELEGATE_REF_T ref(new Topic<T>(dp, name, type_name, qos, uTopic));
372  ref->init(ref);
373 
374  return dds::topic::Topic<T>(ref);
375 }
376 
/*
 * Looks up existing topics whose type is topic_type_name<T>::value() and
 * fills `topics` with a Topic<T> wrapper for each of them.
 *
 * max_size: forwarded to the participant's lookup_topics().
 *
 * `topics` is cleared first, so earlier contents are discarded. For every
 * u_topic found, the topic name, type name and QoS are read back and a new
 * delegate is built around it. A failing u_topicGetQos() is reported through
 * ISOCPP_U_RESULT_CHECK_AND_THROW.
 *
 * NOTE(review): the `dp` and `topics` parameter lines and the line declaring
 * `qos` (listing lines 380, 381 and 400) are not present in this listing.
 */
377 template <typename T>
378 void
379 dds::topic::detail::Topic<T>::discover_topics(
382  uint32_t max_size)
383 {
384  std::vector<u_topic> uTopics;
385 
386  dp.delegate()->lookup_topics(topic_type_name<T>::value(), uTopics, max_size);
387 
388  topics.clear();
389  topics.reserve(uTopics.size());
390 
391  for (std::vector<u_topic>::const_iterator it = uTopics.begin(); it != uTopics.end(); ++it) {
392  u_topic uTopic = *it;
 /* NOTE(review): these two C strings are only released by the os_free()
  * calls further down. If the QoS check or anything else in between
  * throws, they are not freed. discover_topic() copies into a std::string
  * and frees before the check - consider doing the same here. */
393  os_char *topic_name = u_topicName(uTopic);
394  os_char *type_name = u_topicTypeName(uTopic);
395 
396  u_topicQos uQos;
397  u_result uResult = u_topicGetQos(uTopic, &uQos);
398  ISOCPP_U_RESULT_CHECK_AND_THROW(uResult, "Failed to get user layer topic qos");
399 
401  qos.delegate().u_qos(uQos);
402  u_topicQosFree(uQos);
403 
 /* Build the delegate around the existing u_topic, then run its init()
  * step with the reference that owns it. */
404  typename dds::topic::Topic<T, dds::topic::detail::Topic>::DELEGATE_REF_T ref(new Topic<T>(dp, topic_name, type_name, qos, uTopic));
405  ref->init(ref);
406 
407  os_free(topic_name);
408  os_free(type_name);
409 
410  topics.push_back(dds::topic::Topic<T>(ref));
411  }
412 }
413 
414 // End of implementation
415 
416 #endif /* OSPL_DDS_TOPIC_TTOPIC_HPP_ */
Typeless base class for the typed Topic.
Definition: AnyTopic.hpp:53
static StatusMask none()
Definition: State.hpp:205
Reference(dds::core::null_type &)
Topic proprietary events Listener.
Topic is the most basic description of the data to be published and subscribed.
Definition: Topic.hpp:36
virtual void on_inconsistent_topic(Topic< T > &topic, const dds::core::status::InconsistentTopicStatus &status)=0
virtual void on_all_data_disposed(dds::topic::Topic< T > &topic, const org::opensplice::core::status::AllDataDisposedTopicStatus &status)=0
A DomainParticipant represents the local membership of the application in a Domain.
const std::string & type_name() const
Definition: array.hpp:23
const dds::domain::DomainParticipant & domain_participant() const
Topic(const dds::domain::DomainParticipant &dp, const std::string &topic_name)
Definition: TopicImpl.hpp:52
StatusMask is a bitmap or bitset field.
Definition: State.hpp:144
This struct provides the basic mechanism for an application to specify Quality of Service attributes ...
Definition: TopicQos.hpp:67
Listener * listener() const
Definition: TopicImpl.hpp:139
dds::topic::qos::TopicQos qos() const
const std::string & name() const
Topic events Listener.
Definition: Topic.hpp:39
Support functionality to get the default type_name of a Topic type.
Definition: TopicTraits.hpp:46