20 package org.vortex.FACE;
22 import java.io.IOException;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicLong;
27 import java.util.logging.Level;
29 import org.omg.dds.core.AlreadyClosedException;
30 import org.omg.dds.core.WaitSet;
31 import org.omg.dds.core.event.DataAvailableEvent;
32 import org.omg.dds.core.policy.PolicyFactory;
33 import org.omg.dds.core.status.DataAvailableStatus;
34 import org.omg.dds.sub.DataReader;
35 import org.omg.dds.sub.DataReader.Selector;
36 import org.omg.dds.sub.DataReaderAdapter;
37 import org.omg.dds.sub.DataReaderQos;
38 import org.omg.dds.sub.InstanceState;
39 import org.omg.dds.sub.Sample;
40 import org.omg.dds.sub.Sample.Iterator;
41 import org.omg.dds.sub.Subscriber;
42 import org.omg.dds.sub.SubscriberQos;
44 import FACE.CONNECTION_DIRECTION_TYPE;
46 import FACE.RETURN_CODE_TYPEHolder;
47 import FACE.VALIDITY_TYPE;
50 private Subscriber subscriber;
51 private DataReader<TYPE> dataReader;
52 private Selector<TYPE> selector;
53 private WaitSet waitset;
55 private final AtomicLong transactionid;
58 Class<TYPE> dataType) {
59 super(description, dataType);
61 this.transactionid =
new AtomicLong(0);
62 this.setupSubscriber();
63 this.setupDataReader();
68 private void setupSubscriber() {
74 qos.withPolicy(PolicyFactory
78 Set<String> names = qos.getPartition().getName();
80 if (names.size() == 1 && names.contains(
"")) {
81 qos = qos.withPolicy(qos.getPartition().withName(
87 }
catch (AlreadyClosedException e) {
89 }
catch (Exception exc) {
96 private void setupDataReader() {
101 qos = this.subscriber.getDefaultDataReaderQos();
103 this.dataReader = this.subscriber
104 .createDataReader(this.
getTopic(), qos);
106 }
catch (AlreadyClosedException e) {
108 }
catch (Exception exc) {
115 private void setupWaitset() {
119 this.waitset.attachCondition(this.selector.getCondition());
121 }
catch (AlreadyClosedException e) {
123 }
catch (Exception exc) {
130 private void setupSelector() {
132 this.selector = this.dataReader.select();
133 this.selector = this.selector.dataState(this.subscriber
134 .createDataState().withAnySampleState().withAnyViewState()
135 .with(InstanceState.ALIVE));
136 this.selector = this.selector.maxSamples(1);
138 }
catch (AlreadyClosedException e) {
140 }
catch (Exception exc) {
150 return CONNECTION_DIRECTION_TYPE.DESTINATION;
166 RETURN_CODE_TYPEHolder return_code) {
168 if (timeout ==
FACE.INF_TIME_VALUE.value) {
169 timeout = Long.MAX_VALUE;
171 this.waitset.waitForConditions(timeout, TimeUnit.NANOSECONDS);
172 Iterator<TYPE> samples = this.dataReader.take(this.selector);
173 Sample<TYPE> sample = samples.next();
176 if (sample != null) {
177 transaction_id.value = this.transactionid.incrementAndGet();
178 message.
value = sample.getData();
179 return_code.value = RETURN_CODE_TYPE.NO_ERROR;
182 .
log(
"receiveMessage took no data even though waitset did trigger.",
184 return_code.value = RETURN_CODE_TYPE.INVALID_CONFIG;
187 }
catch (TimeoutException e) {
189 return_code.value = RETURN_CODE_TYPE.TIMED_OUT;
191 }
catch (AlreadyClosedException e) {
193 }
catch (Exception exc) {
200 @SuppressWarnings(
"unchecked")
203 if (maxMessageSize > this.
getStatus().MAX_MESSAGE_SIZE) {
204 return RETURN_CODE_TYPE.INVALID_PARAM;
208 this.callback = callback.value;
212 this.dataReader.setListener(
new DataReaderAdapter<TYPE>() {
216 public void onDataAvailable(DataAvailableEvent<TYPE> status) {
218 Iterator<TYPE> samples = dataReader.take();
221 while (samples.hasNext()) {
222 Sample<TYPE> sample = samples.next();
223 TYPE data = sample.getData();
226 if (sample.getInstanceState().equals(InstanceState.ALIVE)) {
227 RETURN_CODE_TYPEHolder holder =
new RETURN_CODE_TYPEHolder();
230 dataHolder.
value = data;
231 holder.value = RETURN_CODE_TYPE.NO_ERROR;
235 if (this.callback != null) {
237 transactionid.incrementAndGet(), dataHolder,
241 }
catch (Exception e) {
243 "Exception occurred during send_event callback: " + e.getMessage(),
251 if (holder.value != RETURN_CODE_TYPE.NO_ERROR) {
253 "send_event callback returned " + holder.value,
260 }
catch (IOException i) {
262 .
log(
"Iterator.close() failed (" + i.getMessage()
263 +
").", Level.SEVERE);
265 }
catch (AlreadyClosedException e) {
267 }
catch (Exception e) {
269 "Exception occurred: " + e.getMessage(),
276 }, DataAvailableStatus.class);
277 }
catch (AlreadyClosedException e) {
279 }
catch (Exception exc) {
286 return RETURN_CODE_TYPE.NO_ERROR;
291 RETURN_CODE_TYPE result = RETURN_CODE_TYPE.NO_ACTION;
295 if (this.callback != null) {
297 this.dataReader.setListener(null);
299 }
catch(AlreadyClosedException e) {
302 this.callback = null;
303 result = RETURN_CODE_TYPE.NO_ERROR;
ServiceEnvironment getEnvironment()
long getPlatformViewGuid()
void log(String message, Level level)
DomainParticipant getParticipant()
RETURN_CODE_TYPE registerCallback(Read_CallbackHolder< TYPE > callback, int maxMessageSize)
CONNECTION_DIRECTION_TYPE getDirection()
static Logger getInstance()
TRANSPORT_CONNECTION_STATUS_TYPE getStatus()
void receiveMessage(long timeout, us.opengroup.FACE.LongHolder transaction_id, Holder< TYPE > message, int message_size, RETURN_CODE_TYPEHolder return_code)
void setLastMessageValidity(VALIDITY_TYPE lastMessageValidity)
RETURN_CODE_TYPE unregisterCallback()
SubscriberQos getSubscriberQos()
This is a typed class which will be generated by idlpp.
Read_Callback< TYPE > getCallback()
DataReaderQos getDataReaderQos()
DestinationConnection(ConnectionDescription description, Class< TYPE > dataType)
void send_event(long transaction_id, Holder< TYPE > message, long message_type_id, int message_size, boolean[] waitset, FACE.RETURN_CODE_TYPEHolder return_code)
ConnectionDescription getDescription()