OpenSplice ISO C++ 2 FACE API  v6.x
OpenSplice Future Airborne Capability Environment (FACE) ISO C++ 2 API
Connection.hpp
Go to the documentation of this file.
1 /*
2  * Vortex OpenSplice
3  *
4  * This software and documentation are Copyright 2006 to 2021 ADLINK
5  * Technology Limited, its affiliated companies and licensors. All rights
6  * reserved.
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 #ifndef VORTEX_FACE_CONNECTION_HPP_
22 #define VORTEX_FACE_CONNECTION_HPP_
23 
24 #include "Vortex_FACE.hpp"
29 
30 namespace Vortex {
31 namespace FACE {
32 
33 template <typename TYPE>
34 class Connection : public AnyConnection
35 {
36 public:
38 
40  topic(dds::core::null),
41  reader(dds::core::null),
42  writer(dds::core::null),
43  waitset(dds::core::null),
44  participant(dds::core::null),
45  listener(NULL)
46  {
47  }
48 
49  virtual ~Connection()
50  {
51  /* Be sure to remove the listener before the reader is destroyed.
52  * Otherwise it could cause big problems (like core dumps). */
53  if (this->listener != NULL) {
54  (void)this->unregisterCallback();
55  }
56  }
57 
58  /* Get the Connection<TYPE>, related to the given connectionId. */
59  static typename Connection<TYPE>::shared_ptr
62  {
63  typename Connection<TYPE>::shared_ptr connection;
65  if (instance.get() != NULL) {
66  AnyConnection::shared_ptr any = instance->getConnection(connectionId);
67  if (any.get() != NULL) {
68  connection = OSPL_CXX11_STD_MODULE::dynamic_pointer_cast< Connection<TYPE> >(any);
69  if (connection.get() != NULL) {
70  status = ::FACE::NO_ERROR;
71  } else {
72  status = ::FACE::INVALID_CONFIG;
73  FACE_REPORT_ERROR(status, "Failed to get connection '%d' for type <%s>", (int)connectionId, Connection::typeName().c_str());
74  }
75  } else {
76  status = ::FACE::INVALID_PARAM;
77  FACE_REPORT_ERROR(status, "Failed to find connection '%d'", (int)connectionId);
78  }
79  } else {
80  status = ::FACE::INVALID_CONFIG;
81  FACE_REPORT_ERROR(status, "Instance not initialized");
82  }
83  return connection;
84  }
85 
86  /* Send the message (AKA write the sample). */
88  send(const TYPE &message,
90  {
92  if (this->writer != dds::core::null) {
93  try {
94  dds::pub::qos::DataWriterQos qos = this->writer.qos();
95  if ((qos.policy<dds::core::policy::Reliability>().kind() == dds::core::policy::ReliabilityKind::RELIABLE) &&
96  (this->copyIn(timeout) > qos.policy<dds::core::policy::Reliability>().max_blocking_time())) {
97  status = ::FACE::INVALID_PARAM;
98  } else {
99  this->writer.write(message);
100  AnyConnection::setLastValidity(::FACE::VALID);
101  status = ::FACE::NO_ERROR;
102  }
103  } catch (const dds::core::Exception& e) {
104  AnyConnection::setLastValidity(::FACE::INVALID);
106  } catch (...) {
107  AnyConnection::setLastValidity(::FACE::INVALID);
108  status = ::FACE::NO_ACTION;
109  assert(false);
110  }
111  } else {
112  FACE_REPORT_ERROR(status, "Write not allowed for '%s'<%s>", AnyConnection::getName().c_str(), Connection::typeName().c_str());
113  }
114  return status;
115  }
116 
117  /* Receive the message (AKA take the sample). */
119  receive(TYPE &message,
121  {
123  if (this->reader != dds::core::null) {
124  if (timeout >= 0 || timeout == ::FACE::INF_TIME_VALUE) {
125  try {
126  status = ::FACE::NO_ACTION;
127  if (timeout > 0) {
128  this->waitset.wait(this->copyIn(timeout));
129  }
130  dds::sub::LoanedSamples<TYPE> samples = this->reader.select().max_samples(1).take();
131  if (samples.length() == 1) {
132  const dds::sub::SampleInfo& info = samples.begin()->info();
133  if (info.valid()) {
134  message = samples.begin()->data();
135  AnyConnection::setLastValidity(::FACE::VALID);
136  status = ::FACE::NO_ERROR;
137  } else {
138  FACE_REPORT_ERROR(status, "Invalid data received for '%s'<%s>",
139  AnyConnection::getName().c_str(), Connection::typeName().c_str());
140  }
141  }
142  } catch (const dds::core::Exception& e) {
143  AnyConnection::setLastValidity(::FACE::INVALID);
145  } catch (...) {
146  AnyConnection::setLastValidity(::FACE::INVALID);
147  status = ::FACE::NO_ACTION;
148  assert(false);
149  }
150  } else {
151  status = ::FACE::INVALID_PARAM;
152  FACE_REPORT_ERROR(status, "Invalid timeout '%" PA_PRId64 "' for '%s'<%s>",
153  timeout, AnyConnection::getName().c_str(), Connection::typeName().c_str());
154  }
155  } else {
156  FACE_REPORT_ERROR(status, "Read not allowed for '%s'<%s>", AnyConnection::getName().c_str(), Connection::typeName().c_str());
157  }
158  return status;
159  }
160 
161  /* Register the data callback (AKA set reader listener). */
163  registerCallback(typename ::FACE::Read_Callback<TYPE>::send_event cb,
165  {
167  if (this->reader != dds::core::null) {
168  try {
169  if (this->listener == NULL) {
170  /* Register a new listener at the dds reader, to be triggered when data is available. */
171  this->listener = new Vortex::FACE::DataListener<TYPE>(Connection::typeName(), AnyConnection::getName(), cb);
172  this->reader.listener(this->listener, dds::core::status::StatusMask::data_available());
173  AnyConnection::setLastValidity(::FACE::VALID);
174  status = ::FACE::NO_ERROR;
175  } else {
176  FACE_REPORT_ERROR(status, "Cannot register multiple callbacks for connection '%s'<%s>, unregister the existing callback first", AnyConnection::getName().c_str(), Connection::typeName().c_str());
177  status = ::FACE::NO_ACTION;
178  }
179  } catch (const dds::core::Exception& e) {
180  AnyConnection::setLastValidity(::FACE::INVALID);
182  } catch (...) {
183  AnyConnection::setLastValidity(::FACE::INVALID);
184  status = ::FACE::NO_ACTION;
185  assert(false);
186  }
187  } else {
188  FACE_REPORT_ERROR(status, "No reader for connection '%s'<%s>, unable to register callback", AnyConnection::getName().c_str(), Connection::typeName().c_str());
189  }
190  return status;
191  }
192 
193  /* Un-register the data callback (AKA reset reader listener). */
196  {
198  if (this->reader != dds::core::null) {
199  try {
200  if (this->listener != NULL) {
201  /* Reset the listener from the reader and destroy it. */
202  this->reader.listener(NULL, dds::core::status::StatusMask::none());
203  delete this->listener;
204  this->listener = NULL;
205  AnyConnection::setLastValidity(::FACE::VALID);
206  status = ::FACE::NO_ERROR;
207  } else {
208  FACE_REPORT_ERROR(status, "Cannot unregister non-existent callback for connection '%s'<%s>", AnyConnection::getName().c_str(), Connection::typeName().c_str());
209  status = ::FACE::NO_ACTION;
210  }
211  } catch (const dds::core::Exception& e) {
212  AnyConnection::setLastValidity(::FACE::INVALID);
214  } catch (...) {
215  AnyConnection::setLastValidity(::FACE::INVALID);
216  status = ::FACE::NO_ACTION;
217  assert(false);
218  }
219  } else {
220  FACE_REPORT_ERROR(status, "No reader for connection '%s'<%s>, unable to unregister callback", AnyConnection::getName().c_str(), Connection::typeName().c_str());
221  }
222  return status;
223  }
224 
225  virtual int32_t getDomainId() const {
226  return this->participant.domain_id();
227  }
228 
229 protected:
230  virtual void
231  initWriter()
232  {
233  this->initParticipant();
234  this->initTopic();
235  dds::pub::Publisher publisher(this->participant, this->config->getPublisherQos());
236  this->writer = dds::pub::DataWriter<TYPE>(publisher,
237  this->topic,
238  this->config->getWriterQos());
239  }
240 
241  virtual void
242  initReader()
243  {
244  this->initParticipant();
245  this->initTopic();
246  dds::sub::Subscriber subscriber(this->participant, this->config->getSubscriberQos());
247  this->reader = dds::sub::DataReader<TYPE>(subscriber,
248  this->topic,
249  this->config->getReaderQos());
250  dds::core::cond::StatusCondition condition(this->reader);
251  condition.enabled_statuses(dds::core::status::StatusMask::data_available());
252  this->waitset = dds::core::cond::WaitSet();
253  this->waitset.attach_condition(condition);
254  }
255 
256 private:
257  void
258  initTopic()
259  {
260  if (this->topic == dds::core::null) {
261  this->topic = dds::topic::Topic<TYPE>(this->participant,
262  this->config->getTopicName(),
263  this->config->getTopicQos());
264  }
265  }
266 
267  void
268  initParticipant()
269  {
270  if (this->participant == dds::core::null) {
271  this->participant = dds::domain::DomainParticipant(this->config->getDomainId(),
272  this->config->getParticipantQos());
273  }
274  }
275 
276  static std::string
277  typeName()
278  {
279  return dds::topic::topic_type_name<TYPE>::value();
280  }
281 
282  dds::topic::Topic<TYPE> topic;
283  dds::sub::DataReader<TYPE> reader;
284  dds::pub::DataWriter<TYPE> writer;
285  dds::core::cond::WaitSet waitset;
286  dds::domain::DomainParticipant participant;
287  typename Vortex::FACE::DataListener<TYPE> *listener;
288 };
289 
290 }; /* namespace FACE */
291 }; /* namespace Vortex */
292 
293 
294 #endif /* VORTEX_FACE_CONNECTION_HPP_ */
virtual int32_t getDomainId() const
Definition: Connection.hpp:225
Vortex::FACE::smart_ptr_traits< AnyConnection >::shared_ptr shared_ptr
int64_t CONNECTION_ID_TYPE
::FACE::RETURN_CODE_TYPE registerCallback(typename ::FACE::Read_Callback< TYPE >::send_event cb, const ::FACE::WAITSET_TYPE &mask)
Definition: Connection.hpp:163
RETURN_CODE_TYPE
Definition: FACE_common.h:54
#define FACE_REPORT_ERROR(code,...)
SYSTEM_TIME_TYPE TIMEOUT_TYPE
Definition: FACE_common.h:49
::FACE::RETURN_CODE_TYPE send(const TYPE &message, const ::FACE::TIMEOUT_TYPE &timeout)
Definition: Connection.hpp:88
::FACE::RETURN_CODE_TYPE receive(TYPE &message, const ::FACE::TIMEOUT_TYPE &timeout)
Definition: Connection.hpp:119
const SYSTEM_TIME_TYPE INF_TIME_VALUE
Definition: FACE_common.h:50
virtual ::FACE::RETURN_CODE_TYPE unregisterCallback()
Definition: Connection.hpp:195
VORTEX_FACE_API ::FACE::RETURN_CODE_TYPE exceptionToReturnCode(const dds::core::Exception &e)
static Connection< TYPE >::shared_ptr get(const ::FACE::CONNECTION_ID_TYPE &connectionId, ::FACE::RETURN_CODE_TYPE &status)
Definition: Connection.hpp:60
dds::core::array< bool, 32 > WAITSET_TYPE
Vortex::FACE::smart_ptr_traits< Connection< TYPE > >::shared_ptr shared_ptr
Definition: Connection.hpp:37
Vortex::FACE::smart_ptr_traits< FaceInstance >::shared_ptr shared_ptr
static FaceInstance::shared_ptr getInstance()