OpenSplice Java FACE API  v6.x
OpenSplice Future Airborne Capability Environment (FACE) Java API
SourceConnection.java
Go to the documentation of this file.
1 /*
2  * Vortex OpenSplice
3  *
4  * This software and documentation are Copyright 2006 to 2021 ADLINK
5  * Technology Limited, its affiliated companies and licensors. All rights
6  * reserved.
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 package org.vortex.FACE;
21 
22 import java.util.Set;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.logging.Level;
27 
28 import org.omg.dds.core.AlreadyClosedException;
29 import org.omg.dds.core.policy.PolicyFactory;
30 import org.omg.dds.core.policy.Reliability;
31 import org.omg.dds.pub.DataWriter;
32 import org.omg.dds.pub.DataWriterQos;
33 import org.omg.dds.pub.Publisher;
34 import org.omg.dds.pub.PublisherQos;
35 
36 import FACE.CONNECTION_DIRECTION_TYPE;
37 import FACE.RETURN_CODE_TYPE;
38 import FACE.VALIDITY_TYPE;
39 
40 public class SourceConnection<TYPE> extends Connection<TYPE> {
// DDS publisher created by setupPublisher(); stays null when creation fails.
private Publisher publisher = null;
// Typed DDS writer created by setupDataWriter(); stays null when creation fails.
private DataWriter<TYPE> dataWriter = null;
// Counter incremented after each successful write in sendMessage(); the new
// value is handed back to the caller through the transaction_id holder.
private final AtomicLong transactionid;
44 
46  Class<TYPE> dataType) {
47  super(description,dataType);
48  this.transactionid = new AtomicLong(0);
49  this.setupPublisher();
50  this.setupDataWriter();
51  }
52 
53  private void setupPublisher() {
54  PublisherQos qos = this.getDescription().getPublisherQos();
55  try {
56  if (qos == null) {
57  qos = this.getParticipant().getDefaultPublisherQos();
58  qos.withPolicy(PolicyFactory
59  .getPolicyFactory(this.getDescription().getEnvironment())
60  .Partition().withName(this.getDescription().getName()));
61  } else {
62  Set<String> names = qos.getPartition().getName();
63 
64  if (names.size() == 1 && names.contains("")) {
65  qos = qos.withPolicy(qos.getPartition().withName(
66  this.getDescription().getName()));
67  }
68  }
69  this.publisher = this.getParticipant().createPublisher(qos);
70  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
71  } catch (AlreadyClosedException e) {
72  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
73  } catch (Exception exc) {
74  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
75  Logger.getInstance().log(exc.toString(), Level.FINEST);
76  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
77  }
78  }
79 
80  private void setupDataWriter() {
81  try {
82  DataWriterQos qos = this.getDescription().getDataWriterQos();
83 
84  if (qos == null) {
85  qos = this.publisher.getDefaultDataWriterQos();
86  }
87 
88  this.dataWriter = this.publisher.createDataWriter(this.getTopic());
89  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
90  } catch (AlreadyClosedException e) {
91  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
92  } catch (Exception exc) {
93  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
94  Logger.getInstance().log(exc.toString(), Level.FINEST);
95  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
96  }
97  }
98 
99  @Override
100  public CONNECTION_DIRECTION_TYPE getDirection() {
101  return CONNECTION_DIRECTION_TYPE.SOURCE;
102  }
103 
104 
105  public RETURN_CODE_TYPE sendMessage(TYPE instanceData, long timeout, us.opengroup.FACE.LongHolder transaction_id) {
106  try {
107  if (this.dataWriter.getQos().getReliability().getKind() == Reliability.Kind.RELIABLE) {
108  long maxBlockingTime = this.dataWriter.getQos()
109  .getReliability().getMaxBlockingTime()
110  .getDuration(TimeUnit.NANOSECONDS);
111 
112  if (timeout > maxBlockingTime) {
114  "Supplied time-out > DataWriterQos.reliability.max_blocking_time ("
115  + timeout + " > " + maxBlockingTime + ").",
116  Level.WARNING);
117 
118  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
119  return RETURN_CODE_TYPE.INVALID_PARAM;
120  }
121  }
122  try {
123  this.dataWriter.write(instanceData);
124  transaction_id.value = this.transactionid.incrementAndGet();
125  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
126  } catch (AlreadyClosedException e) {
127  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
128  } catch (TimeoutException e) {
129  Logger.getInstance().log("Write time-out", Level.WARNING);
130  this.setLastMessageValidity(VALIDITY_TYPE.VALID);
131  return RETURN_CODE_TYPE.TIMED_OUT;
132  }
133  } catch (AlreadyClosedException e) {
134  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
135  } catch (Exception exc) {
136  Logger.getInstance().log(exc.getMessage(), Level.SEVERE);
137  Logger.getInstance().log(exc.toString(), Level.FINEST);
138  this.setLastMessageValidity(VALIDITY_TYPE.INVALID);
139  return RETURN_CODE_TYPE.CONNECTION_CLOSED;
140 
141  }
142  return RETURN_CODE_TYPE.NO_ERROR;
143  }
144 }
void log(String message, Level level)
Definition: Logger.java:47
DomainParticipant getParticipant()
Definition: Connection.java:79
CONNECTION_DIRECTION_TYPE getDirection()
SourceConnection(ConnectionDescription description, Class< TYPE > dataType)
Topic< TYPE > getTopic()
Definition: Connection.java:83
static Logger getInstance()
Definition: Logger.java:43
void setLastMessageValidity(VALIDITY_TYPE lastMessageValidity)
Definition: Connection.java:67
RETURN_CODE_TYPE sendMessage(TYPE instanceData, long timeout, us.opengroup.FACE.LongHolder transaction_id)
This is a typed class which will be generated by idlpp.
Definition: TS.java:25
ConnectionDescription getDescription()
Definition: Connection.java:75