
Moniker streaming using sideband mechanism

Rohan Doshi edited this page Jan 3, 2025 · 1 revision

This page shows how to use the sideband mechanism with moniker-based streaming to stream data to and from the grpc-device server.

The sideband mechanism uses raw sockets instead of gRPC streams to move data to and from the server. In most cases, gRPC streaming is sufficient. When large amounts of data (more than about 1-2 GB/s) need to be streamed, or when communication must happen with very low latency (under about 30 µs), the sideband mechanism can help achieve higher throughput than gRPC streams.
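To judge whether a stream approaches these thresholds, it can help to estimate its payload rate first. The following is a minimal sketch in standard C++. The function names are hypothetical, and the choice of 1 GB/s (the lower end of the quoted 1-2 GB/s range) as the cut-off is an assumption for illustration, not a value defined by grpc-device.

```cpp
#include <cassert>
#include <cstddef>

// Estimated payload rate, in bytes per second, for fixed-size samples.
// All parameters are illustrative inputs, not values read from a device.
inline double PayloadBytesPerSecond(double samples_per_second_per_channel,
                                    std::size_t channel_count,
                                    std::size_t bytes_per_sample) {
  assert(samples_per_second_per_channel >= 0.0);
  return samples_per_second_per_channel *
         static_cast<double>(channel_count) *
         static_cast<double>(bytes_per_sample);
}

// Rule of thumb taken from the text: sideband is worth considering when the
// stream exceeds roughly 1 GB/s or the latency budget is below roughly 30 us.
// Both thresholds are approximate; 1 GB/s is an assumed lower bound.
inline bool SidebandWorthConsidering(double bytes_per_second,
                                     double latency_budget_us) {
  const double kThroughputThreshold = 1.0e9;  // bytes per second
  const double kLatencyThresholdUs = 30.0;    // microseconds
  return bytes_per_second > kThroughputThreshold ||
         latency_budget_us < kLatencyThresholdUs;
}
```

For example, 10 channels of 8-byte F64 samples at 1 MS/s per channel come to 80 MB/s, which is well below the threshold, so gRPC streaming would likely be enough unless the latency budget is tight.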

Setup

Streaming with the sideband mechanism requires client applications to use an additional library, ni_grpc_sideband, which is available from the ni/grpc-sideband repo on GitHub. You can either build ni_grpc_sideband separately and load it dynamically in your client application, or statically link against the library generated from the grpc-sideband repo.

Adding Client Code for Sideband Streaming

For this example, we will write client code that streams data using the DAQmx APIs WriteAnalogF64 and ReadAnalogF64 (called through BeginWriteAnalogF64 and BeginReadAnalogF64), which are part of the sideband streaming functionality.

These APIs use sideband streaming to transfer data between the client and the server. For an example that uses gRPC streaming instead, refer to Setup a Client for Streaming.

Below is the client code for initiating a sideband stream using these APIs:

// Set up the DAQmx task and request parameters.
auto request_write = BeginWriteAnalogF64Request{};
auto request_read = BeginReadAnalogF64Request{};

// Create a gRPC stub to access DAQmx API functions over gRPC.
StubPtr stub = std::make_unique<NiDAQmx::Stub>(grpc::CreateChannel(target_str, creds));
// Create a Data Moniker stub to interact with the moniker API for streaming data.
ni::data_monikers::DataMoniker::Stub moniker_service(grpc::CreateChannel(target_str, creds));

std::vector<pb::float64> write_data_float64 = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0};

auto response_write = BeginWriteAnalogF64Response{};
stub->BeginWriteAnalogF64(&context, request_write, &response_write);

auto response_read = BeginReadAnalogF64Response{};
stub->BeginReadAnalogF64(&context, request_read, &response_read);

auto write_moniker_f64 = new ni::data_monikers::Moniker(response_write.moniker());
auto read_moniker_f64 = new ni::data_monikers::Moniker(response_read.moniker());

grpc::ClientContext moniker_context;
ni::data_monikers::BeginMonikerSidebandStreamRequest sideband_request;
ni::data_monikers::BeginMonikerSidebandStreamResponse sideband_response;
sideband_request.set_strategy(ni::data_monikers::SidebandStrategy::SOCKETS);
sideband_request.mutable_monikers()->mutable_read_monikers()->AddAllocated(read_moniker_f64);
sideband_request.mutable_monikers()->mutable_write_monikers()->AddAllocated(write_moniker_f64);

auto write_stream = moniker_service.BeginSidebandStream(&moniker_context, sideband_request, &sideband_response);
auto sideband_token = InitClientSidebandData(sideband_response);

for (int i = 0; i < 5; i++) {
  nifpga_grpc::MonikerWriteAnalogF64Request write_values_array_f64;
  write_values_array_f64.mutable_array()->Add(write_data_float64.begin(), write_data_float64.end());

  ni::data_monikers::SidebandWriteRequest write_data_request;
  write_data_request.mutable_values()->add_values()->PackFrom(write_values_array_f64);

  WriteSidebandMessage(sideband_token, write_data_request);

  nifpga_grpc::MonikerReadAnalogF64Response read_values_array_f64;
  ni::data_monikers::SidebandReadResponse read_result;
  ReadSidebandMessage(sideband_token, &read_result);

  read_result.values().values(0).UnpackTo(&read_values_array_f64);
}

ni::data_monikers::SidebandWriteRequest cancel_request;
cancel_request.set_cancel(true);
WriteSidebandMessage(sideband_token, cancel_request);
CloseSidebandData(sideband_token);

Code Explanation

We will now break down the client code into smaller parts, explaining each component:

Prepare Write Request

auto request_write = BeginWriteAnalogF64Request{};
  • This creates a BeginWriteAnalogF64Request object. The type is declared in the headers generated from the .proto file (the .pb.h header, which the .grpc.pb.h header includes).

Call Write API

auto response_write = BeginWriteAnalogF64Response{};
stub->BeginWriteAnalogF64(&context, request_write, &response_write);
  • This sends the write request to the server using the BeginWriteAnalogF64 API.

  • The response is stored in response_write. The stub represents the gRPC client connection to the server, which processes the request.

Similarly, we prepare the read request, call BeginReadAnalogF64, and store the response in response_read.

Create Monikers for Write and Read

auto write_moniker_f64 = new ni::data_monikers::Moniker(response_write.moniker());
auto read_moniker_f64 = new ni::data_monikers::Moniker(response_read.moniker());

Here, we create two Moniker objects: one for the write data and one for the read data. These monikers are created from the responses received from the write and read operations.

Prepare Sideband Stream Request

grpc::ClientContext moniker_context;
ni::data_monikers::BeginMonikerSidebandStreamRequest sideband_request;
ni::data_monikers::BeginMonikerSidebandStreamResponse sideband_response;
sideband_request.set_strategy(ni::data_monikers::SidebandStrategy::SOCKETS);
sideband_request.mutable_monikers()->mutable_read_monikers()->AddAllocated(read_moniker_f64);
sideband_request.mutable_monikers()->mutable_write_monikers()->AddAllocated(write_moniker_f64);
  • A BeginMonikerSidebandStreamRequest is created to initiate a sideband stream. This request includes the monikers for both the read and write data, which were created earlier. The strategy is set to SOCKETS, indicating the use of sockets for the sideband stream.

  • The mutable_monikers() method returns a mutable pointer to the monikers field of the request.

  • The mutable_read_monikers() and mutable_write_monikers() methods return the repeated fields for the read and write streams. AddAllocated() appends the previously created monikers to these fields and takes ownership of the pointers, so the client must not delete them afterwards. These monikers uniquely identify the data streams to be associated with the sideband connection.
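AddAllocated is a protobuf repeated-field method that takes ownership of the pointer passed to it, which is why the monikers are created with new and never deleted by the client. The sketch below models that ownership transfer using only the standard library. FakeMoniker and OwningList are hypothetical stand-ins, not protobuf or grpc-device types, and the instance counter exists only to make the ownership visible.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for a moniker message; counts live instances.
struct FakeMoniker {
  static int live_count;
  FakeMoniker() { ++live_count; }
  FakeMoniker(const FakeMoniker&) { ++live_count; }
  ~FakeMoniker() { --live_count; }
};
int FakeMoniker::live_count = 0;

// Hypothetical stand-in for a repeated message field. Like AddAllocated in
// protobuf, it takes ownership of the raw pointer and deletes the element
// when the list itself is destroyed.
class OwningList {
 public:
  void AddAllocated(FakeMoniker* item) {
    assert(item != nullptr);
    items_.push_back(std::unique_ptr<FakeMoniker>(item));  // list owns it now
  }
  std::size_t size() const { return items_.size(); }

 private:
  std::vector<std::unique_ptr<FakeMoniker>> items_;
};
```

Once a pointer has been handed to AddAllocated, the caller treats it as borrowed at most. Deleting it again would free the same object twice when the owning request is destroyed.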

Start the Sideband Stream

auto write_stream = moniker_service.BeginSidebandStream(&moniker_context, sideband_request, &sideband_response);
auto sideband_token = InitClientSidebandData(sideband_response);

The sideband stream is initiated by calling BeginSidebandStream. InitClientSidebandData then takes the response and returns a sideband token, which identifies the stream in the subsequent write, read, and close calls.
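The full listing releases the token with CloseSidebandData once streaming is finished. One way to make sure that call is not skipped on an early return is a small RAII guard. The sketch below is an illustration under stated assumptions: ScopedSidebandToken is a hypothetical helper, the token is assumed to be a 64-bit integer, and the close function is injected as a callable so the example does not depend on the ni_grpc_sideband headers.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical guard that owns a sideband token and runs a caller-supplied
// close function exactly once, when the guard goes out of scope.
class ScopedSidebandToken {
 public:
  using Closer = std::function<void(std::int64_t)>;

  ScopedSidebandToken(std::int64_t token, Closer closer)
      : token_(token), closer_(std::move(closer)) {
    assert(static_cast<bool>(closer_));  // a close function is required
  }

  // Non-copyable so the token cannot be closed twice.
  ScopedSidebandToken(const ScopedSidebandToken&) = delete;
  ScopedSidebandToken& operator=(const ScopedSidebandToken&) = delete;

  ~ScopedSidebandToken() {
    if (closer_) closer_(token_);
  }

  std::int64_t get() const { return token_; }

 private:
  std::int64_t token_;  // assumed token type
  Closer closer_;
};
```

In a client, the closer would typically be a lambda that calls CloseSidebandData on the token. The guard only covers the close step, so the cancel message shown in the full listing would still need to be sent before the guard is destroyed.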

Streaming in a Loop

for (int i = 0; i < 5; i++) {
  nifpga_grpc::MonikerWriteAnalogF64Request write_values_array_f64;
  write_values_array_f64.mutable_array()->Add(write_data_float64.begin(), write_data_float64.end());

  ni::data_monikers::SidebandWriteRequest write_data_request;
  write_data_request.mutable_values()->add_values()->PackFrom(write_values_array_f64);

  WriteSidebandMessage(sideband_token, write_data_request);

  nifpga_grpc::MonikerReadAnalogF64Response read_values_array_f64;
  ni::data_monikers::SidebandReadResponse read_result;
  ReadSidebandMessage(sideband_token, &read_result);

  read_result.values().values(0).UnpackTo(&read_values_array_f64);
}
  • Prepare the data to be written

    • A MonikerWriteAnalogF64Request object is created to hold the write data for the current iteration.
    • The mutable_array() method provides access to the data array, and the Add() function appends the contents of write_data_float64 (a pre-defined vector) to this array. This prepares the data to be streamed to the server.
  • Pack the write data into a sideband write request

    • A SidebandWriteRequest object is created to encapsulate the write data for the sideband stream.
    • The mutable_values() method provides access to the values field, which is used to store the packed data.
    • PackFrom() serializes the write_values_array_f64 data.
  • Send the packed write data to the sideband stream

    • The WriteSidebandMessage() function sends the write_data_request to the server using the sideband stream identified by sideband_token.
  • Prepare containers for the read response

    • read_values_array_f64: This will store the unpacked read data.
    • read_result: This will store the raw response data from the server's sideband stream.
  • Read the response data from the sideband stream

    • The ReadSidebandMessage() function fetches data from the sideband stream, using the sideband_token to identify the stream.
    • The server sends the data it has read, which is stored in read_result in its raw packed form.
  • Unpack the read response

    • The values() method of read_result accesses the packed data from the sideband stream.
    • The UnpackTo() method deserializes the packed data from read_result and stores it in read_values_array_f64.
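    • At this point, the client has successfully read and unpacked the data sent by the server.

Cancel and Close the Stream

After the loop, the client sends a SidebandWriteRequest with cancel set to true through WriteSidebandMessage to signal that streaming is finished, and then calls CloseSidebandData with the sideband token to close the sideband connection.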
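To check whether the streaming loop meets a latency target such as the 30 µs figure mentioned at the top of this page, each iteration can be timed. The helper below is a minimal sketch using std::chrono. TimeRoundTripsUs is a hypothetical name, and the round-trip step is passed in as a callable so the example stays independent of the sideband library.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <vector>

// Runs `round_trip` the requested number of times and returns the elapsed
// time of each call in microseconds. In a real client, `round_trip` would
// perform one WriteSidebandMessage / ReadSidebandMessage pair.
inline std::vector<double> TimeRoundTripsUs(
    std::size_t iterations, const std::function<void()>& round_trip) {
  assert(static_cast<bool>(round_trip));
  std::vector<double> elapsed_us;
  elapsed_us.reserve(iterations);
  for (std::size_t i = 0; i < iterations; ++i) {
    const auto start = std::chrono::steady_clock::now();
    round_trip();
    const auto stop = std::chrono::steady_clock::now();
    elapsed_us.push_back(
        std::chrono::duration<double, std::micro>(stop - start).count());
  }
  return elapsed_us;
}
```

The measured values include packing and unpacking if those steps are placed inside the callable, so it is worth deciding up front whether the target applies to the transport alone or to the whole iteration.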
