OpenSplice Java 5 DCPS  v6.x
OpenSplice Java 5 OpenSplice Data Distribution Service Data-Centric Publish-Subscribe API
DataWriterProtobuf.java
Go to the documentation of this file.
1 /*
2  * Vortex OpenSplice
3  *
4  * This software and documentation are Copyright 2006 to 2024 ADLINK
5  * Technology Limited, its affiliated companies and licensors. All rights
6  * reserved.
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 package org.opensplice.dds.pub;
22 
23 import java.util.Collection;
24 import java.util.Set;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 
28 import org.omg.dds.core.Duration;
32 import org.omg.dds.core.Time;
37 import org.omg.dds.core.status.Status;
38 import org.omg.dds.pub.DataWriter;
40 import org.omg.dds.pub.DataWriterQos;
42 import org.omg.dds.topic.Topic;
52 
53 public class DataWriterProtobuf<PROTOBUF_TYPE, DDS_TYPE> extends
54  AbstractDataWriter<PROTOBUF_TYPE> {
55  private final TopicProtobuf<PROTOBUF_TYPE> topic;
56  private final ReflectionDataWriter<DDS_TYPE> reflectionWriter;
58 
59  @SuppressWarnings("unchecked")
61  PublisherImpl parent, TopicProtobuf<PROTOBUF_TYPE> topic,
62  DataWriterQos qos, DataWriterListener<PROTOBUF_TYPE> listener,
63  Collection<Class<? extends Status>> statuses) {
64  super(environment, parent);
65 
66  if (qos == null) {
67  throw new IllegalArgumentExceptionImpl(this.environment,
68  "Supplied DataWriterQos is null.");
69  }
70  if (topic == null) {
71  throw new IllegalArgumentExceptionImpl(this.environment,
72  "Supplied Topic is null.");
73  }
74  DDS.DataWriterQos oldQos;
75 
76  this.topic = topic;
77  this.typeSupport = (TypeSupportProtobuf<PROTOBUF_TYPE, DDS_TYPE>) topic
78  .getTypeSupport();
79 
80  try {
81  oldQos = ((DataWriterQosImpl) qos).convert();
82  } catch (ClassCastException e) {
83  throw new IllegalArgumentExceptionImpl(this.environment,
84  "Cannot create DataWriter with non-OpenSplice qos");
85  }
86 
87  if (listener != null) {
88  this.listener = new DataWriterListenerImpl<PROTOBUF_TYPE>(
89  this.environment, this, listener, true);
90  } else {
91  this.listener = null;
92  }
93  DDS.DataWriter old = this.parent.getOld().create_datawriter(
94  topic.getOld(), oldQos, this.listener,
95  StatusConverter.convertMask(this.environment, statuses));
96 
97  if (old == null) {
98  Utilities.throwLastErrorException(this.environment);
99  }
100  this.setOld(old);
101  this.reflectionWriter = new ReflectionDataWriter<DDS_TYPE>(
102  this.environment, this.getOld(), this.typeSupport
104  this.topic.retain();
105 
106  if (this.listener != null) {
107  this.listener.setInitialised();
108  }
109  }
110 
111  @Override
112  public void dispose(InstanceHandle instanceHandle) throws TimeoutException {
113  this.reflectionWriter.dispose(instanceHandle);
114  }
115 
116  @Override
117  public void assertLiveliness() {
118  this.reflectionWriter.assertLiveliness();
119  }
120 
121  @SuppressWarnings("unchecked")
122  @Override
123  public <OTHER> org.omg.dds.pub.DataWriter<OTHER> cast() {
124  DataWriter<OTHER> other;
125  try {
126  other = (DataWriter<OTHER>) this;
127  } catch (ClassCastException cce) {
128  throw new IllegalOperationExceptionImpl(this.environment,
129  "Unable to perform requested cast.");
130  }
131  return other;
132 
133  }
134 
135  @Override
136  public void dispose(InstanceHandle instanceHandle,
137  PROTOBUF_TYPE instanceData) throws TimeoutException {
138  this.reflectionWriter.dispose(instanceHandle,
139  this.typeSupport.protobufToDds(instanceData));
140  }
141 
142  @Override
143  public void dispose(InstanceHandle instanceHandle,
144  PROTOBUF_TYPE instanceData, Time sourceTimestamp)
145  throws TimeoutException {
146  this.reflectionWriter.dispose(instanceHandle,
147  this.typeSupport.protobufToDds(instanceData), sourceTimestamp);
148 
149  }
150 
151  @Override
152  public void dispose(InstanceHandle instanceHandle,
153  PROTOBUF_TYPE instanceData, long sourceTimeStamp, TimeUnit unit)
154  throws TimeoutException {
155  this.reflectionWriter.dispose(instanceHandle,
156  this.typeSupport.protobufToDds(instanceData), sourceTimeStamp,
157  unit);
158  }
159 
160  @Override
161  public PROTOBUF_TYPE getKeyValue(InstanceHandle instanceHandle) {
162  DDS_TYPE keyValue = this.reflectionWriter.getKeyValue(instanceHandle);
163 
164  if (keyValue != null) {
165  return this.typeSupport.ddsKeyToProtobuf(keyValue);
166  }
167  return null;
168  }
169 
170  @Override
171  public PROTOBUF_TYPE getKeyValue(PROTOBUF_TYPE instanceData,
172  InstanceHandle instanceHandle) {
173  DDS_TYPE keyValue = this.reflectionWriter.getKeyValue(
174  this.typeSupport.protobufToDds(instanceData), instanceHandle);
175 
176  if (keyValue != null) {
177  return this.typeSupport.ddsKeyToProtobuf(keyValue);
178  }
179  return null;
180  }
181 
182  @Override
184  return this.reflectionWriter.getLivelinessLostStatus();
185  }
186 
187  @Override
189  InstanceHandle instanceHandle) {
190  return this.reflectionWriter.getMatchedSubscriptionData(instanceHandle);
191  }
192 
193  @Override
194  public Set<InstanceHandle> getMatchedSubscriptions() {
195  return this.reflectionWriter.getMatchedSubscriptions();
196  }
197 
198  @Override
200  return this.reflectionWriter.getOfferedDeadlineMissedStatus();
201  }
202 
203  @Override
205  return this.reflectionWriter.getOfferedIncompatibleQosStatus();
206  }
207 
208  @Override
210  return this.reflectionWriter.getPublicationMatchedStatus();
211  }
212 
213  @Override
215  DDS.StatusCondition oldCondition = this.getOld().get_statuscondition();
216 
217  if (oldCondition == null) {
218  Utilities.throwLastErrorException(this.environment);
219  }
221  this.environment, oldCondition, this);
222  }
223 
224  @Override
226  return this.topic;
227  }
228 
229  @Override
230  public InstanceHandle lookupInstance(PROTOBUF_TYPE instanceData) {
231  return this.reflectionWriter.lookupInstance(this.typeSupport
232  .protobufToDds(instanceData));
233  }
234 
235  @Override
236  public InstanceHandle registerInstance(PROTOBUF_TYPE instanceData)
237  throws TimeoutException {
238  return this.reflectionWriter.registerInstance(this.typeSupport
239  .protobufToDds(instanceData));
240  }
241 
242  @Override
243  public InstanceHandle registerInstance(PROTOBUF_TYPE instanceData,
244  Time sourceTimestamp) throws TimeoutException {
245  return this.reflectionWriter.registerInstance(
246  this.typeSupport.protobufToDds(instanceData), sourceTimestamp);
247  }
248 
249  @Override
250  public InstanceHandle registerInstance(PROTOBUF_TYPE instanceData,
251  long sourceTimestamp, TimeUnit unit) throws TimeoutException {
252  return this.reflectionWriter.registerInstance(
253  this.typeSupport.protobufToDds(instanceData), sourceTimestamp,
254  unit);
255  }
256 
257  @Override
258  public void unregisterInstance(InstanceHandle handle)
259  throws TimeoutException {
260  this.reflectionWriter.unregisterInstance(handle);
261 
262  }
263 
264  @Override
265  public void unregisterInstance(InstanceHandle handle,
266  PROTOBUF_TYPE instanceData) throws TimeoutException {
267  this.reflectionWriter.unregisterInstance(handle,
268  this.typeSupport.protobufToDds(instanceData));
269  }
270 
271  @Override
272  public void unregisterInstance(InstanceHandle handle,
273  PROTOBUF_TYPE instanceData, Time sourceTimestamp)
274  throws TimeoutException {
275  this.reflectionWriter.unregisterInstance(handle,
276  this.typeSupport.protobufToDds(instanceData), sourceTimestamp);
277 
278  }
279 
280  @Override
281  public void unregisterInstance(InstanceHandle handle,
282  PROTOBUF_TYPE instanceData, long sourceTimestamp, TimeUnit unit)
283  throws TimeoutException {
284  this.reflectionWriter.unregisterInstance(handle,
285  this.typeSupport.protobufToDds(instanceData), sourceTimestamp,
286  unit);
287 
288  }
289 
290  @Override
291  public void waitForAcknowledgments(Duration maxWait)
292  throws TimeoutException {
293  this.reflectionWriter.waitForAcknowledgments(maxWait);
294  }
295 
296  @Override
297  public void waitForAcknowledgments(long maxWait, TimeUnit unit)
298  throws TimeoutException {
299  this.reflectionWriter.waitForAcknowledgments(maxWait, unit);
300  }
301 
302  @Override
303  public void write(PROTOBUF_TYPE instanceData) throws TimeoutException {
304  this.reflectionWriter.write(this.typeSupport
305  .protobufToDds(instanceData));
306  }
307 
308  @Override
309  public void write(PROTOBUF_TYPE instanceData, Time sourceTimestamp)
310  throws TimeoutException {
311  this.reflectionWriter.write(
312  this.typeSupport.protobufToDds(instanceData), sourceTimestamp);
313  }
314 
315  @Override
316  public void write(PROTOBUF_TYPE instanceData, InstanceHandle handle)
317  throws TimeoutException {
318  this.reflectionWriter.write(
319  this.typeSupport.protobufToDds(instanceData), handle);
320 
321  }
322 
323  @Override
324  public void write(PROTOBUF_TYPE instanceData, long sourceTimestamp,
325  TimeUnit unit) throws TimeoutException {
326  this.reflectionWriter.write(
327  this.typeSupport.protobufToDds(instanceData), sourceTimestamp,
328  unit);
329  }
330 
331  @Override
332  public void write(PROTOBUF_TYPE instanceData, InstanceHandle handle,
333  Time sourceTimestamp) throws TimeoutException {
334  this.reflectionWriter.write(
335  this.typeSupport.protobufToDds(instanceData), handle,
336  sourceTimestamp);
337  }
338 
339  @Override
340  public void write(PROTOBUF_TYPE instanceData, InstanceHandle handle,
341  long sourceTimestamp, TimeUnit unit) throws TimeoutException {
342  this.reflectionWriter.write(
343  this.typeSupport.protobufToDds(instanceData), handle,
344  sourceTimestamp, unit);
345  }
346 
347  @Override
349  return this.reflectionWriter.getQos();
350  }
351 
352  private void setListener(DataWriterListener<PROTOBUF_TYPE> listener,
353  int mask) {
355  int rc;
356 
357  if (listener != null) {
358  wrapperListener = new DataWriterListenerImpl<PROTOBUF_TYPE>(
359  this.environment, this, listener);
360  } else {
361  wrapperListener = null;
362  }
363  rc = this.getOld().set_listener(wrapperListener, mask);
364  Utilities.checkReturnCode(rc, this.environment,
365  "DataWriter.setListener() failed.");
366 
367  this.listener = wrapperListener;
368  }
369 
370  @Override
372  this.setListener(listener, StatusConverter.getAnyMask());
373  }
374 
375  @Override
377  Collection<Class<? extends Status>> statuses) {
378  this.setListener(listener,
379  StatusConverter.convertMask(this.environment, statuses));
380  }
381 
382  @Override
384  Class<? extends Status>... statuses) {
385  this.setListener(listener,
386  StatusConverter.convertMask(this.environment, statuses));
387  }
388 
389  @Override
390  public void setQos(DataWriterQos qos) {
391  this.reflectionWriter.setQos(qos);
392  }
393 
394  @Override
396  return this.environment;
397  }
398 
399  @Override
400  public void writeDispose(PROTOBUF_TYPE instanceData)
401  throws TimeoutException {
402  this.reflectionWriter.writeDispose(this.typeSupport
403  .protobufToDds(instanceData));
404  }
405 
406  @Override
407  public void writeDispose(PROTOBUF_TYPE instanceData, Time sourceTimestamp)
408  throws TimeoutException {
409  this.reflectionWriter.writeDispose(
410  this.typeSupport.protobufToDds(instanceData), sourceTimestamp);
411  }
412 
413  @Override
414  public void writeDispose(PROTOBUF_TYPE instanceData, InstanceHandle handle)
415  throws TimeoutException {
416  this.reflectionWriter.writeDispose(
417  this.typeSupport.protobufToDds(instanceData), handle);
418  }
419 
420  @Override
421  public void writeDispose(PROTOBUF_TYPE instanceData, long sourceTimestamp,
422  TimeUnit unit) throws TimeoutException {
423  this.reflectionWriter.writeDispose(
424  this.typeSupport.protobufToDds(instanceData), sourceTimestamp,
425  unit);
426  }
427 
428  @Override
429  public void writeDispose(PROTOBUF_TYPE instanceData, InstanceHandle handle,
430  Time sourceTimestamp) throws TimeoutException {
431  this.reflectionWriter.writeDispose(
432  this.typeSupport.protobufToDds(instanceData), handle,
433  sourceTimestamp);
434 
435  }
436 
437  @Override
438  public void writeDispose(PROTOBUF_TYPE instanceData,
439  InstanceHandle instanceHandle, long sourceTimestamp, TimeUnit unit)
440  throws TimeoutException {
441  this.reflectionWriter.writeDispose(
442  this.typeSupport.protobufToDds(instanceData), sourceTimestamp,
443  unit);
444  }
445 
446  @Override
447  protected void destroy() {
448  super.destroy();
449  this.topic.close();
450  }
451 }
OfferedDeadlineMissedStatus getOfferedDeadlineMissedStatus()
void unregisterInstance(InstanceHandle handle)
PROTOBUF_TYPE getKeyValue(PROTOBUF_TYPE instanceData, InstanceHandle instanceHandle)
This class contains the statistics about the discovered number of org.omg.dds.sub.DataReaders that are compatible with the org.omg.dds.pub.DataWriter to which the Status is attached.
void writeDispose(PROTOBUF_TYPE instanceData)
PublicationMatchedStatus getPublicationMatchedStatus()
void writeDispose(PROTOBUF_TYPE instanceData, InstanceHandle handle)
A StatusCondition object is an immutable object that specifies Condition that is associated with each...
OfferedIncompatibleQosStatus getOfferedIncompatibleQosStatus()
void unregisterInstance(InstanceHandle handle, PROTOBUF_TYPE instanceData, long sourceTimestamp, TimeUnit unit)
void writeDispose(PROTOBUF_TYPE instanceData, long sourceTimestamp, TimeUnit unit)
void writeDispose(PROTOBUF_TYPE instanceData, Time sourceTimestamp)
final TypeSupportProtobuf< PROTOBUF_TYPE, DDS_TYPE > typeSupport
InstanceHandle registerInstance(PROTOBUF_TYPE instanceData)
void writeDispose(PROTOBUF_TYPE instanceData, InstanceHandle instanceHandle, long sourceTimestamp, TimeUnit unit)
void write(PROTOBUF_TYPE instanceData, InstanceHandle handle)
void write(PROTOBUF_TYPE instanceData, InstanceHandle handle, Time sourceTimestamp)
InstanceHandle lookupInstance(PROTOBUF_TYPE instanceData)
void setListener(DataWriterListener< PROTOBUF_TYPE > listener, Collection< Class<? extends Status >> statuses)
OfferedIncompatibleQosStatus getOfferedIncompatibleQosStatus()
void waitForAcknowledgments(long maxWait, TimeUnit unit)
InstanceHandle registerInstance(PROTOBUF_TYPE instanceData, Time sourceTimestamp)
TypeSupportImpl< DDS_TYPE > getTypeSupportStandard()
DataWriter allows the application to set the value of the data to be published under a given org...
Definition: DataWriter.java:86
The DCPSSubscription topic communicates the existence of datareaders by means of the SubscriptionBuil...
static Set< Class<? extends Status > > convertMask(OsplServiceEnvironment environment, int state)
The liveliness that the org.omg.dds.pub.DataWriter has committed through its org.omg.dds.core.policy.Liveliness was not respected; thus org.omg.dds.sub.DataReader entities will consider the DataWriter as no longer "active".
void dispose(InstanceHandle instanceHandle, PROTOBUF_TYPE instanceData, long sourceTimeStamp, TimeUnit unit)
An opaque handle that can be used to refer to a local or remote entity.
SubscriptionBuiltinTopicData getMatchedSubscriptionData(InstanceHandle instanceHandle)
static void checkReturnCode(int retCode, OsplServiceEnvironment environment, String message)
Definition: Utilities.java:33
StatusCondition< org.omg.dds.pub.DataWriter< PROTOBUF_TYPE > > getStatusCondition()
void writeDispose(PROTOBUF_TYPE instanceData, InstanceHandle handle, Time sourceTimestamp)
Since a org.omg.dds.pub.DataWriter is a kind of org.omg.dds.core.Entity, it has the ability to have a...
abstract PROTOBUF_TYPE ddsKeyToProtobuf(DDS_TYPE ddsData)
abstract DDS_TYPE protobufToDds(PROTOBUF_TYPE protobufData)
void write(PROTOBUF_TYPE instanceData)
void unregisterInstance(InstanceHandle handle, PROTOBUF_TYPE instanceData)
A org.omg.dds.core.policy.QosPolicy value was incompatible with what was requested.
SubscriptionBuiltinTopicData getMatchedSubscriptionData(InstanceHandle subscriptionHandle)
PROTOBUF_TYPE getKeyValue(InstanceHandle instanceHandle)
A span of elapsed time expressed with nanosecond precision.
Definition: Duration.java:35
InstanceHandle registerInstance(PROTOBUF_TYPE instanceData, long sourceTimestamp, TimeUnit unit)
void write(PROTOBUF_TYPE instanceData, InstanceHandle handle, long sourceTimestamp, TimeUnit unit)
OfferedDeadlineMissedStatus getOfferedDeadlineMissedStatus()
void dispose(InstanceHandle instanceHandle)
DDS implementations are rooted in this class, a concrete subclass of which can be instantiated based ...
void write(PROTOBUF_TYPE instanceData, long sourceTimestamp, TimeUnit unit)
void dispose(InstanceHandle instanceHandle, PROTOBUF_TYPE instanceData)
static void throwLastErrorException(OsplServiceEnvironment environment)
Definition: Utilities.java:182
Topic is the most basic description of the data to be published and subscribed.
Definition: Topic.java:55
void unregisterInstance(InstanceHandle handle, PROTOBUF_TYPE instanceData, Time sourceTimestamp)
The deadline that the org.omg.dds.pub.DataWriter has committed through its org.omg.dds.core.policy.Deadline was not respected for a specific instance.
void write(PROTOBUF_TYPE instanceData, Time sourceTimestamp)
void setListener(DataWriterListener< PROTOBUF_TYPE > listener)
A moment in time expressed with nanosecond precision (though not necessarily nanosecond accuracy)...
Definition: Time.java:34
void dispose(InstanceHandle instanceHandle, PROTOBUF_TYPE instanceData, Time sourceTimestamp)
Status is the abstract root class for all communication status objects.
Definition: Status.java:41
void setListener(DataWriterListener< PROTOBUF_TYPE > listener, Class<? extends Status >... statuses)