21 #ifndef VORTEX_FACE_CONNECTION_HPP_ 22 #define VORTEX_FACE_CONNECTION_HPP_ 33 template <
typename TYPE>
// Initializer entries: every DDS entity handle (topic, reader, writer, waitset,
// participant) starts out as dds::core::null.
// NOTE(review): this looks like the constructor's member-initializer list; the
// constructor signature and any other initializers are not part of this listing.
40 topic(dds::core::null),
41 reader(dds::core::null),
42 writer(dds::core::null),
43 waitset(dds::core::null),
44 participant(dds::core::null),
// Only acts when a listener is currently set (pointer is non-NULL).
// NOTE(review): the enclosing function and the guarded statements are missing
// from this listing.
53 if (this->listener != NULL) {
// Lookup fragment: narrows a generic connection handle (`any`) to
// Connection<TYPE>. Three handles are null-checked in turn: `instance`, `any`,
// and the result of the dynamic cast.
// NOTE(review): the statements that obtain `instance` and `any`, and the
// branch structure leading to the error report, are missing from this listing.
65 if (instance.get() != NULL) {
67 if (any.get() != NULL) {
// A failed dynamic_pointer_cast yields an empty pointer, hence the check below.
68 connection = OSPL_CXX11_STD_MODULE::dynamic_pointer_cast<
Connection<TYPE> >(any);
69 if (connection.
get() != NULL) {
// NOTE(review): connectionId is cast to int for the '%d' specifier; the
// declaration summary in this listing shows CONNECTION_ID_TYPE as int64_t, so
// large ids would be truncated in the reported message - confirm.
73 FACE_REPORT_ERROR(status,
"Failed to get connection '%d' for type <%s>", (
int)connectionId, Connection::typeName().c_str());
// Send fragment: the write is only attempted when a writer has been created.
92 if (this->writer != dds::core::null) {
// Reads the writer's current QoS to inspect its Reliability policy.
94 dds::pub::qos::DataWriterQos qos = this->writer.qos();
// Condition is true when reliability is RELIABLE and the converted timeout
// exceeds the policy's max_blocking_time.
// NOTE(review): the statements guarded by this condition are missing from this
// listing; presumably the request is rejected - confirm against the full file.
95 if ((qos.policy<dds::core::policy::Reliability>().kind() == dds::core::policy::ReliabilityKind::RELIABLE) &&
96 (this->copyIn(timeout) > qos.policy<dds::core::policy::Reliability>().max_blocking_time())) {
99 this->writer.write(message);
103 }
// DDS exceptions are caught here; the handler body is not visible in this listing.
catch (
const dds::core::Exception& e) {
// Receive fragment: only runs when a reader has been created.
123 if (this->reader != dds::core::null) {
// Waits on the waitset, passing the converted timeout.
128 this->waitset.wait(this->copyIn(timeout));
// Takes at most one sample from the reader.
130 dds::sub::LoanedSamples<TYPE> samples = this->reader.select().max_samples(1).take();
131 if (samples.length() == 1) {
// NOTE(review): how `info` is used (for example a validity check before the
// copy) is not visible; the line between these two is missing from the listing.
132 const dds::sub::SampleInfo& info = samples.begin()->info();
// Copies the sample's data into the caller-supplied `message`.
134 message = samples.begin()->data();
142 }
// DDS exceptions are caught here; the handler body is not visible in this listing.
catch (
const dds::core::Exception& e) {
// Callback registration fragment: requires an existing reader and no listener
// currently set.
167 if (this->reader != dds::core::null) {
169 if (this->listener == NULL) {
// Attaches the listener to the reader for data-available status only.
// NOTE(review): the statement that creates this->listener is missing from this
// listing.
172 this->reader.listener(this->listener, dds::core::status::StatusMask::data_available());
// Error reported when a callback is already registered (per the message text);
// the branch keyword that leads here is not visible in this listing.
176 FACE_REPORT_ERROR(status,
"Cannot register multiple callbacks for connection '%s'<%s>, unregister the existing callback first",
AnyConnection::getName().c_str(), Connection::typeName().c_str());
179 }
// DDS exceptions are caught here; the handler body is not visible in this listing.
catch (
const dds::core::Exception& e) {
// Callback removal fragment: acts only when a reader exists and a listener is set.
198 if (this->reader != dds::core::null) {
200 if (this->listener != NULL) {
// The listener is detached from the reader (NULL listener, empty status mask)
// before the object is deleted and the pointer is reset.
202 this->reader.listener(NULL, dds::core::status::StatusMask::none());
203 delete this->listener;
204 this->listener = NULL;
211 }
// DDS exceptions are caught here; the handler body is not visible in this listing.
catch (
const dds::core::Exception& e) {
// Returns the domain id reported by the participant.
// NOTE(review): `participant` is initialised to dds::core::null and only
// assigned in the lazy-creation fragment; confirm this cannot be reached before
// the participant exists.
226 return this->participant.domain_id();
// Writer set-up fragment: runs initParticipant(), creates a Publisher on the
// participant with the configured publisher QoS, then constructs the
// DataWriter<TYPE> with the configured writer QoS.
// NOTE(review): the enclosing function's signature and the middle argument of
// the DataWriter constructor (presumably the topic) are missing from this listing.
233 this->initParticipant();
235 dds::pub::Publisher publisher(this->participant, this->config->getPublisherQos());
236 this->writer = dds::pub::DataWriter<TYPE>(publisher,
238 this->config->getWriterQos());
// Reader set-up fragment: runs initParticipant(), creates a Subscriber on the
// participant with the configured subscriber QoS, then constructs the
// DataReader<TYPE> with the configured reader QoS.
// NOTE(review): the enclosing function's signature and the middle argument of
// the DataReader constructor (presumably the topic) are missing from this listing.
244 this->initParticipant();
246 dds::sub::Subscriber subscriber(this->participant, this->config->getSubscriberQos());
247 this->reader = dds::sub::DataReader<TYPE>(subscriber,
249 this->config->getReaderQos());
// The reader's status condition is restricted to data-available and attached
// to a newly constructed WaitSet.
250 dds::core::cond::StatusCondition condition(this->reader);
251 condition.enabled_statuses(dds::core::status::StatusMask::data_available());
252 this->waitset = dds::core::cond::WaitSet();
253 this->waitset.attach_condition(condition);
// Creates the topic only if it has not been created yet, using the
// participant plus the configured topic name and topic QoS.
// NOTE(review): the enclosing function's signature is missing from this listing.
260 if (this->topic == dds::core::null) {
261 this->topic = dds::topic::Topic<TYPE>(this->participant,
262 this->config->getTopicName(),
263 this->config->getTopicQos());
// Creates the DomainParticipant only if it has not been created yet, using
// the configured domain id and participant QoS.
// NOTE(review): presumably the body of initParticipant(); the signature is
// missing from this listing.
270 if (this->participant == dds::core::null) {
271 this->participant = dds::domain::DomainParticipant(this->config->getDomainId(),
272 this->config->getParticipantQos());
// Returns the DDS type name registered for TYPE via dds::topic::topic_type_name.
279 return dds::topic::topic_type_name<TYPE>::value();
// DDS entity handles held by the connection. The initializer fragment sets
// each of them to dds::core::null; the set-up fragments assign them later.
// Topic used for TYPE; created on demand when still null.
282 dds::topic::Topic<TYPE> topic;
// Reader used by the receive and callback fragments.
283 dds::sub::DataReader<TYPE> reader;
// Writer used by the send fragment.
284 dds::pub::DataWriter<TYPE> writer;
// Waitset that the reader's data-available status condition is attached to.
285 dds::core::cond::WaitSet waitset;
// Participant on which the publisher, subscriber and topic are created.
286 dds::domain::DomainParticipant participant;
virtual int32_t getDomainId() const
Vortex::FACE::smart_ptr_traits< AnyConnection >::shared_ptr shared_ptr
int64_t CONNECTION_ID_TYPE
::FACE::RETURN_CODE_TYPE registerCallback(typename ::FACE::Read_Callback< TYPE >::send_event cb, const ::FACE::WAITSET_TYPE &mask)
#define FACE_REPORT_ERROR(code,...)
SYSTEM_TIME_TYPE TIMEOUT_TYPE
::FACE::RETURN_CODE_TYPE send(const TYPE &message, const ::FACE::TIMEOUT_TYPE &timeout)
::FACE::RETURN_CODE_TYPE receive(TYPE &message, const ::FACE::TIMEOUT_TYPE &timeout)
const SYSTEM_TIME_TYPE INF_TIME_VALUE
virtual ::FACE::RETURN_CODE_TYPE unregisterCallback()
VORTEX_FACE_API ::FACE::RETURN_CODE_TYPE exceptionToReturnCode(const dds::core::Exception &e)
static Connection< TYPE >::shared_ptr get(const ::FACE::CONNECTION_ID_TYPE &connectionId, ::FACE::RETURN_CODE_TYPE &status)
dds::core::array< bool, 32 > WAITSET_TYPE
Vortex::FACE::smart_ptr_traits< Connection< TYPE > >::shared_ptr shared_ptr
Vortex::FACE::smart_ptr_traits< FaceInstance >::shared_ptr shared_ptr
static FaceInstance::shared_ptr getInstance()