OpenSplice Java 5 DCPS  v6.x
OpenSplice Java 5 OpenSplice Data Distribution Service Data-Centric Publish-Subscribe API
SubscriberImpl.java
Go to the documentation of this file.
1 /*
2  * Vortex OpenSplice
3  *
4  * This software and documentation are Copyright 2006 to 2021 ADLINK
5  * Technology Limited, its affiliated companies and licensors. All rights
6  * reserved.
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 package org.opensplice.dds.sub;
22 
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.List;
29 
32 import org.omg.dds.core.status.Status;
33 import org.omg.dds.pub.DataWriter;
34 import org.omg.dds.sub.DataReader;
36 import org.omg.dds.sub.DataReaderQos;
37 import org.omg.dds.sub.Subscriber;
39 import org.omg.dds.sub.SubscriberQos;
41 import org.omg.dds.topic.TopicQos;
53 
54 public class SubscriberImpl
55  extends
56  DomainEntityImpl<DDS.Subscriber, DomainParticipantImpl, DDS.DomainParticipant, SubscriberQos, SubscriberListener, SubscriberListenerImpl>
57  implements Subscriber {
58  private final HashMap<DDS.DataReader, AbstractDataReader<?>> readers;
59  private final boolean isBuiltin;
60 
64  Collection<Class<? extends Status>> statuses) {
65  super(environment, parent, parent.getOld());
66  DDS.SubscriberQos oldQos;
67 
68  if (qos == null) {
69  throw new IllegalArgumentExceptionImpl(this.environment,
70  "Supplied SubscriberQos is null.");
71  }
72 
73  try {
74  oldQos = ((SubscriberQosImpl) qos).convert();
75  } catch (ClassCastException e) {
76  throw new IllegalArgumentExceptionImpl(this.environment,
77  "Cannot create Subscribe with non-OpenSplice qos");
78  }
79 
80  if (listener != null) {
81  this.listener = new SubscriberListenerImpl(this.environment, this,
82  listener, true);
83  } else {
84  this.listener = null;
85  }
86  DDS.Subscriber old = this.parent.getOld().create_subscriber(oldQos,
87  this.listener,
88  StatusConverter.convertMask(this.environment, statuses));
89 
90  if (old == null) {
91  Utilities.throwLastErrorException(this.environment);
92  }
93  this.setOld(old);
94  this.readers = new HashMap<DDS.DataReader, AbstractDataReader<?>>();
95  this.isBuiltin = false;
96 
97  if (this.listener != null) {
98  this.listener.setInitialised();
99  }
100  }
101 
103  DomainParticipantImpl parent, DDS.Subscriber oldSubscriber) {
104  super(environment, parent, parent.getOld());
105 
106  if (oldSubscriber == null) {
107  throw new IllegalArgumentExceptionImpl(environment,
108  "Supplied Subscriber is invalid (null).");
109  }
110  this.listener = null;
111  this.setOld(oldSubscriber);
112  this.readers = new HashMap<DDS.DataReader, AbstractDataReader<?>>();
113  this.isBuiltin = true;
114  }
115 
116  public boolean isBuiltin() {
117  return this.isBuiltin;
118  }
119 
120  private void setListener(SubscriberListener listener, int mask) {
121  SubscriberListenerImpl wrapperListener;
122  int rc;
123 
124  if (listener != null) {
125  wrapperListener = new SubscriberListenerImpl(this.environment,
126  this, listener);
127  } else {
128  wrapperListener = null;
129  }
130  rc = this.getOld().set_listener(wrapperListener, mask);
132  "Subscriber.setListener() failed.");
133 
134  this.listener = wrapperListener;
135  }
136 
137  @Override
138  public void setListener(SubscriberListener listener) {
139  this.setListener(listener, StatusConverter.getAnyMask());
140  }
141 
142  @Override
143  public void setListener(SubscriberListener listener,
144  Collection<Class<? extends Status>> statuses) {
145  this.setListener(listener,
146  StatusConverter.convertMask(this.environment, statuses));
147  }
148 
149  @Override
150  public void setListener(SubscriberListener listener,
151  Class<? extends Status>... statuses) {
152  this.setListener(listener,
153  StatusConverter.convertMask(this.environment, statuses));
154  }
155 
156  @Override
158  DDS.SubscriberQosHolder holder = new DDS.SubscriberQosHolder();
159  int rc = this.getOld().get_qos(holder);
161  "Subscriber.getQos() failed.");
162 
163  return SubscriberQosImpl.convert(this.environment, holder.value);
164  }
165 
166  @Override
167  public void setQos(SubscriberQos qos) {
169 
170  if (qos == null) {
172  "Supplied SubscriberQos is null.");
173  }
174  try {
175  q = (SubscriberQosImpl) qos;
176  } catch (ClassCastException e) {
178  "Setting non-OpenSplice Qos not supported.");
179  }
180  int rc = this.getOld().set_qos(q.convert());
182  "Subscriber.setQos() failed.");
183 
184  }
185 
186  @Override
187  public <TYPE> DataReader<TYPE> createDataReader(TopicDescription<TYPE> topic) {
188  return this.createDataReader(topic, this.getDefaultDataReaderQos(),
189  null, new HashSet<Class<? extends Status>>());
190  }
191 
192  @Override
193  public <TYPE> DataReader<TYPE> createDataReader(
195  DataReaderListener<TYPE> listener,
196  Collection<Class<? extends Status>> statuses) {
198  AbstractTypeSupport<TYPE> typeSupport;
199 
200  if (topic == null) {
202  "Supplied Topic is null.");
203  }
204  synchronized (this.readers) {
205  try {
206  typeSupport = (AbstractTypeSupport<TYPE>) topic
207  .getTypeSupport();
208  reader = typeSupport.createDataReader(this,
209  (TopicDescriptionExt<TYPE>) topic, qos, listener,
210  statuses);
211  this.readers.put(reader.getOld(), reader);
212  } catch (ClassCastException e) {
214  "Cannot create DataReader with non-OpenSplice Topic");
215  }
216  }
217  return reader;
218  }
219 
220  @Override
221  public <TYPE> DataReader<TYPE> createDataReader(
223  DataReaderListener<TYPE> listener,
224  Class<? extends Status>... statuses) {
225  return this.createDataReader(topic, qos, listener,
226  Arrays.asList(statuses));
227  }
228 
229  @Override
230  public <TYPE> DataReader<TYPE> createDataReader(
232  return this.createDataReader(topic, qos, null,
233  new HashSet<Class<? extends Status>>());
234  }
235 
236  @Override
237  public <TYPE> DataReader<TYPE> lookupDataReader(String topicName) {
238  if (topicName == null) {
240  "Supplied topicName is null.");
241  }
242  synchronized (this.readers) {
243  for (DataReader<?> reader : this.readers.values()) {
244  if (topicName.equals(reader.getTopicDescription().getName())) {
245  try {
246  return reader.cast();
247  } catch (ClassCastException e) {
249  this.environment,
250  "Cannot cast DataReader to desired type.");
251  }
252  }
253  }
254  DDS.DataReader builtinReader = this.getOld()
255  .lookup_datareader(topicName);
256 
257  if (builtinReader != null) {
258  return this.initBuiltinReader(builtinReader);
259  }
260  }
261  return null;
262  }
263 
264  @Override
265  public <TYPE> DataReader<TYPE> lookupDataReader(
266  TopicDescription<TYPE> topicDescription) {
267  if (topicDescription == null) {
269  "Supplied topicName is null.");
270  }
271  synchronized (this.readers) {
272  for (DataReader<?> reader : this.readers.values()) {
273  if (topicDescription.equals(reader.getTopicDescription())) {
274  try {
275  return reader.cast();
276  } catch (ClassCastException e) {
278  this.environment,
279  "Cannot cast DataReader to desired type.");
280  }
281  }
282  }
283  DDS.DataReader builtinReader = this.getOld()
284  .lookup_datareader(topicDescription.getName());
285 
286  if (builtinReader != null) {
287  return this.initBuiltinReader(builtinReader, topicDescription);
288  }
289  }
290  return null;
291  }
292 
293  private <TYPE> DataReaderImpl<TYPE> initBuiltinReader(
294  DDS.DataReader oldBuiltin) {
295  DataReaderImpl<TYPE> result = null;
296 
297  if (oldBuiltin != null) {
298  DDS.TopicDescription classicTopicDescription = oldBuiltin
299  .get_topicdescription();
300 
301  if (classicTopicDescription != null) {
303  .lookupTopicDescription(
304  classicTopicDescription.get_name());
305 
306  if (td != null) {
307  result = this.initBuiltinReader(oldBuiltin, td);
308  }
309  } else {
310  throw new DDSExceptionImpl(this.environment,
311  "Classic DataReader has no TopicDescription.");
312  }
313  }
314  return result;
315  }
316 
317  private <TYPE> DataReaderImpl<TYPE> initBuiltinReader(
318  DDS.DataReader oldBuiltin, TopicDescription<TYPE> td) {
319  DataReaderImpl<TYPE> result = null;
320 
321  if (oldBuiltin != null) {
322  result = new DataReaderImpl<TYPE>(this.environment, this,
323  (TopicDescriptionExt<TYPE>) td, oldBuiltin);
324  synchronized (this.readers) {
325  this.readers.put(result.getOld(), result);
326  }
327  }
328  return result;
329  }
330 
331  public <TYPE> DataReader<TYPE> lookupDataReader(DDS.DataReader old) {
332  DataReader<TYPE> result;
333 
334  synchronized (this.readers) {
335  AbstractDataReader<?> found = this.readers.get(old);
336 
337  if (found != null) {
338  result = found.cast();
339  } else if (this.isBuiltin) {
340  result = this.initBuiltinReader(old);
341  } else {
342  result = null;
343  }
344  }
345  return result;
346  }
347 
348  @Override
349  public void closeContainedEntities() {
350  synchronized (this.readers) {
351  HashMap<DDS.DataReader, AbstractDataReader<?>> copyReaders = new HashMap<DDS.DataReader, AbstractDataReader<?>>(this.readers);
352  for (AbstractDataReader<?> reader : copyReaders.values()) {
353  try {
354  reader.close();
355  } catch (AlreadyClosedException a) {
356  /* Entity may be closed concurrently by application */
357  }
358  }
359  }
360  }
361 
362  public Collection<DataReader<?>> getDataReaders(
363  Collection<DataReader<?>> readers) {
364  DDS.DataReaderSeqHolder oldReaders = new DDS.DataReaderSeqHolder();
365 
366  synchronized (this.readers) {
367  int rc = this.getOld().get_datareaders(oldReaders,
368  DDS.ANY_SAMPLE_STATE.value, DDS.ANY_VIEW_STATE.value,
369  DDS.ANY_INSTANCE_STATE.value);
371  "Subscriber.getDataReaders() failed.");
372 
373  for (DDS.DataReader oldReader : oldReaders.value) {
374  readers.add(this.readers.get(oldReader));
375  }
376  }
377  return readers;
378  }
379 
380  @Override
381  public Collection<DataReader<?>> getDataReaders() {
382  List<DataReader<?>> readers = new ArrayList<DataReader<?>>();
383  DDS.DataReaderSeqHolder oldReaders = new DDS.DataReaderSeqHolder();
384 
385  synchronized (this.readers) {
386  int rc = this.getOld().get_datareaders(oldReaders,
387  DDS.ANY_SAMPLE_STATE.value, DDS.ANY_VIEW_STATE.value,
388  DDS.ANY_INSTANCE_STATE.value);
390  "Subscriber.getDataReaders() failed.");
391 
392  for (DDS.DataReader oldReader : oldReaders.value) {
393  readers.add(this.readers.get(oldReader));
394  }
395  }
396  return readers;
397  }
398  @Override
399  public Collection<DataReader<?>> getDataReaders(DataState dataState) {
400  if (dataState == null) {
402  "Supplied DataState is null.");
403  }
404  List<DataReader<?>> readers = new ArrayList<DataReader<?>>();
405  DDS.DataReaderSeqHolder oldReaders = new DDS.DataReaderSeqHolder();
406 
407  try {
408  DataStateImpl state = (DataStateImpl) dataState;
409 
410  synchronized (this.readers) {
411  int rc = this.getOld().get_datareaders(oldReaders,
412  state.getOldSampleState(), state.getOldViewState(),
413  state.getOldInstanceState());
415  "Subscriber.getDataReaders() failed.");
416 
417  for (DDS.DataReader oldReader : oldReaders.value) {
418  readers.add(this.readers.get(oldReader));
419  }
420  }
421  } catch (ClassCastException e) {
423  "Non-OpenSplice DataState implementation not supported.");
424  }
425  return readers;
426  }
427 
428  @Override
429  public void notifyDataReaders() {
430  int rc = this.getOld().notify_datareaders();
432  "Subscriber.notifyDataReaders() failed.");
433 
434  }
435 
436  @Override
437  public void beginAccess() {
438  int rc = this.getOld().begin_access();
440  "Subscriber.beginAccess() failed.");
441  }
442 
443  @Override
444  public void endAccess() {
445  int rc = this.getOld().end_access();
447  "Subscriber.endAccess() failed.");
448  }
449 
450  @Override
452  DDS.DataReaderQosHolder holder = new DDS.DataReaderQosHolder();
453  int rc = this.getOld().get_default_datareader_qos(holder);
455  "Subscriber.getDefaultDataReaderQos() failed.");
456  return DataReaderQosImpl.convert(this.environment, holder.value);
457  }
458 
459  @Override
461  if (qos == null) {
463  "Supplied DataReaderQoS is null.");
464  }
465  try {
466  this.getOld().set_default_datareader_qos(
467  ((DataReaderQosImpl) qos)
468  .convert());
469  } catch (ClassCastException e) {
471  "Non-OpenSplice DataReaderQos not supported.");
472  }
473  }
474 
475  @Override
477  DataReaderQosImpl result;
478 
479  if (tQos == null) {
481  "Supplied TopicQos is null.");
482  }
483  if (drQos == null) {
485  "Supplied DataReaderQos is null.");
486  }
487  try {
488  result = (DataReaderQosImpl) drQos;
489  } catch (ClassCastException e) {
491  "Non-OpenSplice DataReaderQos not supported.");
492  }
493  result.mergeTopicQos(tQos);
494 
495  return result;
496  }
497 
498  @Override
500  DDS.StatusCondition oldCondition = this.getOld().get_statuscondition();
501 
502  if (oldCondition == null) {
504  }
506  oldCondition, this);
507  }
508 
509  @Override
511  return this.parent;
512  }
513 
514  @Override
516  return new DataStateImpl(this.environment);
517  }
518 
519  @Override
520  protected void destroy() {
521  this.closeContainedEntities();
522  this.parent.destroySubscriber(this);
523  }
524 
525  public void destroyDataReader(AbstractDataReader<?> dataReader) {
526  DDS.DataReader old = dataReader.getOld();
527  old.delete_contained_entities();
528  int rc = this.getOld().delete_datareader(old);
529  synchronized (this.readers) {
530  this.readers.remove(old);
531  }
533  "DataReader.close() failed.");
534  }
535 }
Since an org.omg.dds.sub.DataReader is a kind of org.omg.dds.core.Entity, it has the ability to have a...
A DataReader allows the application (1) to declare the data it wishes to receive (i.e., make a subscription) and (2) to access the data received by the attached org.omg.dds.sub.Subscriber.
SubscriberImpl(OsplServiceEnvironment environment, DomainParticipantImpl parent, SubscriberQos qos, SubscriberListener listener, Collection< Class<? extends Status >> statuses)
A Subscriber is the object responsible for the actual reception of the data resulting from its subscr...
Definition: Subscriber.java:69
The DomainParticipant object plays several roles:
A StatusCondition object is an immutable object that specifies Condition that is associated with each...
StatusCondition< Subscriber > getStatusCondition()
static SubscriberQosImpl convert(OsplServiceEnvironment env, DDS.SubscriberQos oldQos)
org.omg.dds.domain.DomainParticipant getParent()
Collection< DataReader<?> > getDataReaders()
This operation is equivalent to calling getDataReaders(DataState) with any sample state ( Subscriber...
A DataState encapsulates sets of sample states, view states, and instance states as a convenience...
void closeContainedEntities()
This operation closes all the entities that were created by means of the "create" operations on the S...
DataReaderQos getDefaultDataReaderQos()
This operation retrieves the default value of the DataReader QoS, that is, the QoS policies which wil...
SubscriberImpl(OsplServiceEnvironment environment, DomainParticipantImpl parent, DDS.Subscriber oldSubscriber)
DataWriter allows the application to set the value of the data to be published under a given org...
Definition: DataWriter.java:86
void beginAccess()
This operation indicates that the application is about to access the data samples in any of the org...
void setListener(SubscriberListener listener, Class<? extends Status >... statuses)
void setDefaultDataReaderQos(DataReaderQos qos)
This operation sets a default value of the DataReader QoS policies, which will be used for newly crea...
static Set< Class<? extends Status > > convertMask(OsplServiceEnvironment environment, int state)
void notifyDataReaders()
This operation invokes the operation org.omg.dds.sub.DataReaderListener#onDataAvailable(org.omg.dds.core.event.DataAvailableEvent) on the DataReaderListener objects attached to contained DataReader entities with a org.omg.dds.core.event.DataAvailableEvent that is considered changed.
Since an org.omg.dds.sub.Subscriber is a kind of org.omg.dds.core.Entity, it has the ability to have a...
static void checkReturnCode(int retCode, OsplServiceEnvironment environment, String message)
Definition: Utilities.java:33
void endAccess()
Indicates that the application has finished accessing the data samples in org.omg.dds.sub.DataReader objects managed by the Subscriber.
void destroyDataReader(AbstractDataReader<?> dataReader)
void setListener(SubscriberListener listener, Collection< Class<? extends Status >> statuses)
The target object was previously closed and therefore cannot process the operation.
Collection< DataReader<?> > getDataReaders(Collection< DataReader<?>> readers)
void setListener(SubscriberListener listener)
String getName()
Returns the name used to create the TopicDescription.
DataReaderQos copyFromTopicQos(DataReaderQos drQos, TopicQos tQos)
This operation copies the policies in the org.omg.dds.topic.TopicQos to the corresponding policies in...
static void throwLastErrorException(OsplServiceEnvironment environment)
Definition: Utilities.java:182
This interface is the base for org.omg.dds.topic.Topic, org.omg.dds.topic.ContentFilteredTopic, and org.omg.dds.topic.MultiTopic.
Collection< DataReader<?> > getDataReaders(DataState dataState)
This operation allows the application to access the org.omg.dds.sub.DataReader objects that contain s...
Status is the abstract root class for all communication status objects.
Definition: Status.java:41
abstract AbstractDataReader< TYPE > createDataReader(SubscriberImpl subscriber, TopicDescriptionExt< TYPE > topicDescription, DataReaderQos qos, DataReaderListener< TYPE > listener, Collection< Class<? extends Status >> statuses)
DataState createDataState()
Create and return a new modifiable DataState object.