OpenSplice Java FACE API  v6.x
OpenSplice Future Airborne Capability Environment (FACE) Java API
DestinationConnection.java
Go to the documentation of this file.
1 /*
2  * Vortex OpenSplice
3  *
4  * This software and documentation are Copyright 2006 to 2021 ADLINK
5  * Technology Limited, its affiliated companies and licensors. All rights
6  * reserved.
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 package org.vortex.FACE;
21 
22 import java.io.IOException;
23 import java.util.Set;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicLong;
27 import java.util.logging.Level;
28 
29 import org.omg.dds.core.AlreadyClosedException;
30 import org.omg.dds.core.WaitSet;
31 import org.omg.dds.core.event.DataAvailableEvent;
32 import org.omg.dds.core.policy.PolicyFactory;
33 import org.omg.dds.core.status.DataAvailableStatus;
34 import org.omg.dds.sub.DataReader;
35 import org.omg.dds.sub.DataReader.Selector;
36 import org.omg.dds.sub.DataReaderAdapter;
37 import org.omg.dds.sub.DataReaderQos;
38 import org.omg.dds.sub.InstanceState;
39 import org.omg.dds.sub.Sample;
40 import org.omg.dds.sub.Sample.Iterator;
41 import org.omg.dds.sub.Subscriber;
42 import org.omg.dds.sub.SubscriberQos;
43 
44 import FACE.CONNECTION_DIRECTION_TYPE;
45 import FACE.RETURN_CODE_TYPE;
46 import FACE.RETURN_CODE_TYPEHolder;
47 import FACE.VALIDITY_TYPE;
48 
49 public class DestinationConnection<TYPE> extends Connection<TYPE> {
50  private Subscriber subscriber;
51  private DataReader<TYPE> dataReader;
52  private Selector<TYPE> selector;
53  private WaitSet waitset;
54  private Read_Callback<TYPE> callback;
55  private final AtomicLong transactionid;
56 
58  Class<TYPE> dataType) {
59  super(description, dataType);
60  this.callback = null;
61  this.transactionid = new AtomicLong(0);
62  this.setupSubscriber();
63  this.setupDataReader();
64  this.setupSelector();
65  this.setupWaitset();
66  }
67 
68  private void setupSubscriber() {
69  try {
70  SubscriberQos qos = this.getDescription().getSubscriberQos();
71 
72  if (qos == null) {
73  qos = this.getParticipant().getDefaultSubscriberQos();
74  qos.withPolicy(PolicyFactory
75  .getPolicyFactory(this.getDescription().getEnvironment())
76  .Partition().withName(this.getDescription().getName()));
77  } else {
78  Set<String> names = qos.getPartition().getName();
79 
80  if (names.size() == 1 && names.contains("")) {
81  qos = qos.withPolicy(qos.getPartition().withName(
82  this.getDescription().getName()));
83  }
84  }
85  this.subscriber = this.getParticipant().createSubscriber(qos);
86  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
87  } catch (AlreadyClosedException e) {
88  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
89  } catch (Exception exc) {
90  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
91  Logger.getInstance().log(exc.toString(), Level.FINEST);
92  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
93  }
94  }
95 
96  private void setupDataReader() {
97  try {
98  DataReaderQos qos = this.getDescription().getDataReaderQos();
99 
100  if (qos == null) {
101  qos = this.subscriber.getDefaultDataReaderQos();
102  }
103  this.dataReader = this.subscriber
104  .createDataReader(this.getTopic(), qos);
105  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
106  } catch (AlreadyClosedException e) {
107  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
108  } catch (Exception exc) {
109  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
110  Logger.getInstance().log(exc.toString(), Level.FINEST);
111  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
112  }
113  }
114 
115  private void setupWaitset() {
116  try {
117  this.waitset = this.getDescription().getEnvironment().getSPI()
118  .newWaitSet();
119  this.waitset.attachCondition(this.selector.getCondition());
120  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
121  } catch (AlreadyClosedException e) {
122  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
123  } catch (Exception exc) {
124  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
125  Logger.getInstance().log(exc.toString(), Level.FINEST);
126  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
127  }
128  }
129 
130  private void setupSelector() {
131  try {
132  this.selector = this.dataReader.select();
133  this.selector = this.selector.dataState(this.subscriber
134  .createDataState().withAnySampleState().withAnyViewState()
135  .with(InstanceState.ALIVE));
136  this.selector = this.selector.maxSamples(1);
137  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
138  } catch (AlreadyClosedException e) {
139  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
140  } catch (Exception exc) {
141  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
142  Logger.getInstance().log(exc.toString(), Level.FINEST);
143  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
144  }
145  }
146 
147 
148  @Override
149  public CONNECTION_DIRECTION_TYPE getDirection() {
150  return CONNECTION_DIRECTION_TYPE.DESTINATION;
151  }
152 
154  return callback;
155  }
156 
157  @Override
158  public void close() {
160  super.close();
161 
162  }
163 
164  public void receiveMessage(long timeout, us.opengroup.FACE.LongHolder transaction_id,
165  Holder<TYPE> message, int message_size,
166  RETURN_CODE_TYPEHolder return_code) {
167  try {
168  if (timeout == FACE.INF_TIME_VALUE.value) {
169  timeout = Long.MAX_VALUE;
170  }
171  this.waitset.waitForConditions(timeout, TimeUnit.NANOSECONDS);
172  Iterator<TYPE> samples = this.dataReader.take(this.selector);
173  Sample<TYPE> sample = samples.next();
174  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
175 
176  if (sample != null) {
177  transaction_id.value = this.transactionid.incrementAndGet();
178  message.value = sample.getData();
179  return_code.value = RETURN_CODE_TYPE.NO_ERROR;
180  } else {
182  .log("receiveMessage took no data even though waitset did trigger.",
183  Level.SEVERE);
184  return_code.value = RETURN_CODE_TYPE.INVALID_CONFIG;
185  }
186 
187  } catch (TimeoutException e) {
188  Logger.getInstance().log("receiveMessage timed out.", Level.FINE);
189  return_code.value = RETURN_CODE_TYPE.TIMED_OUT;
190  this.setLastMessageValidity(VALIDITY_TYPE.VALID); /* Timeout is valid */
191  } catch (AlreadyClosedException e) {
192  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
193  } catch (Exception exc) {
194  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
195  Logger.getInstance().log(exc.toString(), Level.FINEST);
196  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
197  }
198  }
199 
200  @SuppressWarnings("unchecked")
201  public RETURN_CODE_TYPE registerCallback(
202  Read_CallbackHolder<TYPE> callback, int maxMessageSize) {
203  if (maxMessageSize > this.getStatus().MAX_MESSAGE_SIZE) {
204  return RETURN_CODE_TYPE.INVALID_PARAM;
205  }
206  this.writeLock();
207 
208  this.callback = callback.value;
209 
210  final Read_Callback<TYPE> cb = this.callback;
211  try {
212  this.dataReader.setListener(new DataReaderAdapter<TYPE>() {
213  private final Read_Callback<TYPE> callback = cb;
214 
215  @Override
216  public void onDataAvailable(DataAvailableEvent<TYPE> status) {
217  try {
218  Iterator<TYPE> samples = dataReader.take();
219  setLastMessageValidity(VALIDITY_TYPE.VALID);
220 
221  while (samples.hasNext()) {
222  Sample<TYPE> sample = samples.next();
223  TYPE data = sample.getData();
224 
225  if (data != null) {
226  if (sample.getInstanceState().equals(InstanceState.ALIVE)) {
227  RETURN_CODE_TYPEHolder holder = new RETURN_CODE_TYPEHolder();
228  Holder<TYPE> dataHolder = new Holder<TYPE>();
229 
230  dataHolder.value = data;
231  holder.value = RETURN_CODE_TYPE.NO_ERROR;
232 
233  try {
234  readLock();
235  if (this.callback != null) {
236  this.callback.send_event(
237  transactionid.incrementAndGet(), dataHolder,
239  null, holder);
240  }
241  } catch (Exception e) {
243  "Exception occurred during send_event callback: " + e.getMessage(),
244  Level.SEVERE);
245  Logger.getInstance().log(e.toString(), Level.FINEST);
246  setLastMessageValidity(VALIDITY_TYPE.INVALID);
247  } finally {
248  readUnlock();
249  }
250 
251  if (holder.value != RETURN_CODE_TYPE.NO_ERROR) {
253  "send_event callback returned " + holder.value,
254  Level.SEVERE);
255  }
256  }
257  }
258  }
259  samples.close();
260  } catch (IOException i) {
262  .log("Iterator.close() failed (" + i.getMessage()
263  + ").", Level.SEVERE);
264  setLastMessageValidity(VALIDITY_TYPE.INVALID);
265  } catch (AlreadyClosedException e) {
266  setLastMessageValidity(VALIDITY_TYPE.INVALID);
267  } catch (Exception e) {
269  "Exception occurred: " + e.getMessage(),
270  Level.SEVERE);
271  Logger.getInstance().log(e.toString(), Level.FINEST);
272  setLastMessageValidity(VALIDITY_TYPE.INVALID);
273  }
274  }
275 
276  }, DataAvailableStatus.class);
277  } catch (AlreadyClosedException e) {
278  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
279  } catch (Exception exc) {
280  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
281  Logger.getInstance().log(exc.toString(), Level.FINEST);
282  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
283  }
284  this.writeUnlock();
285 
286  return RETURN_CODE_TYPE.NO_ERROR;
287  }
288 
289 
290  public RETURN_CODE_TYPE unregisterCallback() {
291  RETURN_CODE_TYPE result = RETURN_CODE_TYPE.NO_ACTION;
292 
293  this.writeLock();
294 
295  if (this.callback != null) {
296  try {
297  this.dataReader.setListener(null);
298  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
299  } catch(AlreadyClosedException e) {
300  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
301  }
302  this.callback = null;
303  result = RETURN_CODE_TYPE.NO_ERROR;
304  }
305  this.writeUnlock();
306 
307  return result;
308  }
309 }
void log(String message, Level level)
Definition: Logger.java:47
DomainParticipant getParticipant()
Definition: Connection.java:79
RETURN_CODE_TYPE registerCallback(Read_CallbackHolder< TYPE > callback, int maxMessageSize)
Topic< TYPE > getTopic()
Definition: Connection.java:83
static Logger getInstance()
Definition: Logger.java:43
TRANSPORT_CONNECTION_STATUS_TYPE getStatus()
Definition: Connection.java:59
void receiveMessage(long timeout, us.opengroup.FACE.LongHolder transaction_id, Holder< TYPE > message, int message_size, RETURN_CODE_TYPEHolder return_code)
void setLastMessageValidity(VALIDITY_TYPE lastMessageValidity)
Definition: Connection.java:67
This is a typed class which will be generated by idlpp.
Definition: TS.java:25
DestinationConnection(ConnectionDescription description, Class< TYPE > dataType)
void send_event(long transaction_id, Holder< TYPE > message, long message_type_id, int message_size, boolean[] waitset, FACE.RETURN_CODE_TYPEHolder return_code)
ConnectionDescription getDescription()
Definition: Connection.java:75