OpenSplice ISO C++ 2 DCPS  v6.x
ISO C++ 2 OpenSplice Data Distribution Service Data-Centric Publish-Subscribe API
ContentFilteredTopic.hpp
Go to the documentation of this file.
1 #ifndef OMG_DDS_TOPIC_DETAIL_CONTENTFILTEREDTOPIC_HPP_
2 #define OMG_DDS_TOPIC_DETAIL_CONTENTFILTEREDTOPIC_HPP_
3 
4 /* Copyright 2010, Object Management Group, Inc.
5  * Copyright 2010, PrismTech, Inc.
6  * Copyright 2010, Real-Time Innovations, Inc.
7  * All rights reserved.
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  */
21 
22 #include <string>
23 #include <vector>
24 
26 #include <dds/core/types.hpp>
27 #include <dds/topic/Topic.hpp>
28 #include <dds/topic/Filter.hpp>
29 #include <org/opensplice/topic/TopicDescriptionDelegate.hpp>
30 #include <org/opensplice/core/ScopedLock.hpp>
31 
32 #include "u_topic.h"
33 #include "v_kernelParser.h"
34 
35 #ifdef OMG_DDS_CONTENT_SUBSCRIPTION_SUPPORT
36 
42 namespace dds {
43 namespace topic {
44 namespace detail {
45 
46 template <typename T>
47 class ContentFilteredTopic : public virtual org::opensplice::topic::TopicDescriptionDelegate
48 {
49 public:
50  ContentFilteredTopic(
51  const dds::topic::Topic<T>& topic,
52  const std::string& name,
53  const dds::topic::Filter& filter)
54  : org::opensplice::topic::TopicDescriptionDelegate(topic.domain_participant(), name, topic.type_name()),
55  myTopic(topic),
56  myFilter(filter)
57  {
58  ISOCPP_REPORT_STACK_DDS_BEGIN(topic);
59  validate_filter();
60  topic.delegate()->incrNrDependents();
61  this->myParticipant.delegate()->add_cfTopic(*this);
62  }
63 
64  virtual ~ContentFilteredTopic()
65  {
66  if (!this->closed) {
67  try {
68  this->close();
69  } catch (...) {
70  /* Empty: the exception throw should have already traced an error. */
71  }
72  }
73  }
74 
75  virtual void close()
76  {
77  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this);
78  org::opensplice::core::ScopedObjectLock scopedLock(*this);
79 
80  myTopic.delegate()->decrNrDependents();
81 
82  // Remove the ContentFilteredTopic from the list of topics in its participant.
83  this->myParticipant.delegate()->remove_cfTopic(*this);
84 
85  org::opensplice::core::ObjectDelegate::close();
86  }
87 
88  void
89  init(org::opensplice::core::ObjectDelegate::weak_ref_type weak_ref)
90  {
91  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this);
92 
93  /* Set weak_ref before passing ourselves to other isocpp objects. */
94  this->set_weak_ref(weak_ref);
95  /* Register topic at participant. */
96  this->myParticipant.delegate()->add_cfTopic(*this);
97  }
98 
99 private:
100  void validate_filter()
101  {
102  q_expr expr = NULL;
103  uint32_t length;
104  std::vector<c_value> params;
105 
106  length = myFilter.parameters_length();
107  if (length < 100) {
108  expr = v_parser_parse(myFilter.expression().c_str());
109  if (!expr ) {
110  ISOCPP_THROW_EXCEPTION(ISOCPP_INVALID_ARGUMENT_ERROR,
111  "filter_expression '%s' is invalid", myFilter.expression().c_str());
112  }
113  } else {
114  ISOCPP_THROW_EXCEPTION(ISOCPP_INVALID_ARGUMENT_ERROR,
115  "Invalid number of filter_parameters '%d', maximum is 99", length);
116  }
117 
118  u_topic uTopic = (u_topic)(myTopic.delegate()->get_user_handle());
119 
120  params = reader_parameters();
121  if (!u_topicContentFilterValidate2(uTopic, expr, params.empty() ? NULL : &params[0], (os_uint32)params.size())) {
122  q_dispose(expr);
123  ISOCPP_THROW_EXCEPTION(ISOCPP_INVALID_ARGUMENT_ERROR,
124  "filter_expression '%s' is invalid.", myFilter.expression().c_str());
125  }
126  q_dispose(expr);
127  }
128 
129 public:
130  std::string reader_expression() const
131  {
132  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this);
133  std::string rExpr;
134 
135  rExpr += "select * from ";
136  rExpr += myTopic.name();
137  rExpr += " where ";
138  rExpr += myFilter.expression();
139  return rExpr;
140  }
141 
142  std::vector<c_value> reader_parameters() const
143  {
144  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this);
145  std::vector<c_value> params;
146  uint32_t n, length;
147  org::opensplice::topic::FilterDelegate::const_iterator paramIterator;
148 
149  length = myFilter.parameters_length();
150  if (length != 0) {
151  for (n = 0, paramIterator = myFilter.begin(); n < length; n++, paramIterator++) {
152  params.push_back(c_stringValue(const_cast<char *>(paramIterator->c_str())));
153  }
154  }
155  return params;
156  }
157 
162  const dds::topic::Filter& filter() const
163  {
164  return myFilter;
165  }
166 
172  template <typename FWIterator>
173  void filter_parameters(const FWIterator& begin, const FWIterator& end)
174  {
175  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this);
176  ISOCPP_THROW_EXCEPTION(ISOCPP_UNSUPPORTED_ERROR, "Changing of Filter parameters is currently not supported.");
177  myFilter.parameters(begin, end);
178  validate_filter();
179  }
180 
181  const dds::topic::Topic<T>& topic() const
182  {
183  return myTopic;
184  }
185 
186  const std::string& filter_expression() const
187  {
188  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this);
189  return myFilter.expression();
190  }
191 
192  const dds::core::StringSeq filter_parameters() const
193  {
194  ISOCPP_REPORT_STACK_DELEGATE_BEGIN(this);
195  return dds::core::StringSeq(myFilter.begin(), myFilter.end());
196  }
197 
198 private:
199  dds::topic::Topic<T> myTopic;
200  dds::topic::Filter myFilter;
201 };
202 
203 }
204 }
205 }
206 
209 #endif /* OMG_DDS_CONTENT_SUBSCRIPTION_SUPPORT */
210 
211 #endif /* OMG_DDS_TOPIC_DETAIL_CONTENTFILTEREDTOPIC_HPP_ */
Filter objects contain SQL expressions that allow the application to specify a filter on the locally ...
Definition: Filter.hpp:57
Topic is the most basic description of the data to be published and subscribed.
Definition: Topic.hpp:36
Definition: array.hpp:23
std::vector< std::string > StringSeq
Definition: types.hpp:43