OpenSplice Java 5 DCPS  v6.x
OpenSplice Java 5 OpenSplice Data Distribution Service Data-Centric Publish-Subscribe API
PublisherImpl.java
Go to the documentation of this file.
1 /*
2  * Vortex OpenSplice
3  *
4  * This software and documentation are Copyright 2006 to 2024 ADLINK
5  * Technology Limited, its affiliated companies and licensors. All rights
6  * reserved.
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 package org.opensplice.dds.pub;
22 
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 
31 import org.omg.dds.core.Duration;
33 import org.omg.dds.core.status.Status;
34 import org.omg.dds.pub.DataWriter;
36 import org.omg.dds.pub.DataWriterQos;
37 import org.omg.dds.pub.Publisher;
39 import org.omg.dds.pub.PublisherQos;
40 import org.omg.dds.topic.Topic;
41 import org.omg.dds.topic.TopicQos;
54 
55 public class PublisherImpl
56 extends
57 DomainEntityImpl<DDS.Publisher, DomainParticipantImpl, DDS.DomainParticipant, PublisherQos, PublisherListener, PublisherListenerImpl>
58 implements Publisher {
59  private final HashMap<DDS.DataWriter, DataWriter<?>> writers;
60 
64  Collection<Class<? extends Status>> statuses) {
65  super(environment, parent, parent.getOld());
66  DDS.PublisherQos oldQos;
67 
68  if (qos == null) {
69  throw new IllegalArgumentExceptionImpl(this.environment,
70  "Supplied PublisherQos is null.");
71  }
72 
73  try {
74  oldQos = ((PublisherQosImpl) qos).convert();
75  } catch (ClassCastException e) {
76  throw new IllegalArgumentExceptionImpl(this.environment,
77  "Cannot create Publisher with non-OpenSplice qos");
78  }
79 
80  if (listener != null) {
81  this.listener = new PublisherListenerImpl(this.environment, this,
82  listener, true);
83  } else {
84  this.listener = null;
85  }
86  DDS.Publisher old = this.parent.getOld().create_publisher(oldQos,
87  this.listener,
88  StatusConverter.convertMask(this.environment, statuses));
89 
90  if (old == null) {
91  Utilities.throwLastErrorException(this.environment);
92  }
93  this.setOld(old);
94  this.writers = new HashMap<DDS.DataWriter, DataWriter<?>>();
95 
96  if (this.listener != null) {
97  this.listener.setInitialised();
98  }
99  }
100 
101  private void setListener(PublisherListener listener, int mask) {
102  PublisherListenerImpl wrapperListener;
103  int rc;
104 
105  if (listener != null) {
106  wrapperListener = new PublisherListenerImpl(this.environment, this,
107  listener);
108  } else {
109  wrapperListener = null;
110  }
111  rc = this.getOld().set_listener(wrapperListener, mask);
113  "Publisher.setListener() failed.");
114 
115  this.listener = wrapperListener;
116  }
117 
118  @Override
119  public void setListener(PublisherListener listener) {
120  this.setListener(listener, StatusConverter.getAnyMask());
121  }
122 
123  @Override
124  public void setListener(PublisherListener listener,
125  Collection<Class<? extends Status>> statuses) {
126  this.setListener(listener,
127  StatusConverter.convertMask(this.environment, statuses));
128  }
129 
130  @Override
131  public void setListener(PublisherListener listener,
132  Class<? extends Status>... statuses) {
133  this.setListener(listener,
134  StatusConverter.convertMask(this.environment, statuses));
135  }
136 
137  @Override
138  public PublisherQos getQos() {
139  DDS.PublisherQosHolder holder = new DDS.PublisherQosHolder();
140  int rc = this.getOld().get_qos(holder);
142  "Publisher.getQos() failed.");
143 
144  return PublisherQosImpl.convert(this.environment, holder.value);
145  }
146 
147  @Override
148  public void setQos(PublisherQos qos) {
150 
151  if (qos == null) {
153  "Supplied PublisherQos is null.");
154  }
155  try {
156  q = (PublisherQosImpl) qos;
157  } catch (ClassCastException e) {
159  "Setting non-OpenSplice Qos not supported.");
160  }
161  int rc = this.getOld().set_qos(q.convert());
163  "Publisher.setQos() failed.");
164 
165  }
166 
167  @Override
168  public <TYPE> DataWriter<TYPE> createDataWriter(Topic<TYPE> topic) {
169  return this.createDataWriter(topic, this.getDefaultDataWriterQos(),
170  null, new HashSet<Class<? extends Status>>());
171  }
172 
173  @Override
174  public <TYPE> DataWriter<TYPE> createDataWriter(Topic<TYPE> topic,
176  Collection<Class<? extends Status>> statuses) {
178  AbstractTypeSupport<TYPE> typeSupport;
179 
180  if (topic == null) {
182  "Supplied Topic is null.");
183  }
184  synchronized(this.writers){
185  try {
186  typeSupport = (AbstractTypeSupport<TYPE>) topic.getTypeSupport();
187  writer = typeSupport.createDataWriter(this,
188  (TopicImpl<TYPE>) topic, qos, listener, statuses);
189  } catch (ClassCastException e) {
191  "Cannot create DataWriter with non-OpenSplice Topic");
192  }
193  this.writers.put(writer.getOld(), writer);
194  }
195  return writer;
196  }
197 
198  @Override
199  public <TYPE> DataWriter<TYPE> createDataWriter(Topic<TYPE> topic,
201  Class<? extends Status>... statuses) {
202  return createDataWriter(topic, qos, listener, Arrays.asList(statuses));
203  }
204 
205  @Override
206  public <TYPE> DataWriter<TYPE> createDataWriter(Topic<TYPE> topic,
207  DataWriterQos qos) {
208  return this.createDataWriter(topic, qos, null,
209  new HashSet<Class<? extends Status>>());
210  }
211 
212  public <TYPE> DataWriter<TYPE> lookupDataWriter(DDS.DataWriter writer) {
213  DataWriter<?> dw;
214 
215  synchronized (this.writers) {
216  dw = this.writers.get(writer);
217  }
218  if (dw != null) {
219  return dw.cast();
220  }
221  return null;
222  }
223 
224  @SuppressWarnings("unchecked")
225  @Override
226  public <TYPE> DataWriter<TYPE> lookupDataWriter(String topicName) {
227  if (topicName == null) {
229  "Supplied topicName is null.");
230  }
231  synchronized (this.writers) {
232  for (DataWriter<?> writer : this.writers.values()) {
233  if (topicName.equals(writer.getTopic().getName())) {
234  try {
235  return (DataWriter<TYPE>) writer;
236  } catch (ClassCastException e) {
238  this.environment,
239  "Cannot cast DataWriter to desired type.");
240  }
241  }
242  }
243  }
244  return null;
245  }
246 
247  @SuppressWarnings("unchecked")
248  @Override
249  public <TYPE> DataWriter<TYPE> lookupDataWriter(Topic<TYPE> topic) {
250  if (topic == null) {
252  "Supplied Topic is null.");
253  }
254  synchronized (this.writers) {
255  for (DataWriter<?> writer : this.writers.values()) {
256  if (topic.equals(writer.getTopic())) {
257  try {
258  return (DataWriter<TYPE>) writer;
259  } catch (ClassCastException e) {
261  this.environment,
262  "Cannot cast DataWriter to desired type.");
263  }
264  }
265  }
266  }
267  return null;
268  }
269 
270  @Override
271  public void closeContainedEntities() {
272  synchronized(this.writers){
273  HashMap<DDS.DataWriter, DataWriter<?>> copyWriter = new HashMap<DDS.DataWriter, DataWriter<?>>(this.writers);
274  for (DataWriter<?> writer : copyWriter.values()) {
275  try {
276  writer.close();
277  } catch (AlreadyClosedException a) {
278  /* Entity may be closed concurrently by application */
279  }
280  }
281  }
282  }
283 
284  @Override
285  public void suspendPublications() {
286  int rc = this.getOld().suspend_publications();
288  "Publisher.suspendPublications() failed.");
289 
290  }
291 
292  @Override
293  public void resumePublications() {
294  int rc = this.getOld().resume_publications();
296  "Publisher.resumePublications() failed.");
297  }
298 
299  @Override
300  public void beginCoherentChanges() {
301  int rc = this.getOld().begin_coherent_changes();
303  "Publisher.beginCoherentChanges() failed.");
304  }
305 
306  @Override
307  public void endCoherentChanges() {
308  int rc = this.getOld().end_coherent_changes();
310  "Publisher.endCoherentChanges() failed.");
311  }
312 
313  @Override
314  public void waitForAcknowledgments(Duration maxWait)
315  throws TimeoutException {
316  int rc = this.getOld().wait_for_acknowledgments(
317  Utilities.convert(this.environment,
318  maxWait));
320  "Publisher.waitForAcknowledgments() failed.");
321  }
322 
323  @Override
324  public void waitForAcknowledgments(long maxWait, TimeUnit unit)
325  throws TimeoutException {
327  maxWait, unit));
328  }
329 
330  @Override
332  DDS.DataWriterQosHolder holder = new DDS.DataWriterQosHolder();
333  int rc = this.getOld().get_default_datawriter_qos(holder);
335  "Publisher.getDefaultDataWriterQos() failed.");
336 
337  return DataWriterQosImpl.convert(this.environment, holder.value);
338  }
339 
340  @Override
342  if (qos == null) {
344  "Supplied DataWriterQos is null.");
345  }
346  try {
347  this.getOld().set_default_datawriter_qos(
348  ((DataWriterQosImpl) qos)
349  .convert());
350  } catch (ClassCastException e) {
352  "Non-OpenSplice DataWriterQos not supported.");
353  }
354 
355  }
356 
357  @Override
359  DataWriterQosImpl result;
360 
361  if (tQos == null) {
363  "Supplied TopicQos is null.");
364  }
365  if (dwQos == null) {
367  "Supplied DataWriterQos is null.");
368  }
369  try {
370  result = (DataWriterQosImpl) dwQos;
371  } catch (ClassCastException e) {
373  "Non-OpenSplice DataWriterQos not supported.");
374  }
375  result.mergeTopicQos(tQos);
376 
377  return result;
378  }
379 
380  @Override
382  DDS.StatusCondition oldCondition = this.getOld().get_statuscondition();
383 
384  if (oldCondition == null) {
386  }
388  oldCondition, this);
389  }
390 
391  @Override
393  return this.parent;
394  }
395 
396  @Override
397  protected void destroy() {
398  this.closeContainedEntities();
399  this.parent.destroyPublisher(this);
400  }
401 
402  public void destroyDataWriter(
404  DDS.DataWriter old = dataWriter.getOld();
405  int rc = this.getOld().delete_datawriter(old);
406  synchronized(this.writers){
407  this.writers.remove(old);
408  }
410  "DataWriter.close() failed.");
411  }
412 }
static PublisherQosImpl convert(OsplServiceEnvironment env, DDS.PublisherQos oldQos)
void destroyDataWriter(EntityImpl< DDS.DataWriter, ?, ?, ?, ?> dataWriter)
abstract void close()
Halt communication and dispose the resources held by this Entity.
static void checkReturnCodeWithTimeout(int retCode, OsplServiceEnvironment environment, String message)
Definition: Utilities.java:176
The DomainParticipant object plays several roles:
A StatusCondition object is an immutable object that specifies Condition that is associated with each...
void waitForAcknowledgments(long maxWait, TimeUnit unit)
This operation blocks the calling thread until either all data written by the reliable org...
void setListener(PublisherListener listener, Collection< Class<? extends Status >> statuses)
public< OTHER > DataWriter< OTHER > cast()
Cast this data writer to the given type, or throw an exception if the cast fails. ...
void endCoherentChanges()
This operation terminates the &#39;coherent set&#39; initiated by the matching call to beginCoherentChanges()...
static DDS.Duration_t convert(OsplServiceEnvironment environment, Duration d)
Definition: Utilities.java:232
org.omg.dds.domain.DomainParticipant getParent()
DataWriter allows the application to set the value of the data to be published under a given org...
Definition: DataWriter.java:86
void beginCoherentChanges()
This operation requests that the application will begin a &#39;coherent set&#39; of modifications using org...
static DataWriterQosImpl convert(OsplServiceEnvironment env, DDS.DataWriterQos oldQos)
void waitForAcknowledgments(Duration maxWait)
This operation blocks the calling thread until either all data written by the reliable org...
static Set< Class<? extends Status > > convertMask(OsplServiceEnvironment environment, int state)
void setListener(PublisherListener listener, Class<? extends Status >... statuses)
StatusCondition< Publisher > getStatusCondition()
abstract AbstractDataWriter< TYPE > createDataWriter(PublisherImpl publisher, AbstractTopic< TYPE > topic, DataWriterQos qos, DataWriterListener< TYPE > listener, Collection< Class<? extends Status >> statuses)
void closeContainedEntities()
This operation closes all the entities that were created by means of the "create" operations on the P...
static void checkReturnCode(int retCode, OsplServiceEnvironment environment, String message)
Definition: Utilities.java:33
DataWriterQos getDefaultDataWriterQos()
This operation retrieves the default value of the DataWriter QoS, that is, the QoS policies which wil...
TypeSupport< TYPE > getTypeSupport()
Returns the org.omg.dds.type.TypeSupport used to create this TopicDescription.
Since a org.omg.dds.pub.DataWriter is a kind of org.omg.dds.core.Entity, it has the ability to have a...
The target object was previously closed and therefore cannot process the operation.
void setListener(PublisherListener listener)
PublisherImpl(OsplServiceEnvironment environment, DomainParticipantImpl parent, PublisherQos qos, PublisherListener listener, Collection< Class<? extends Status >> statuses)
DataWriterQos copyFromTopicQos(DataWriterQos dwQos, TopicQos tQos)
This operation copies the policies in the org.omg.dds.topic.Topic QoS to the corresponding policies i...
A Publisher is the object responsible for the actual dissemination of publications.
Definition: Publisher.java:71
Duration newDuration(long duration, TimeUnit unit)
Construct a org.omg.dds.core.Duration of the given magnitude.
A span of elapsed time expressed with nanosecond precision.
Definition: Duration.java:35
void resumePublications()
This operation indicates to the Service that the application has completed the multiple changes initi...
void suspendPublications()
This operation indicates to the Service that the application is about to make multiple modifications ...
static void throwLastErrorException(OsplServiceEnvironment environment)
Definition: Utilities.java:182
Topic is the most basic description of the data to be published and subscribed.
Definition: Topic.java:55
Since a org.omg.dds.pub.Publisher is a kind of org.omg.dds.core.Entity, it has the ability to have a ...
void setDefaultDataWriterQos(DataWriterQos qos)
This operation sets a default value of the DataWriter QoS policies, which will be used for newly crea...
Status is the abstract root class for all communication status objects.
Definition: Status.java:41