This note provides information on using the Dynamic Network Partitions feature for RTNetworking. The feature allows users to amend the configuration of the RTNetworking service at runtime by means of a Topic-API. The following sections explain the interface and restrictions on using the feature.
The API for retrieving and changing the supported configuration of RTNetworking at runtime consists of a data model, the RTNetworking DDS entities, and some preconditions for changing the configuration.
Interaction with the RTNetworking service happens by means of three topics in the __BUILT-IN PARTITION__. The ospl_rtn_ConfigRequest and ospl_rtn_ConfigResponse topics together implement a one-to-many request-reply pattern. The ospl_rtn_ConfigState topic implements a state-based pattern.
The RTNetworking service will create the following three topics with the listed QoS (only non-default QoS are listed):
The RTNetworking service will create the following three entities with the listed QoS (only non-default QoS are listed). RTNetworking publishes and subscribes in the __BUILT-IN PARTITION__.
A couple of preconditions must be fulfilled for dynamically added configuration to behave properly.
The IDL for the API is included below; it describes the members of the types used in the API. The IDL contains annotations (comments) on the members to clarify their use.
/*
 * Data model of the RTNetworking runtime-configuration Topic-API.
 *
 * Three topic types are defined here: ConfigRequest_t, ConfigResponse_t and
 * ConfigState_t. Each has a '#pragma keylist' naming its key fields.
 *
 * NOTE: every '#pragma keylist' must start on its own line. A '//' comment
 * runs to the end of the physical line, so nothing may follow
 * '//@Extensibility(...)' on the same line.
 */
module org {
module opensplice {
module dds {
module rtnetworking {

    /* A name-string-value pair */
    struct NameStringValue_t {
        /* The name of the attribute */
        string name;
        /* The value of the attribute */
        string value;
    };

    /* A name-octet-value pair */
    struct NameOctetValue_t {
        /* The name of the attribute */
        string name;
        /* The corresponding value of the attribute */
        sequence<octet> value;
    };

    /* Unique id for networking services and/or clients.
     * This structure is 16 bytes, allowing a GUID_t to
     * be used.
     * The networking service will fill the fields with
     * the BuiltinTopicKey_t of its participant, allowing
     * the information available in the
     * __BUILT-IN PARTITION__ to be used.
     */
    struct Gid_t {
        unsigned long long prefix; /* Most significant 32 bits of 96-bit BuiltinTopicKey_t (systemId) */
        unsigned long long suffix; /* Least significant 64 bits of 96-bit BuiltinTopicKey_t (localId and serial) */
    };

    /* Unique identification of a client request */
    struct RequestId_t {
        /* Identifies the entity that is issuing the request. */
        Gid_t requestorId;
        /* Identifies the request within an entity. */
        unsigned long requestId;
    };

    /* A request to retrieve or change configuration. */
    struct ConfigRequest_t {
        /* Uniquely identifies the request. */
        RequestId_t requestId;
        /* The scope of the request. All servers with a matching role
         * will process the request.
         * If both scope and serverIds are set, only servers in the
         * intersection of both sets process the request. */
        string scope;
        /* Optional list of unique ids of the servers that are addressed
         * with this request if applicable. If an empty sequence is
         * provided, all servers that receive the request will process
         * the request (see note on scope). */
        sequence<Gid_t> serverIds;
        /* The revision of the configuration the request should be
         * applied to. Default is don't care (0). */
        unsigned long configRevision;
        /* The revision the configuration request should lead to
         * after the request is applied. Default is don't care (0). */
        unsigned long resultConfigRevision;
        /* Sequence of configuration elements */
        sequence<NameStringValue_t> config;
        /* Potential (product/vendor-specific) extensions */
        sequence<NameOctetValue_t> extensions;
    }; //@Extensibility(EXTENSIBLE_EXTENSIBILITY)
    /* Key fields: the Gid_t of the requestor. */
#pragma keylist ConfigRequest_t requestId.requestorId.prefix requestId.requestorId.suffix

    /* Result codes carried in ConfigResponse_t.result. */
    typedef unsigned short ConfigResult_t;
    const ConfigResult_t RESULT_OK = 0;
    const ConfigResult_t RESULT_ERROR = 1;
    const ConfigResult_t RESULT_PARSE_ERROR = 2;
    const ConfigResult_t RESULT_IMMUTABLE = 3;
    const ConfigResult_t RESULT_NOT_ALLOWED = 4;
    const ConfigResult_t RESULT_UNMATCHING_CONFIG_REVISION = 5;
    const ConfigResult_t RESULT_NOT_SUPPORTED = 6;

    /* A server's response to a ConfigRequest_t. */
    struct ConfigResponse_t {
        /* Uniquely identifies the request this is a response to. */
        RequestId_t requestId;
        /* Uniquely identifies the responder. */
        Gid_t serverId;
        /* The result of processing the related ConfigRequest_t */
        ConfigResult_t result;
        /* The revision of the configuration the requested config
         * was (tried to be) applied to. */
        unsigned long configRevision;
        /* The revision the configuration request led to after
         * the request was applied. */
        unsigned long resultConfigRevision;
        /* Optional description of the result of processing the related ConfigRequest_t. */
        string description;
        /* Potential (product/vendor-specific) extensions */
        sequence<NameOctetValue_t> extensions;
    }; //@Extensibility(EXTENSIBLE_EXTENSIBILITY)
    /* Key fields: the Gid_t of the responding server. */
#pragma keylist ConfigResponse_t serverId.prefix serverId.suffix

    /* The configuration state of a server. */
    struct ConfigState_t {
        /* Uniquely identifies the server. */
        Gid_t serverId;
        /* Role of the server */
        string role;
        /* Revision of the configuration currently in use by the service */
        unsigned long configRevision;
        /* Sequence of current configuration elements */
        sequence<NameStringValue_t> currentConfig;
        /* Potential (product/vendor-specific) extensions */
        sequence<NameOctetValue_t> extensions;
    }; //@Extensibility(EXTENSIBLE_EXTENSIBILITY)
    /* Key fields: the Gid_t of the server. */
#pragma keylist ConfigState_t serverId.prefix serverId.suffix

}; /* module rtnetworking */
}; /* module dds */
}; /* module opensplice */
}; /* module org */
Below is a (pseudo-)code example of Topic-API interactions with RTNetworking services. Calls that free memory, and checks of return codes and of allocation results, are left out of the example.
/*
 * Example client for the RTNetworking configuration Topic-API.
 *
 * Steps, in order:
 *   1. create a participant and register the three API types;
 *   2. create the three topics with their non-default QoS;
 *   3. create a subscriber and a publisher in the "__BUILT-IN PARTITION__",
 *      readers for ospl_rtn_ConfigResponse and ospl_rtn_ConfigState, and a
 *      writer for ospl_rtn_ConfigRequest;
 *   4. wait for and print the initial ConfigState samples;
 *   5. write three requests and print the resulting responses and states.
 *
 * The variables used at top level (dpf, dp, the *_TS, *_TN, *_TQ, *_T, *_DRQ,
 * *_DR, *_DWQ, *_DW, *_C handles, sQ, s, pQ, p, ws, conds, result and
 * myTimeout) are not declared in this excerpt. Nothing is freed and no return
 * code is checked, other than the result of DDS_WaitSet_wait().
 */

/* Obtain DP-factory */
dpf = DDS_DomainParticipantFactory_get_instance();

/* Create DP */
dp = DDS_DomainParticipantFactory_create_participant(dpf, DDS_DOMAIN_ID_DEFAULT, DDS_PARTICIPANT_QOS_DEFAULT, NULL, DDS_STATUS_MASK_NONE);

/* Register Types */
/* ospl_rtn_ConfigResponse */
cRes_TS = org_opensplice_dds_rtnetworking_ConfigResponse_tTypeSupport__alloc();
cRes_TN = org_opensplice_dds_rtnetworking_ConfigResponse_tTypeSupport_get_type_name(cRes_TS);
org_opensplice_dds_rtnetworking_ConfigResponse_tTypeSupport_register_type(cRes_TS, dp, cRes_TN);
/* ospl_rtn_ConfigRequest */
cReq_TS = org_opensplice_dds_rtnetworking_ConfigRequest_tTypeSupport__alloc();
cReq_TN = org_opensplice_dds_rtnetworking_ConfigRequest_tTypeSupport_get_type_name(cReq_TS);
org_opensplice_dds_rtnetworking_ConfigRequest_tTypeSupport_register_type(cReq_TS, dp, cReq_TN);
/* ospl_rtn_ConfigState */
cs_TS = org_opensplice_dds_rtnetworking_ConfigState_tTypeSupport__alloc();
cs_TN = org_opensplice_dds_rtnetworking_ConfigState_tTypeSupport_get_type_name(cs_TS);
org_opensplice_dds_rtnetworking_ConfigState_tTypeSupport_register_type(cs_TS, dp, cs_TN);

/* Create Topics */
/* Each topic QoS starts from the participant's default topic QoS; only the
 * fields assigned below are changed. */
/* ospl_rtn_ConfigResponse */
cRes_TQ = DDS_TopicQos__alloc();
DDS_DomainParticipant_get_default_topic_qos(dp, cRes_TQ);
cRes_TQ->history.kind = DDS_KEEP_ALL_HISTORY_QOS;
cRes_TQ->reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
cRes_TQ->destination_order.kind = DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
cRes_T = DDS_DomainParticipant_create_topic(dp, "ospl_rtn_ConfigResponse", cRes_TN, cRes_TQ, NULL, DDS_STATUS_MASK_NONE);
/* ospl_rtn_ConfigRequest */
cReq_TQ = DDS_TopicQos__alloc();
DDS_DomainParticipant_get_default_topic_qos(dp, cReq_TQ);
/* NOTE(review): this is the only topic here given PERSISTENT durability
 * (ConfigState is TRANSIENT, ConfigResponse keeps the default) - confirm it
 * matches the QoS the RTNetworking service uses for this topic. */
cReq_TQ->durability.kind = DDS_PERSISTENT_DURABILITY_QOS;
cReq_TQ->history.kind = DDS_KEEP_ALL_HISTORY_QOS;
cReq_TQ->reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
cReq_TQ->destination_order.kind = DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
cReq_T = DDS_DomainParticipant_create_topic(dp, "ospl_rtn_ConfigRequest", cReq_TN, cReq_TQ, NULL, DDS_STATUS_MASK_NONE);
/* ospl_rtn_ConfigState */
cs_TQ = DDS_TopicQos__alloc();
DDS_DomainParticipant_get_default_topic_qos(dp, cs_TQ);
cs_TQ->durability.kind = DDS_TRANSIENT_DURABILITY_QOS;
cs_TQ->history.kind = DDS_KEEP_LAST_HISTORY_QOS;
cs_TQ->history.depth = 1;
cs_TQ->reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
cs_TQ->destination_order.kind = DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
cs_T = DDS_DomainParticipant_create_topic(dp, "ospl_rtn_ConfigState", cs_TN, cs_TQ, NULL, DDS_STATUS_MASK_NONE);

/* Create Subscriber */
/* The default partition list is replaced by a one-element list that names
 * the built-in partition. */
sQ = DDS_SubscriberQos__alloc(); /* __alloc()'s are assumed to succeed */
DDS_DomainParticipant_get_default_subscriber_qos(dp, sQ);
DDS_free(sQ->partition.name._buffer);
sQ->partition.name._length = sQ->partition.name._maximum = 1;
sQ->partition.name._release = TRUE;
sQ->partition.name._buffer = DDS_StringSeq_allocbuf (sQ->partition.name._maximum);
sQ->partition.name._buffer[0] = DDS_string_dup("__BUILT-IN PARTITION__");
s = DDS_DomainParticipant_create_subscriber(dp, sQ, NULL, DDS_STATUS_MASK_NONE);

/* Create DataReaders */
/* Each reader QoS is the subscriber's default with the topic QoS copied
 * into it. */
/* ospl_rtn_ConfigResponse */
cRes_DRQ = DDS_DataReaderQos__alloc();
DDS_Subscriber_get_default_datareader_qos(s, cRes_DRQ);
DDS_Subscriber_copy_from_topic_qos(s, cRes_DRQ, cRes_TQ);
cRes_DR = DDS_Subscriber_create_datareader(s, cRes_T, cRes_DRQ, NULL, DDS_STATUS_MASK_NONE);
/* ospl_rtn_ConfigState */
cs_DRQ = DDS_DataReaderQos__alloc(); /* __alloc()'s are assumed to succeed */
DDS_Subscriber_get_default_datareader_qos(s, cs_DRQ);
DDS_Subscriber_copy_from_topic_qos(s, cs_DRQ, cs_TQ);
cs_DR = DDS_Subscriber_create_datareader(s, cs_T, cs_DRQ, NULL, DDS_STATUS_MASK_NONE);

/* Create Publisher */
/* Same partition set-up as for the subscriber above. */
pQ = DDS_PublisherQos__alloc(); /* __alloc()'s are assumed to succeed */
DDS_DomainParticipant_get_default_publisher_qos(dp, pQ);
DDS_free(pQ->partition.name._buffer);
pQ->partition.name._length = pQ->partition.name._maximum = 1;
pQ->partition.name._release = TRUE;
pQ->partition.name._buffer = DDS_StringSeq_allocbuf (pQ->partition.name._maximum);
pQ->partition.name._buffer[0] = DDS_string_dup("__BUILT-IN PARTITION__");
p = DDS_DomainParticipant_create_publisher(dp, pQ, NULL, DDS_STATUS_MASK_NONE);

/* Create DataWriter */
cReq_DWQ = DDS_DataWriterQos__alloc();
DDS_Publisher_get_default_datawriter_qos(p, cReq_DWQ);
DDS_Publisher_copy_from_topic_qos(p, cReq_DWQ, cReq_TQ);
cReq_DW = DDS_Publisher_create_datawriter(p, cReq_T, cReq_DWQ, NULL, DDS_STATUS_MASK_NONE);

/* Retrieve TRANSIENT ospl_rtn_ConfigState instances; alternatively an empty ospl_rtn_ConfigRequest can be
 * written, as will be done a bit later as well. */
org_opensplice_dds_rtnetworking_ConfigState_tDataReader_wait_for_historical_data(cs_DR, &myTimeout);

/* Create WaitSet and Conditions for readers */
/* ospl_rtn_ConfigResponse */
cRes_C = org_opensplice_dds_rtnetworking_ConfigResponse_tDataReader_get_statuscondition(cRes_DR);
DDS_StatusCondition_set_enabled_statuses(cRes_C, DDS_DATA_AVAILABLE_STATUS);
/* ospl_rtn_ConfigState */
cs_C = org_opensplice_dds_rtnetworking_ConfigState_tDataReader_get_statuscondition(cs_DR);
DDS_StatusCondition_set_enabled_statuses(cs_C, DDS_DATA_AVAILABLE_STATUS);
ws = DDS_WaitSet__alloc();
DDS_WaitSet_attach_condition(ws, cRes_C);
DDS_WaitSet_attach_condition(ws, cs_C);
conds = DDS_ConditionSeq__alloc();

/* Now wait for data; for now ospl_rtn_ConfigStates are expected for all live RTNetworking services
 * if Durability is properly configured.
 * The loop runs for as long as DDS_WaitSet_wait() returns DDS_RETCODE_OK and ends on any other
 * return code. */
while((result = DDS_WaitSet_wait(ws, conds, &myTimeout)) == DDS_RETCODE_OK){
    DDS_Condition c;
    unsigned i;
    for(i = 0; i < conds->_length; i++){
        c = conds->_buffer[i];
        if(c == cs_C) { /* cs_DR has data available */
            configStateTakeAllAndPrint(cs_DR); /* Implementation is left as an exercise for the reader */
        } else if (c == cRes_C) { /* cRes_DR has data available */
            configResponseTakeAllAndPrint(cRes_DR); /* Implementation is left as an exercise for the reader */
        }
    }
}

/* The output of the above would be something like (depending on the implementation of configStateTakeAllAndPrint
 * and configResponseTakeAllAndPrint):
 *
 * ConfigState from server [1091538F.9800000001]
 * {
 *     role='myRole',
 *     configRevision=0,
 *     currentConfig[1] {
 *         currentConfig[0] -> {name='Partitioning/NetworkPartitions', value='<!-- id: 1, hash: B8C3B880 --><NetworkPartition Address="225.0.0.1" Name="MyPart" Connected="true" Compression="false" MulticastTimeToLive="32" />'}
 *     }
 * }
 *
 *
 * NOTE: The output contains a comment which can be used for consistency checking. Such comments are
 * implementation specific and are not guaranteed to always be available. The formatting of the XML in the
 * ospl_rtn_ConfigState may differ from that in the configuration file or the ospl_rtn_ConfigRequest.
 */
{
    /* Now write three ospl_rtn_ConfigRequests to show API behaviour. Results are printed afterwards. */
    org_opensplice_dds_rtnetworking_ConfigRequest_t data;

    /* Write an empty (no configs) request. This should cause all available RTNetworking
     * services to send an ospl_rtn_ConfigResponse and an update on ospl_rtn_ConfigState. This
     * can be used to discover all currently available RTNetworking services.
     */
    memset(&data, 0, sizeof(data));
    /* Unique ID for this requestor */
    data.requestId.requestorId.prefix = 0x8badf00ddeadbeef;
    data.requestId.requestorId.suffix = 0xdefec8edbadbeef0;
    /* Unique ID for this request */
    data.requestId.requestId = 1;
    data.scope = DDS_string_dup("*"); /* Not supported yet; leave at "*" or "" */
    /* data.serverIds -> Not supported yet; leave zeroed out */
    data.configRevision = 0; /* Not supported yet; leave at 0 (don't care) */
    data.resultConfigRevision = 0; /* Not supported yet; leave at 0 (don't care) */
    /* data.config -> Empty config means: request update on ospl_rtn_ConfigState topic */
    /* data.extensions -> Not needed; leave zeroed out */
    org_opensplice_dds_rtnetworking_ConfigRequest_tDataWriter_write(cReq_DW, &data, DDS_HANDLE_NIL);

    /* The next request will try modification of an existing and currently "connected" NetworkPartition
     * "MyPart". All attributes of existing NetworkPartitions are currently immutable, so this should
     * cause services to respond with an error (RESULT_IMMUTABLE) and possibly a description.
     * If errors are detected, the entire request will be ignored. First all supplied NetworkPartitions
     * will be checked for consistency.
     */
    data.requestId.requestId++;
    data.config._length = data.config._maximum = 2;
    data.config._buffer = DDS_sequence_org_opensplice_dds_rtnetworking_NameStringValue_t_allocbuf(data.config._maximum);
    /* The name-field contains an XPATH expression relative to the NetworkService element. */
    data.config._buffer[0].name = DDS_string_dup("Partitioning/NetworkPartitions");
    /* The value-field contains a valid XML-snippet with a single root element. */
    data.config._buffer[0].value = DDS_string_dup("<NetworkPartition Address=\"225.0.0.1\" Connected=\"false\" Name=\"MyPart\" />");
    data.config._buffer[1].name = DDS_string_dup("Partitioning/PartitionMappings");
    data.config._buffer[1].value = DDS_string_dup("<PartitionMapping DCPSPartitionTopic=\"MyPart.*\" NetworkPartition=\"MyPart\" />");
    org_opensplice_dds_rtnetworking_ConfigRequest_tDataWriter_write(cReq_DW, &data, DDS_HANDLE_NIL);

    /* The next request will add a new NetworkPartition "MyPart2" and a related PartitionMapping.
     * This time the root XPATH will be used, containing multiple config-changes in a single XML-
     * snippet.
     */
    data.requestId.requestId++;
    /* The two-element buffer allocated above is reused; only element 0 is sent. */
    data.config._length = 1;
    /* The name-field contains an XPATH expression relative to the NetworkService element. By using an
     * empty XPATH, the root is used (directly under the NetworkService-element). */
    /* NOTE(review): the strings previously stored in _buffer[0] are overwritten
     * here without being freed - real code would have to release them first. */
    data.config._buffer[0].name = DDS_string_dup("");
    data.config._buffer[0].value = DDS_string_dup(
        "<Partitioning>"
        "<NetworkPartitions>"
        "<NetworkPartition Address=\"225.0.0.2\" Connected=\"true\" Name=\"MyPart2\" />"
        "</NetworkPartitions>"
        "<PartitionMappings>"
        "<PartitionMapping DCPSPartitionTopic=\"MyPart2.*\" NetworkPartition=\"MyPart2\" />"
        "</PartitionMappings>"
        "</Partitioning>");
    org_opensplice_dds_rtnetworking_ConfigRequest_tDataWriter_write(cReq_DW, &data, DDS_HANDLE_NIL);
}

/* Take all data and print the results */
while((result = DDS_WaitSet_wait(ws, conds, &myTimeout)) == DDS_RETCODE_OK){
    DDS_Condition c;
    unsigned i;
    for(i = 0; i < conds->_length; i++){
        c = conds->_buffer[i];
        if(c == cs_C) { /* cs_DR has data available */
            configStateTakeAllAndPrint(cs_DR); /* Implementation is left as an exercise for the reader */
        } else if (c == cRes_C) { /* cRes_DR has data available */
            configResponseTakeAllAndPrint(cRes_DR); /* Implementation is left as an exercise for the reader */
        }
    }
}

/* The output of the above would be something like (depending on the implementation of configStateTakeAllAndPrint
 * and configResponseTakeAllAndPrint):
 *
 * ConfigState from server [1091538F.9800000001]
 * {
 *     role='myRole',
 *     configRevision=0,
 *     currentConfig[1] {
 *         currentConfig[0] -> {name='Partitioning/NetworkPartitions', value='<!-- id: 1, hash: B8C3B880 --><NetworkPartition Address="225.0.0.1" Name="MyPart" Connected="true" Compression="false" MulticastTimeToLive="32" />'}
 *     }
 * }
 *
 * ConfigResponse from server [1091538F.9800000001] for request [8BADF00DDEADBEEF.DEFEC8EDBADBEEF0]:1
 * {
 *     result=0 (RESULT_OK),
 *     configRevision=0,
 *     resultConfigRevision=0,
 *     description=''
 * }
 *
 * ConfigResponse from server [1091538F.9800000001] for request [8BADF00DDEADBEEF.DEFEC8EDBADBEEF0]:2
 * {
 *     result=3 (RESULT_IMMUTABLE),
 *     configRevision=0,
 *     resultConfigRevision=0,
 *     description='NetworkPartition 'MyPart' already exists with a different configuration.'
 * }
 *
 * ConfigState from server [1091538F.9800000001]
 * {
 *     role='myRole',
 *     configRevision=0,
 *     currentConfig[3] {
 *         currentConfig[0] -> {name='Partitioning/NetworkPartitions', value='<!-- id: 1, hash: B8C3B880 --><NetworkPartition Address="225.0.0.1" Name="MyPart" Connected="true" Compression="false" MulticastTimeToLive="32" />'}
 *         currentConfig[1] -> {name='Partitioning/NetworkPartitions', value='<!-- id: 2, hash: 853EB638 --><NetworkPartition Address="225.0.0.2" Name="MyPart2" Connected="true" Compression="false" MulticastTimeToLive="32" />'}
 *         currentConfig[2] -> {name='Partitioning/PartitionMappings', value='<!-- id: 2 --><PartitionMapping DCPSPartitionTopic="MyPart2.*" NetworkPartition="MyPart2" />'}
 *     }
 * }
 *
 * ConfigResponse from server [1091538F.9800000001] for request [8BADF00DDEADBEEF.DEFEC8EDBADBEEF0]:3
 * {
 *     result=0 (RESULT_OK),
 *     configRevision=0,
 *     resultConfigRevision=0,
 *     description=''
 * }
 */