20 package org.vortex.FACE;
    22 import java.io.IOException;
    24 import java.util.concurrent.TimeUnit;
    25 import java.util.concurrent.TimeoutException;
    26 import java.util.concurrent.atomic.AtomicLong;
    27 import java.util.logging.Level;
    29 import org.omg.dds.core.AlreadyClosedException;
    30 import org.omg.dds.core.WaitSet;
    31 import org.omg.dds.core.event.DataAvailableEvent;
    32 import org.omg.dds.core.policy.PolicyFactory;
    33 import org.omg.dds.core.status.DataAvailableStatus;
    34 import org.omg.dds.sub.DataReader;
    35 import org.omg.dds.sub.DataReader.Selector;
    36 import org.omg.dds.sub.DataReaderAdapter;
    37 import org.omg.dds.sub.DataReaderQos;
    38 import org.omg.dds.sub.InstanceState;
    39 import org.omg.dds.sub.Sample;
    40 import org.omg.dds.sub.Sample.Iterator;
    41 import org.omg.dds.sub.Subscriber;
    42 import org.omg.dds.sub.SubscriberQos;
    44 import FACE.CONNECTION_DIRECTION_TYPE;
    46 import FACE.RETURN_CODE_TYPEHolder;
    47 import FACE.VALIDITY_TYPE;
    50     private Subscriber subscriber;
    51     private DataReader<TYPE> dataReader;
    52     private Selector<TYPE> selector;
    53     private WaitSet waitset;
    55     private final AtomicLong transactionid;
    58             Class<TYPE> dataType) {
    59         super(description, dataType);
    61         this.transactionid = 
new AtomicLong(0);
    62         this.setupSubscriber();
    63         this.setupDataReader();
    68     private void setupSubscriber() {
    74                 qos.withPolicy(PolicyFactory
    78                 Set<String> names = qos.getPartition().getName();
    80                 if (names.size() == 1 && names.contains(
"")) {
    81                     qos = qos.withPolicy(qos.getPartition().withName(
    87         } 
catch (AlreadyClosedException e) {
    89         } 
catch (Exception exc) {
    96     private void setupDataReader() {
   101                 qos = this.subscriber.getDefaultDataReaderQos();
   103             this.dataReader = this.subscriber
   104                     .createDataReader(this.
getTopic(), qos);
   106         } 
catch (AlreadyClosedException e) {
   108         } 
catch (Exception exc) {
   115     private void setupWaitset() {
   119             this.waitset.attachCondition(this.selector.getCondition());
   121         } 
catch (AlreadyClosedException e) {
   123         } 
catch (Exception exc) {
   130     private void setupSelector() {
   132             this.selector = this.dataReader.select();
   133             this.selector = this.selector.dataState(this.subscriber
   134                     .createDataState().withAnySampleState().withAnyViewState()
   135                     .with(InstanceState.ALIVE));
   136             this.selector = this.selector.maxSamples(1);
   138         } 
catch (AlreadyClosedException e) {
   140         } 
catch (Exception exc) {
   150         return CONNECTION_DIRECTION_TYPE.DESTINATION;
   166             RETURN_CODE_TYPEHolder return_code) {
   168             if (timeout == 
FACE.INF_TIME_VALUE.value) {
   169                 timeout = Long.MAX_VALUE;
   171             this.waitset.waitForConditions(timeout, TimeUnit.NANOSECONDS);
   172             Iterator<TYPE> samples = this.dataReader.take(this.selector);
   173             Sample<TYPE> sample = samples.next();
   176             if (sample != null) {
   177                 transaction_id.value = this.transactionid.incrementAndGet();
   178                 message.
value = sample.getData();
   179                 return_code.value = RETURN_CODE_TYPE.NO_ERROR;
   182                 .
log(
"receiveMessage took no data even though waitset did trigger.",
   184                 return_code.value = RETURN_CODE_TYPE.INVALID_CONFIG;
   187         } 
catch (TimeoutException e) {
   189             return_code.value = RETURN_CODE_TYPE.TIMED_OUT;
   191         } 
catch (AlreadyClosedException e) {
   193         } 
catch (Exception exc) {
   200     @SuppressWarnings(
"unchecked")
   203         if (maxMessageSize > this.
getStatus().MAX_MESSAGE_SIZE) {
   204             return RETURN_CODE_TYPE.INVALID_PARAM;
   208         this.callback = callback.value;
   212             this.dataReader.setListener(
new DataReaderAdapter<TYPE>() {
   216                 public void onDataAvailable(DataAvailableEvent<TYPE> status) {
   218                         Iterator<TYPE> samples = dataReader.take();
   221                         while (samples.hasNext()) {
   222                             Sample<TYPE> sample = samples.next();
   223                             TYPE data = sample.getData();
   226                                 if (sample.getInstanceState().equals(InstanceState.ALIVE)) {
   227                                     RETURN_CODE_TYPEHolder holder = 
new RETURN_CODE_TYPEHolder();
   230                                     dataHolder.
value = data;
   231                                     holder.value = RETURN_CODE_TYPE.NO_ERROR;
   235                                         if (this.callback != null) {
   237                                                     transactionid.incrementAndGet(), dataHolder,
   241                                     } 
catch (Exception e) {
   243                                                 "Exception occurred during send_event callback: " + e.getMessage(),
   251                                     if (holder.value != RETURN_CODE_TYPE.NO_ERROR) {
   253                                                 "send_event callback returned " + holder.value,
   260                     } 
catch (IOException i) {
   262                         .
log(
"Iterator.close() failed (" + i.getMessage()
   263                         + 
").", Level.SEVERE);
   265                     } 
catch (AlreadyClosedException e) {
   267                     } 
catch (Exception e) {
   269                                 "Exception occurred: " + e.getMessage(),
   276             }, DataAvailableStatus.class);
   277         } 
catch (AlreadyClosedException e) {
   279         } 
catch (Exception exc) {
   286         return RETURN_CODE_TYPE.NO_ERROR;
   291         RETURN_CODE_TYPE result = RETURN_CODE_TYPE.NO_ACTION;
   295         if (this.callback != null) {
   297                 this.dataReader.setListener(null);
   299             } 
catch(AlreadyClosedException e) {
   302             this.callback = null;
   303             result = RETURN_CODE_TYPE.NO_ERROR;
 
ServiceEnvironment getEnvironment()
long getPlatformViewGuid()
void log(String message, Level level)
DomainParticipant getParticipant()
RETURN_CODE_TYPE registerCallback(Read_CallbackHolder< TYPE > callback, int maxMessageSize)
CONNECTION_DIRECTION_TYPE getDirection()
static Logger getInstance()
TRANSPORT_CONNECTION_STATUS_TYPE getStatus()
void receiveMessage(long timeout, us.opengroup.FACE.LongHolder transaction_id, Holder< TYPE > message, int message_size, RETURN_CODE_TYPEHolder return_code)
void setLastMessageValidity(VALIDITY_TYPE lastMessageValidity)
RETURN_CODE_TYPE unregisterCallback()
SubscriberQos getSubscriberQos()
This is a typed class which will be generated by idlpp. 
Read_Callback< TYPE > getCallback()
DataReaderQos getDataReaderQos()
DestinationConnection(ConnectionDescription description, Class< TYPE > dataType)
void send_event(long transaction_id, Holder< TYPE > message, long message_type_id, int message_size, boolean[] waitset, FACE.RETURN_CODE_TYPEHolder return_code)
ConnectionDescription getDescription()