Introduction

This note provides information on using the Dynamic Network Partitions feature for RTNetworking. The feature allows users to amend the configuration of the RTNetworking service at runtime by means of a Topic-API. The following sections explain the interface and restrictions on using the feature.

API reference - Introduction

The API for retrieving and changing supported configuration for RTNetworking at runtime consists of a data-model, RTNetworking DDS entities and some preconditions for changing the configuration.

Interaction with the RTNetworking service happens by means of three topics in the __BUILT-IN PARTITION__. The ospl_rtn_ConfigRequest- and ospl_rtn_ConfigResponse-topic implement a one-to-many request-reply pattern. The ospl_rtn_ConfigState-topic implements a state-based pattern.
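In a one-to-many request-reply pattern, a client typically correlates each ospl_rtn_ConfigResponse with the request it wrote and keeps track of which servers have answered. The following is a minimal sketch of that bookkeeping in plain C. The Gid, RequestId and ResponseTracker types, the MAX_SERVERS limit and the function names are hypothetical local mirrors introduced for illustration, not the types generated from the IDL. It also assumes that each server sends at most one response per request.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical local mirrors of Gid_t and RequestId_t (illustration only). */
typedef struct { uint64_t prefix; uint64_t suffix; } Gid;
typedef struct { Gid requestorId; uint32_t requestId; } RequestId;

#define MAX_SERVERS 16 /* Arbitrary limit chosen for this sketch */

/* Tracks which servers have responded to one outstanding request. */
typedef struct {
    RequestId request;
    Gid responders[MAX_SERVERS];
    size_t count;
} ResponseTracker;

static bool gid_equal(Gid a, Gid b)
{
    return a.prefix == b.prefix && a.suffix == b.suffix;
}

static bool request_id_equal(RequestId a, RequestId b)
{
    return gid_equal(a.requestorId, b.requestorId) && a.requestId == b.requestId;
}

/* Records a response. Returns true when the response belongs to the tracked
 * request and comes from a server that was not recorded before. */
static bool tracker_record(ResponseTracker *t, RequestId responseTo, Gid serverId)
{
    size_t i;

    if (!request_id_equal(t->request, responseTo)) {
        return false; /* Response to a different request */
    }
    for (i = 0; i < t->count; i++) {
        if (gid_equal(t->responders[i], serverId)) {
            return false; /* Duplicate response from the same server */
        }
    }
    if (t->count == MAX_SERVERS) {
        return false; /* Tracker is full */
    }
    t->responders[t->count++] = serverId;
    return true;
}
```

A client could compare the recorded responders with the serverId values seen on ospl_rtn_ConfigState to decide whether all known services have answered.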

API reference - Topics

The RTNetworking service will create the following three topics with the listed QoS (only non-default QoS are listed):

  1. ospl_rtn_ConfigRequest
    • DURABILITY.kind = PERSISTENT
    • HISTORY.kind = KEEPALL
    • RELIABILITY.kind = RELIABLE
    • DESTINATION_ORDER.kind = BY_SOURCE_TIMESTAMP
  2. ospl_rtn_ConfigResponse
    • HISTORY.kind = KEEPALL
    • RELIABILITY.kind = RELIABLE
    • DESTINATION_ORDER.kind = BY_SOURCE_TIMESTAMP
  3. ospl_rtn_ConfigState
    • DURABILITY.kind = TRANSIENT
    • RELIABILITY.kind = RELIABLE
    • DESTINATION_ORDER.kind = BY_SOURCE_TIMESTAMP
For a description of the types used for the topics, please refer to XXX.

API reference - Entities

The RTNetworking service will create the following three entities with the listed QoS (only non-default QoS are listed). RTNetworking publishes and subscribes in the __BUILT-IN PARTITION__.

  1. ospl_rtn_ConfigRequest-reader
    • HISTORY.kind = KEEPALL
    • RELIABILITY.kind = RELIABLE
    • DESTINATION_ORDER.kind = BY_SOURCE_TIMESTAMP
  2. ospl_rtn_ConfigResponse-writer
    • HISTORY.kind = KEEPALL
    • RELIABILITY.kind = RELIABLE
    • DESTINATION_ORDER.kind = BY_SOURCE_TIMESTAMP
  3. ospl_rtn_ConfigState-writer
    • DURABILITY.kind = TRANSIENT
    • HISTORY.kind = KEEPALL
    • DESTINATION_ORDER.kind = BY_SOURCE_TIMESTAMP
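Client readers and writers only match the entities above when their QoS is compatible in the DDS requested-versus-offered sense: for DURABILITY, RELIABILITY and DESTINATION_ORDER the kind offered by the writer must be at least as strong as the kind requested by the reader. The sketch below illustrates that rule in plain C. The enum and struct names are hypothetical stand-ins for this note, not the DDS API types, and only the three policies listed above are considered.

```c
#include <stdbool.h>

/* Enumerators are ordered from weakest to strongest kind. */
typedef enum { QOS_VOLATILE, QOS_TRANSIENT_LOCAL, QOS_TRANSIENT, QOS_PERSISTENT } QosDurability;
typedef enum { QOS_BEST_EFFORT, QOS_RELIABLE } QosReliability;
typedef enum { QOS_BY_RECEPTION_TIMESTAMP, QOS_BY_SOURCE_TIMESTAMP } QosDestinationOrder;

/* Hypothetical summary of the QoS of one reader or writer. */
typedef struct {
    QosDurability durability;
    QosReliability reliability;
    QosDestinationOrder destinationOrder;
} EndpointQos;

/* True when the writer's offered QoS satisfies the reader's requested QoS. */
static bool qos_compatible(EndpointQos offered, EndpointQos requested)
{
    return offered.durability >= requested.durability
        && offered.reliability >= requested.reliability
        && offered.destinationOrder >= requested.destinationOrder;
}
```

For example, a client writer that copies the ospl_rtn_ConfigRequest topic QoS offers PERSISTENT, RELIABLE and BY_SOURCE_TIMESTAMP, which satisfies the ospl_rtn_ConfigRequest-reader listed above. A BEST_EFFORT client writer would not match that reader.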

API reference - Preconditions

The following preconditions must be fulfilled to get proper behaviour from dynamically added configuration.

  1. Newly introduced NetworkPartitions and PartitionMappings only apply to new Topics and Partitions, created after the new mappings are in effect. In other words, remapping an existing Topic or Partition is not supported. If a Topic/Partition doesn’t match any of the PartitionMappings, it is communicated over the GlobalPartition, so a new PartitionMapping has to be effectuated before a new Topic/Partition is used on a node. This also means that a newly added node should either have all NetworkPartitions and PartitionMappings pre-configured or not use the related Topics (until dynamically configured and effectuated).
    Rationale: If a Topic were allowed to switch NetworkPartitions, the communication scope could change, or communication could temporarily drop if one of the nodes did not have the new NetworkPartition yet. This could cause loss of reliable communication, which is unwanted.
  2. Inbound traffic destined for an unknown NetworkPartition is dropped.
    Rationale: If traffic arriving on a node for an unknown NetworkPartition were delivered anyway (as if from the GlobalPartition), restriction (1) above would be impossible to maintain during a gradual roll-out of new configuration settings.
  3. Topics with TRANSIENT or PERSISTENT Durability-QoS will not be aligned when a new PartitionMapping is effectuated.
    Rationale: The Durability service ensures that historical data published before a connection is established gets aligned. After that, the reliable connection ensures the state remains aligned. Establishing a (partial) connection after initial alignment has occurred will not trigger a new alignment action. If historical data needs to be consistent across nodes, all nodes must have the new NetworkPartitions and PartitionMappings effectuated before publication of the related Topic is started on any of the involved nodes.
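The fallback described in precondition (1) can be sketched as a lookup over the configured PartitionMappings. This is a simplified illustration, not the service's implementation. It assumes that the DCPSPartitionTopic expression is matched against a "partition.topic" string using '*' and '?' wildcards and that the first matching mapping wins. The PartitionMapping struct and both function names are hypothetical.

```c
#include <stddef.h>

/* Minimal wildcard matcher: '*' matches any run of characters (including
 * none), '?' matches exactly one character. */
static int wildcard_match(const char *pattern, const char *text)
{
    if (*pattern == '\0') {
        return *text == '\0';
    }
    if (*pattern == '*') {
        /* Let '*' match nothing, or consume one character and retry. */
        return wildcard_match(pattern + 1, text)
            || (*text != '\0' && wildcard_match(pattern, text + 1));
    }
    if (*text != '\0' && (*pattern == '?' || *pattern == *text)) {
        return wildcard_match(pattern + 1, text + 1);
    }
    return 0;
}

/* Hypothetical in-memory form of one PartitionMapping element. */
typedef struct {
    const char *expression;       /* DCPSPartitionTopic attribute */
    const char *networkPartition; /* NetworkPartition attribute */
} PartitionMapping;

/* Returns the NetworkPartition of the first matching mapping, or
 * "GlobalPartition" when no mapping matches. */
static const char *resolve_network_partition(const PartitionMapping *mappings,
                                             size_t count,
                                             const char *partitionTopic)
{
    size_t i;

    for (i = 0; i < count; i++) {
        if (wildcard_match(mappings[i].expression, partitionTopic)) {
            return mappings[i].networkPartition;
        }
    }
    return "GlobalPartition";
}
```

Because the lookup result is fixed when a Topic/Partition is first used, a mapping that is added later does not change the NetworkPartition of traffic that already resolved to the GlobalPartition.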

IDL data model

The IDL for the API is included below as a description of the types used in the API and their members. Comments on the members clarify their use.

module org {
  module opensplice {
    module dds {
      module rtnetworking {
        /* A name-string-value pair */
        struct NameStringValue_t {
          /* The name of the attribute */
          string name;

          /* The value of the attribute */
          string value;
        };

        /* A name-octet-value pair */
        struct NameOctetValue_t {
          /* The name of the attribute */
          string name;

          /* The corresponding value of the attribute */
          sequence<octet> value;
        };

        /* Unique id for networking services and/or clients.
         * This structure is 16 bytes, allowing a GUID_t to
         * be used.
         * The networking service will fill the fields with
         * the BuiltinTopicKey_t of its participant, allowing
         * the information available in the
         * __BUILT-IN PARTITION__ to be used. */
        struct Gid_t {
          unsigned long long prefix; /* Most significant 32 bits of 96-bit BuiltinTopicKey_t (systemId) */
          unsigned long long suffix; /* Least significant 64 bits of 96-bit BuiltinTopicKey_t (localId and serial) */
        };

        /* Unique identification of a client request */
        struct RequestId_t {
          /* Identifies the entity that is issuing the request. */
          Gid_t requestorId;

          /* Identifies the request within an entity. */
          unsigned long requestId;
        };

        struct ConfigRequest_t {
          /* Uniquely identifies the request. */
          RequestId_t requestId;

          /* The scope of the request. All servers with a matching role
           * will process the request.
           * If both scope and serverIds are set, only servers in the
           * intersection of both sets process the request. */
          string scope;

          /* Optional list of unique ids of the servers that are addressed
           * with this request if applicable. If an empty sequence is
           * provided, all servers that receive the request will process
           * the request (see note on scope). */
          sequence<Gid_t> serverIds;

          /* The revision of the configuration the request should be
           * applied to. Default is don't care (0).*/
          unsigned long configRevision;

          /* The revision the configuration request should lead to
           * after the request is applied. Default is don't care (0).*/
          unsigned long resultConfigRevision;

          /* Sequence of configuration elements */
          sequence<NameStringValue_t> config;

          /* Potential (product/vendor-specific) extensions */
          sequence<NameOctetValue_t> extensions;
        }; //@Extensibility(EXTENSIBLE_EXTENSIBILITY)
        #pragma keylist ConfigRequest_t requestId.requestorId.prefix requestId.requestorId.suffix

        typedef unsigned short ConfigResult_t;

        const ConfigResult_t RESULT_OK                          = 0;
        const ConfigResult_t RESULT_ERROR                       = 1;
        const ConfigResult_t RESULT_PARSE_ERROR                 = 2;
        const ConfigResult_t RESULT_IMMUTABLE                   = 3;
        const ConfigResult_t RESULT_NOT_ALLOWED                 = 4;
        const ConfigResult_t RESULT_UNMATCHING_CONFIG_REVISION  = 5;
        const ConfigResult_t RESULT_NOT_SUPPORTED               = 6;

        struct ConfigResponse_t {
          /* Uniquely identifies the request this is a response to. */
          RequestId_t requestId;

          /* Uniquely identifies the responder. */
          Gid_t serverId;

          /* The result of processing the related ConfigRequest_t */
          ConfigResult_t result;

          /* The revision of the configuration the requested config
           * was (tried to be) applied to. */
          unsigned long configRevision;

          /* The revision the configuration request led to after
           * the request was applied. */
          unsigned long resultConfigRevision;

          /* Optional description on the result of processing the related ConfigRequest_t. */
          string description;

          /* Potential (product/vendor-specific) extensions */
          sequence<NameOctetValue_t> extensions;
        }; //@Extensibility(EXTENSIBLE_EXTENSIBILITY)
        #pragma keylist ConfigResponse_t serverId.prefix serverId.suffix

        struct ConfigState_t {
          /* Uniquely identifies the server. */
          Gid_t serverId;

          /* Role of the server */
          string role;

          /* Revision of the configuration currently in use by the service */
          unsigned long configRevision;

          /* Sequence of current configuration elements */
          sequence<NameStringValue_t> currentConfig;

          /* Potential (product/vendor-specific) extensions */
          sequence<NameOctetValue_t> extensions;
        }; //@Extensibility(EXTENSIBLE_EXTENSIBILITY)
        #pragma keylist ConfigState_t serverId.prefix serverId.suffix
      };
    };
  };
};
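The comments on Gid_t describe how the three 32-bit parts of a BuiltinTopicKey_t are packed into the prefix and suffix fields. A minimal sketch of that packing in plain C follows. The Gid struct and the function names are hypothetical, and the order systemId, localId, serial is taken from the IDL comments above rather than verified against the service.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical local mirror of Gid_t (illustration only). */
typedef struct { uint64_t prefix; uint64_t suffix; } Gid;

/* Packs the three 32-bit key parts as described in the Gid_t comments:
 * prefix holds systemId, suffix holds localId (high half) and serial (low half). */
static Gid gid_from_builtin_key(uint32_t systemId, uint32_t localId, uint32_t serial)
{
    Gid gid;

    gid.prefix = systemId;
    gid.suffix = ((uint64_t)localId << 32) | (uint64_t)serial;
    return gid;
}

/* Formats a Gid as "<prefix>.<suffix>" in upper-case hexadecimal. */
static int gid_format(char *buf, size_t size, Gid gid)
{
    return snprintf(buf, size, "%" PRIX64 ".%" PRIX64, gid.prefix, gid.suffix);
}
```

With systemId 0x1091538F, localId 0x98 and serial 0x1, gid_format produces "1091538F.9800000001", which is the notation used for server ids in the example output of this note.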

Example

Below is a (pseudo-)code example of Topic-API interactions with RTNetworking services. Frees and checks of return codes and allocations are left out of the example.

/* Obtain DP-factory */
dpf = DDS_DomainParticipantFactory_get_instance();
/* Create DP */
dp = DDS_DomainParticipantFactory_create_participant(dpf, DDS_DOMAIN_ID_DEFAULT, DDS_PARTICIPANT_QOS_DEFAULT, NULL, DDS_STATUS_MASK_NONE);
/* Register Types */
/* ospl_rtn_ConfigResponse */
cRes_TS = org_opensplice_dds_rtnetworking_ConfigResponse_tTypeSupport__alloc();
cRes_TN = org_opensplice_dds_rtnetworking_ConfigResponse_tTypeSupport_get_type_name(cRes_TS);
org_opensplice_dds_rtnetworking_ConfigResponse_tTypeSupport_register_type(cRes_TS, dp, cRes_TN);

/* ospl_rtn_ConfigRequest */
cReq_TS = org_opensplice_dds_rtnetworking_ConfigRequest_tTypeSupport__alloc();
cReq_TN = org_opensplice_dds_rtnetworking_ConfigRequest_tTypeSupport_get_type_name(cReq_TS);
org_opensplice_dds_rtnetworking_ConfigRequest_tTypeSupport_register_type(cReq_TS, dp, cReq_TN);

/* ospl_rtn_ConfigState */
cs_TS = org_opensplice_dds_rtnetworking_ConfigState_tTypeSupport__alloc();
cs_TN = org_opensplice_dds_rtnetworking_ConfigState_tTypeSupport_get_type_name(cs_TS);
org_opensplice_dds_rtnetworking_ConfigState_tTypeSupport_register_type(cs_TS, dp, cs_TN);

/* Create Topics */
/* ospl_rtn_ConfigResponse */
cRes_TQ = DDS_TopicQos__alloc();
DDS_DomainParticipant_get_default_topic_qos(dp, cRes_TQ);
cRes_TQ->history.kind = DDS_KEEP_ALL_HISTORY_QOS;
cRes_TQ->reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
cRes_TQ->destination_order.kind = DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
cRes_T = DDS_DomainParticipant_create_topic(dp, "ospl_rtn_ConfigResponse", cRes_TN, cRes_TQ, NULL, DDS_STATUS_MASK_NONE);

/* ospl_rtn_ConfigRequest */
cReq_TQ = DDS_TopicQos__alloc();
DDS_DomainParticipant_get_default_topic_qos(dp, cReq_TQ);
cReq_TQ->durability.kind = DDS_PERSISTENT_DURABILITY_QOS;
cReq_TQ->history.kind = DDS_KEEP_ALL_HISTORY_QOS;
cReq_TQ->reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
cReq_TQ->destination_order.kind = DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
cReq_T = DDS_DomainParticipant_create_topic(dp, "ospl_rtn_ConfigRequest", cReq_TN, cReq_TQ, NULL, DDS_STATUS_MASK_NONE);

/* ospl_rtn_ConfigState */
cs_TQ = DDS_TopicQos__alloc();
DDS_DomainParticipant_get_default_topic_qos(dp, cs_TQ);
cs_TQ->durability.kind = DDS_TRANSIENT_DURABILITY_QOS;
cs_TQ->history.kind = DDS_KEEP_LAST_HISTORY_QOS;
cs_TQ->history.depth = 1;
cs_TQ->reliability.kind = DDS_RELIABLE_RELIABILITY_QOS;
cs_TQ->destination_order.kind = DDS_BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;
cs_T = DDS_DomainParticipant_create_topic(dp, "ospl_rtn_ConfigState", cs_TN, cs_TQ, NULL, DDS_STATUS_MASK_NONE);

/* Create Subscriber */
sQ = DDS_SubscriberQos__alloc(); /* __alloc()'s are assumed to succeed */
DDS_DomainParticipant_get_default_subscriber_qos(dp, sQ);
DDS_free(sQ->partition.name._buffer);
sQ->partition.name._length = sQ->partition.name._maximum = 1;
sQ->partition.name._release = TRUE;
sQ->partition.name._buffer = DDS_StringSeq_allocbuf (sQ->partition.name._maximum);
sQ->partition.name._buffer[0] = DDS_string_dup("__BUILT-IN PARTITION__");
s = DDS_DomainParticipant_create_subscriber(dp, sQ, NULL, DDS_STATUS_MASK_NONE);

/* Create DataReaders */
/* ospl_rtn_ConfigResponse */
cRes_DRQ = DDS_DataReaderQos__alloc();
DDS_Subscriber_get_default_datareader_qos(s, cRes_DRQ);
DDS_Subscriber_copy_from_topic_qos(s, cRes_DRQ, cRes_TQ);
cRes_DR = DDS_Subscriber_create_datareader(s, cRes_T, cRes_DRQ, NULL, DDS_STATUS_MASK_NONE);

/* ospl_rtn_ConfigState */
cs_DRQ = DDS_DataReaderQos__alloc(); /* __alloc()'s are assumed to succeed */
DDS_Subscriber_get_default_datareader_qos(s, cs_DRQ);
DDS_Subscriber_copy_from_topic_qos(s, cs_DRQ, cs_TQ);
cs_DR = DDS_Subscriber_create_datareader(s, cs_T, cs_DRQ, NULL, DDS_STATUS_MASK_NONE);

/* Create Publisher */
pQ = DDS_PublisherQos__alloc(); /* __alloc()'s are assumed to succeed */
DDS_DomainParticipant_get_default_publisher_qos(dp, pQ);
DDS_free(pQ->partition.name._buffer);
pQ->partition.name._length = pQ->partition.name._maximum = 1;
pQ->partition.name._release = TRUE;
pQ->partition.name._buffer = DDS_StringSeq_allocbuf (pQ->partition.name._maximum);
pQ->partition.name._buffer[0] = DDS_string_dup("__BUILT-IN PARTITION__");
p = DDS_DomainParticipant_create_publisher(dp, pQ, NULL, DDS_STATUS_MASK_NONE);

/* Create DataWriter */
cReq_DWQ = DDS_DataWriterQos__alloc();
DDS_Publisher_get_default_datawriter_qos(p, cReq_DWQ);
DDS_Publisher_copy_from_topic_qos(p, cReq_DWQ, cReq_TQ);
cReq_DW = DDS_Publisher_create_datawriter(p, cReq_T, cReq_DWQ, NULL, DDS_STATUS_MASK_NONE);

/* Retrieve TRANSIENT ospl_rtn_ConfigState instances; alternatively an empty ospl_rtn_ConfigRequest can be
 * written, as will be done a bit later as well. */
org_opensplice_dds_rtnetworking_ConfigState_tDataReader_wait_for_historical_data(cs_DR, &myTimeout);

/* Create WaitSet and Conditions for readers */
/* ospl_rtn_ConfigResponse */
cRes_C = org_opensplice_dds_rtnetworking_ConfigResponse_tDataReader_get_statuscondition(cRes_DR);
DDS_StatusCondition_set_enabled_statuses(cRes_C, DDS_DATA_AVAILABLE_STATUS);

/* ospl_rtn_ConfigState */
cs_C = org_opensplice_dds_rtnetworking_ConfigState_tDataReader_get_statuscondition(cs_DR);
DDS_StatusCondition_set_enabled_statuses(cs_C, DDS_DATA_AVAILABLE_STATUS);

ws = DDS_WaitSet__alloc();
DDS_WaitSet_attach_condition(ws, cRes_C);
DDS_WaitSet_attach_condition(ws, cs_C);
conds = DDS_ConditionSeq__alloc();

/* Now wait for data; for now ospl_rtn_ConfigStates are expected for all live RTNetworking services
 * if Durability is properly configured. */
while((result = DDS_WaitSet_wait(ws, conds, &myTimeout)) == DDS_RETCODE_OK){
    DDS_Condition c;
    unsigned i;
    for(i = 0; i < conds->_length; i++){
        c = conds->_buffer[i];
        if(c == cs_C) {
            /* cs_DR has data available */
            configStateTakeAllAndPrint(cs_DR); /* Implementation is left as an exercise for the reader */
        } else if (c == cRes_C) {
            /* cRes_DR has data available */
            configResponseTakeAllAndPrint(cRes_DR); /* Implementation is left as an exercise for the reader */
        }
    }
}

/* The output of the above would be something like (depending on the implementation of configStateTakeAllAndPrint
 * and configResponseTakeAllAndPrint):
 *
 * ConfigState from server [1091538F.9800000001]
 * {
 *   role='myRole',
 *   configRevision=0,
 *   currentConfig[1] {
 *     currentConfig[0] -> {name='Partitioning/NetworkPartitions', value='<!-- id: 1, hash: B8C3B880 --><NetworkPartition Address="225.0.0.1" Name="MyPart" Connected="true" Compression="false" MulticastTimeToLive="32" />'}
 *   }
 * }
 *
 *
 * NOTE: The output contains a comment which can be used for consistency checking. This comment is implementation
 * specific and is not guaranteed to be available. The formatting of the XML in the ospl_rtn_ConfigState may
 * differ from that in the configuration file or the ospl_rtn_ConfigRequest. */

{
    /* Now write three ospl_rtn_ConfigRequests to show API behaviour. Results are printed afterwards. */
    org_opensplice_dds_rtnetworking_ConfigRequest_t data;

    /* Write an empty (no configs) request. This should cause all available RTNetworking
     * services to send an ospl_rtn_ConfigResponse and an update on ospl_rtn_ConfigState. This
     * can be used to discover all currently available RTNetworking services. */
    memset(&data, 0, sizeof(data));
    /* Unique ID for this requestor */
    data.requestId.requestorId.prefix = 0x8badf00ddeadbeef;
    data.requestId.requestorId.suffix = 0xdefec8edbadbeef0;
    /* Unique ID for this request */
    data.requestId.requestId = 1;
    data.scope = DDS_string_dup("*"); /* Not supported yet; leave at "*" or "" */
    /* data.serverIds -> Not supported yet; leave zeroed out */
    data.configRevision = 0; /* Not supported yet; leave at 0 (don't care) */
    data.resultConfigRevision = 0; /* Not supported yet; leave at 0 (don't care) */
    /* data.config -> Empty config means: request update on ospl_rtn_ConfigState topic */
    /* data.extensions -> Not needed; leave zeroed out */
    org_opensplice_dds_rtnetworking_ConfigRequest_tDataWriter_write(cReq_DW, &data, DDS_HANDLE_NIL);

    /* The next request attempts to modify an existing and currently "connected" NetworkPartition
     * "MyPart". All attributes of existing NetworkPartitions are currently immutable, so this should
     * cause services to respond with an error (RESULT_IMMUTABLE) and possibly a description.
     * All supplied NetworkPartitions are checked for consistency first. If an error is detected,
     * the entire request is ignored. */
    data.requestId.requestId++;
    data.config._length = data.config._maximum = 2;
    data.config._buffer = DDS_sequence_org_opensplice_dds_rtnetworking_NameStringValue_t_allocbuf(data.config._maximum);
    /* The name-field contains an XPATH expression relative to the NetworkService element. */
    data.config._buffer[0].name = DDS_string_dup("Partitioning/NetworkPartitions");
    /* The value-field contains a valid XML-snippet with a single root element. */
    data.config._buffer[0].value = DDS_string_dup("<NetworkPartition Address=\"225.0.0.1\" Connected=\"false\" Name=\"MyPart\" />");
    data.config._buffer[1].name = DDS_string_dup("Partitioning/PartitionMappings");
    data.config._buffer[1].value = DDS_string_dup("<PartitionMapping DCPSPartitionTopic=\"MyPart.*\" NetworkPartition=\"MyPart\" />");
    org_opensplice_dds_rtnetworking_ConfigRequest_tDataWriter_write(cReq_DW, &data, DDS_HANDLE_NIL);

    /* The next request will add a new NetworkPartition "MyPart2" and a related PartitionMapping.
     * This time the root XPATH will be used, containing multiple config-changes in a single XML-
     * snippet. */
    data.requestId.requestId++;
    data.config._length = 1;
    /* The name-field contains an XPATH expression relative to the NetworkService element. By using an
     * empty XPATH, the root is used (directly under the NetworkService-element). */
    data.config._buffer[0].name = DDS_string_dup("");
    data.config._buffer[0].value = DDS_string_dup(
                 "<Partitioning>"
                   "<NetworkPartitions>"
                     "<NetworkPartition Address=\"225.0.0.2\" Connected=\"true\" Name=\"MyPart2\" />"
                   "</NetworkPartitions>"
                   "<PartitionMappings>"
                     "<PartitionMapping DCPSPartitionTopic=\"MyPart2.*\" NetworkPartition=\"MyPart2\" />"
                   "</PartitionMappings>"
                 "</Partitioning>");
    org_opensplice_dds_rtnetworking_ConfigRequest_tDataWriter_write(cReq_DW, &data, DDS_HANDLE_NIL);
}

/* Take all data and print the results */
while((result = DDS_WaitSet_wait(ws, conds, &myTimeout)) == DDS_RETCODE_OK){
    DDS_Condition c;
    unsigned i;
    for(i = 0; i < conds->_length; i++){
        c = conds->_buffer[i];
        if(c == cs_C) {
            /* cs_DR has data available */
            configStateTakeAllAndPrint(cs_DR); /* Implementation is left as an exercise for the reader */
        } else if (c == cRes_C) {
            /* cRes_DR has data available */
            configResponseTakeAllAndPrint(cRes_DR); /* Implementation is left as an exercise for the reader */
        }
    }
}

/* The output of the above would be something like (depending on the implementation of configStateTakeAllAndPrint
 * and configResponseTakeAllAndPrint):
 *
 * ConfigState from server [1091538F.9800000001]
 * {
 *   role='myRole',
 *   configRevision=0,
 *   currentConfig[1] {
 *     currentConfig[0] -> {name='Partitioning/NetworkPartitions', value='<!-- id: 1, hash: B8C3B880 --><NetworkPartition Address="225.0.0.1" Name="MyPart" Connected="true" Compression="false" MulticastTimeToLive="32" />'}
 *   }
 * }
 *
 * ConfigResponse from server [1091538F.9800000001] for request [8BADF00DDEADBEEF.DEFEC8EDBADBEEF0]:1
 * {
 *    result=0 (RESULT_OK),
 *    configRevision=0,
 *    resultConfigRevision=0,
 *    description=''
 * }
 *
 * ConfigResponse from server [1091538F.9800000001] for request [8BADF00DDEADBEEF.DEFEC8EDBADBEEF0]:2
 * {
 *    result=3 (RESULT_IMMUTABLE),
 *    configRevision=0,
 *    resultConfigRevision=0,
 *    description='NetworkPartition 'MyPart' already exists with a different configuration.'
 * }
 *
 * ConfigState from server [1091538F.9800000001]
 * {
 *   role='myRole',
 *   configRevision=0,
 *   currentConfig[3] {
 *     currentConfig[0] -> {name='Partitioning/NetworkPartitions', value='<!-- id: 1, hash: B8C3B880 --><NetworkPartition Address="225.0.0.1" Name="MyPart" Connected="true" Compression="false" MulticastTimeToLive="32" />'}
 *     currentConfig[1] -> {name='Partitioning/NetworkPartitions', value='<!-- id: 2, hash: 853EB638 --><NetworkPartition Address="225.0.0.2" Name="MyPart2" Connected="true" Compression="false" MulticastTimeToLive="32" />'}
 *     currentConfig[2] -> {name='Partitioning/PartitionMappings', value='<!-- id: 2 --><PartitionMapping DCPSPartitionTopic="MyPart2.*" NetworkPartition="MyPart2" />'}
 *   }
 * }
 *
 * ConfigResponse from server [1091538F.9800000001] for request [8BADF00DDEADBEEF.DEFEC8EDBADBEEF0]:3
 * {
 *    result=0 (RESULT_OK),
 *    configRevision=0,
 *    resultConfigRevision=0,
 *    description=''
 * }
 */
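The "result=3 (RESULT_IMMUTABLE)" lines in the output above pair the numeric ConfigResult_t value with its constant name. A helper such as the following sketch could produce that text inside configResponseTakeAllAndPrint. The function names are hypothetical, and the numeric values are copied from the constants in the IDL data model.

```c
#include <stdio.h>

/* Maps a ConfigResult_t value to the name of its IDL constant. */
static const char *config_result_name(unsigned short result)
{
    switch (result) {
    case 0: return "RESULT_OK";
    case 1: return "RESULT_ERROR";
    case 2: return "RESULT_PARSE_ERROR";
    case 3: return "RESULT_IMMUTABLE";
    case 4: return "RESULT_NOT_ALLOWED";
    case 5: return "RESULT_UNMATCHING_CONFIG_REVISION";
    case 6: return "RESULT_NOT_SUPPORTED";
    default: return "RESULT_<unknown>"; /* Value not defined in the IDL */
    }
}

/* Formats a result the way it appears in the example output. */
static int config_result_format(char *buf, size_t size, unsigned short result)
{
    return snprintf(buf, size, "result=%u (%s)", (unsigned)result, config_result_name(result));
}
```

Handling unknown values explicitly keeps the printout usable if a service reports a result code that is not listed in this version of the IDL.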

Known limitations