Rust

Streaming sensor data into Sift using Rust

Credentials

Before starting this section, be sure to retrieve your API key and the appropriate Sift URL for your provisioned environment. Instructions on how to retrieve the API key and URL can be found in the authentication section of the documentation.

This exercise demonstrates how to stream data into Sift using the Rust client library. The objective is to read in data from a mock data source representing sensor data from our asset, NostromoLV426, and then stream that data to Sift.

NostromoLV426 will contain a single velocity channel whose values are of type double; this channel will belong to the mainmotor component.

For full code examples, documentation, as well as the library's source code, refer to the following:

Installing Dependencies

For this exercise we will need sift_rs, rand, pbjson_types, chrono, and tokio_stream. From the root of your new Rust project, run:

cargo add sift_rs rand pbjson-types chrono tokio-stream

We will also add Tokio to use as our asynchronous runtime:

cargo add tokio --features full

Tokio

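Note that we don't actually require the full feature set of Tokio; which runtime flavor to use depends on your requirements. At minimum, the #[tokio::main] attribute used below needs the macros feature, plus rt-multi-thread for its default multi-threaded flavor. Tokio itself is required, however, because sift_rs is built on tonic, which relies on the Tokio runtime.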

Imports

With our dependencies installed, we'll begin by importing everything this example uses. In main.rs:

use chrono::{DateTime, Utc};
use pbjson_types::Timestamp;
use rand::Rng;
use sift_rs::{
    gen::sift::{
        common::r#type::v1::ChannelDataType,
        ingest::v1::{
            ingest_service_client::IngestServiceClient,
            ingest_with_config_data_channel_value::Type, IngestWithConfigDataChannelValue,
            IngestWithConfigDataStreamRequest,
        },
        ingestion_configs::v1::{
            ingestion_config_service_client::IngestionConfigServiceClient, ChannelConfig,
            CreateIngestionConfigRequest, FlowConfig, IngestionConfig, ListIngestionConfigsRequest,
        },
        runs::v2::{run_service_client::RunServiceClient, CreateRunRequest, Run},
    },
    grpc::{use_sift_channel, SiftChannel, SiftChannelConfig},
};
use std::{
    env,
    error::Error,
    sync::mpsc::{channel, Receiver},
    thread,
    time::{Duration, Instant},
};

Data Source

Next, we'll define a data source that provides mock sensor data. It sends a timestamp and a random f64 velocity value (between 0 and 100) every 0.5 seconds for one minute, then stops. This represents the time series coming from the velocity channel:

/// Simulate sensor data
pub fn data_source() -> Receiver<(DateTime<Utc>, f64)> {
    let (tx, rx) = channel();
 
    thread::spawn(move || {
        let duration = Duration::from_secs(60);
        let start = Instant::now();
        let mut rng = rand::thread_rng();
 
        while Instant::now().duration_since(start) < duration {
            tx.send((Utc::now(), rng.gen_range(0.0..100.0))).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
    });
    rx
}
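The while let Ok(...) = data_stream.recv() loop in main depends on standard channel behavior: once the spawned thread finishes and drops its Sender, recv() returns Err and the loop ends. The sketch below makes that easy to check. It uses a hypothetical finite_data_source that emits a fixed number of deterministic readings, with std::time::SystemTime standing in for chrono and fixed values standing in for rand, so it runs without extra crates. It is a testing aid, not part of the sift_rs API.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::{Duration, SystemTime};

/// Hypothetical, deterministic variant of data_source for testing:
/// emits `count` readings spaced `interval` apart, then returns,
/// which drops the sender and disconnects the channel.
pub fn finite_data_source(count: usize, interval: Duration) -> Receiver<(SystemTime, f64)> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for i in 0..count {
            // Fake velocity values: 0.0, 10.0, 20.0, ...
            let velocity = i as f64 * 10.0;
            if tx.send((SystemTime::now(), velocity)).is_err() {
                // The receiver was dropped; stop producing.
                break;
            }
            thread::sleep(interval);
        }
        // `tx` goes out of scope here, so the receiver sees a disconnect.
    });
    rx
}

/// Drains the receiver the same way main does and collects the velocities.
pub fn collect_readings(rx: Receiver<(SystemTime, f64)>) -> Vec<f64> {
    let mut values = Vec::new();
    while let Ok((_timestamp, velocity)) = rx.recv() {
        values.push(velocity);
    }
    values
}
```

For example, collect_readings(finite_data_source(3, Duration::from_millis(1))) returns [0.0, 10.0, 20.0] and then stops, just as main stops after the one-minute data source finishes.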

Config

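Now we'll define our flows and channels in a function called channel_configs. A flow groups channels whose values are sent together under a single timestamp; for now the velocity_reading flow contains just the velocity channel, but more channels can be added later.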

/// Channel and flow configuration used to create an ingestion config.
pub fn channel_configs() -> Vec<FlowConfig> {
    vec![FlowConfig {
        name: String::from("velocity_reading"),
        channels: vec![ChannelConfig {
            name: String::from("velocity"),
            component: String::from("mainmotor"),
            unit: String::from("km/hr"),
            description: String::from("vehicle speed"),
            data_type: ChannelDataType::Double.into(),
            ..Default::default()
        }],
    }]
}
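Each ingestion request later names a flow and supplies its channel_values as a list rather than a map. This sketch assumes, as the single-channel example implies but does not show, that values are matched to a flow's channels by position, so they must follow the order in which the channels were declared in the FlowConfig. If you add more channels, a small helper like the hypothetical ordered_values below can build that list from readings keyed by channel name and fail loudly when one is missing. Verify the ordering rule against the Sift ingestion documentation before relying on it.

```rust
use std::collections::HashMap;

/// Hypothetical helper: given a flow's channel names in the order they were
/// declared in its FlowConfig, return the readings in that same order.
/// Returns an error naming the first declared channel that has no reading.
pub fn ordered_values(
    flow_channels: &[&str],
    readings: &HashMap<&str, f64>,
) -> Result<Vec<f64>, String> {
    flow_channels
        .iter()
        .map(|name| {
            readings
                .get(name)
                .copied()
                .ok_or_else(|| format!("missing reading for channel '{name}'"))
        })
        // Collecting into Result stops at the first missing channel.
        .collect()
}
```

Each resulting f64 would then be wrapped as IngestWithConfigDataChannelValue { r#type: Some(Type::Double(value)) }, exactly as the main function does for velocity.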

Next, we'll define a function called get_or_create_ingestion_config, which retrieves an existing ingestion config by client key or creates one if none exists. When it needs to create a config, it uses the flows returned by the channel_configs function from the previous step:

/// Retrieves an existing ingestion config or creates it.
async fn get_or_create_ingestion_config(
    grpc_channel: SiftChannel,
    asset_name: &str,
    client_key: &str,
) -> Result<IngestionConfig, Box<dyn Error>> {
    let mut svc = IngestionConfigServiceClient::new(grpc_channel);
 
    let list_res = svc
        .list_ingestion_configs(ListIngestionConfigsRequest {
            filter: format!("client_key == '{client_key}'"),
            ..Default::default()
        })
        .await?;
 
    if let Some(ingestion_config) = list_res.into_inner().ingestion_configs.first().cloned() {
        return Ok(ingestion_config);
    }
 
    let req = CreateIngestionConfigRequest {
        flows: channel_configs(),
        asset_name: String::from(asset_name),
        client_key: String::from(client_key),
        ..Default::default()
    };
 
    let create_res = svc.create_ingestion_config(req).await?;
    let ingestion_conf = create_res
        .into_inner()
        .ingestion_config
        .ok_or("expected ingestion config")?;
 
    Ok(ingestion_conf)
}

Client Keys

As mentioned in the ingestion config section, specifying a client key is highly recommended because it lets you look up an existing config after creation, as get_or_create_ingestion_config does above.
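To make the lookup-then-create behavior concrete without a Sift connection, here is a minimal in-memory sketch. FakeConfigStore and StoredConfig are hypothetical stand-ins for IngestionConfigServiceClient and IngestionConfig; the point is that repeated calls with the same client key return the same config instead of creating duplicates, while a different key produces a new one.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an ingestion config as stored by the server.
#[derive(Clone, Debug, PartialEq)]
pub struct StoredConfig {
    pub ingestion_config_id: String,
    pub client_key: String,
    pub asset_name: String,
}

/// Hypothetical in-memory stand-in for IngestionConfigServiceClient.
#[derive(Default)]
pub struct FakeConfigStore {
    by_client_key: HashMap<String, StoredConfig>,
    next_id: u64,
}

impl FakeConfigStore {
    /// Mirrors get_or_create_ingestion_config: look up by client key first,
    /// and only create a new config when no match exists.
    pub fn get_or_create(&mut self, asset_name: &str, client_key: &str) -> StoredConfig {
        if let Some(existing) = self.by_client_key.get(client_key) {
            return existing.clone();
        }
        self.next_id += 1;
        let created = StoredConfig {
            ingestion_config_id: format!("config-{}", self.next_id),
            client_key: client_key.to_string(),
            asset_name: asset_name.to_string(),
        };
        self.by_client_key.insert(client_key.to_string(), created.clone());
        created
    }

    /// Number of distinct configs stored so far.
    pub fn count(&self) -> usize {
        self.by_client_key.len()
    }
}
```

Note that the sketch, like the filter in get_or_create_ingestion_config, matches on client key only: if you change channel_configs but keep the same key, you get the previously created config back. Choosing a new key (for example, bumping the -v1 suffix of CLIENT_KEY) is one simple way to get a fresh config.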

Creating a Run

Now we will make a create_run function so we can group together our asset data.

/// Create a run to use to group all the data ingested during this period.
async fn create_run(grpc_channel: SiftChannel, run_name: &str) -> Result<Run, Box<dyn Error>> {
    let mut svc = RunServiceClient::new(grpc_channel);
    let ts = Utc::now();
 
    let create_req = CreateRunRequest {
        name: format!("[{run_name}].{}", ts.timestamp()),
        start_time: Some(Timestamp::from(ts)),
        ..Default::default()
    };
    let create_res = svc.create_run(create_req).await?;
    let run = create_res.into_inner().run.ok_or("expected run")?;
    Ok(run)
}
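create_run names each run by appending the current Unix time in seconds to the given name. The std-only sketch below reproduces that naming scheme with a hypothetical run_name helper (SystemTime instead of chrono) so the format is easy to test. One consequence worth noting: two runs created within the same second receive identical names, so if you start runs in quick succession, a finer-grained suffix may be preferable.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical helper reproducing create_run's "[<name>].<unix seconds>" format.
pub fn run_name(base: &str, at: SystemTime) -> String {
    // Whole seconds since the Unix epoch; in this sketch, times before
    // the epoch fall back to 0 (chrono's timestamp() would go negative).
    let secs = at
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    format!("[{base}].{secs}")
}
```

For example, run_name("NostromoLV426", at) with at set to 1,700,000,000 seconds after the epoch yields "[NostromoLV426].1700000000".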

Runs are Optional

Creating a run is not required to send data to Sift. Runs are a mechanism for grouping together data from a single asset or from multiple assets.

Sending Data

Now all that's left to do is to assemble our main function:

/// Name of the asset that we want to ingest data for.
pub const ASSET_NAME: &str = "NostromoLV426";
 
/// Unique client-chosen identifier used to identify an ingestion config.
pub const CLIENT_KEY: &str = "nostromo-lv-426-config-v1";
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Connect to Sift
    let grpc_channel = use_sift_channel(SiftChannelConfig {
        uri: env::var("SIFT_URI")?,
        apikey: env::var("SIFT_API_KEY")?,
    })?;
 
    // Create your ingestion config which defines the schema of your telemetry.
    let ingestion_config =
        get_or_create_ingestion_config(grpc_channel.clone(), ASSET_NAME, CLIENT_KEY).await?;
    println!(
        "initialized ingestion config {}",
        ingestion_config.client_key
    );
 
    // Create a run to group all the data ingested during this period.
    let run = create_run(grpc_channel.clone(), ASSET_NAME).await?;
    println!("initialized run {}", &run.name);
 
    let mut ingestion_service = IngestServiceClient::new(grpc_channel);
    let data_stream = data_source();
 
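    // Send each reading to Sift as it arrives. recv() returns Err once the
    // data source thread exits and drops its sender, which ends the loop.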
    while let Ok((timestamp, velocity)) = data_stream.recv() {
        let req = IngestWithConfigDataStreamRequest {
            run_id: run.run_id.clone(),
            ingestion_config_id: String::from(&ingestion_config.ingestion_config_id),
            flow: String::from("velocity_reading"),
            timestamp: Some(Timestamp::from(timestamp)),
            channel_values: vec![IngestWithConfigDataChannelValue {
                r#type: Some(Type::Double(velocity)),
            }],
            // Set this flag to 'true' only for debugging purposes to get real-time data validation from
            // the Sift API. Do not use in production as it will hurt performance.
            end_stream_on_validation_error: false,
            ..Default::default()
        };
        ingestion_service
            .ingest_with_config_data_stream(tokio_stream::once(req))
            .await?;
        println!("ingested a velocity_reading flow");
    }
 
    println!("done.");
    Ok(())
}
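The loop in main opens a separate streaming call for every reading (tokio_stream::once(req)) and blocks its thread on recv(). At 2 readings per second that is fine, but at higher rates you may prefer to send several requests per call, since ingest_with_config_data_stream accepts a stream of requests. A minimal, std-only sketch of the batching half, using a hypothetical next_batch helper:

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical helper: block until `batch_size` items have arrived or the
/// sender has disconnected. Returns None once the channel is disconnected
/// and no items remain (and also when batch_size is 0).
pub fn next_batch<T>(rx: &Receiver<T>, batch_size: usize) -> Option<Vec<T>> {
    let mut batch = Vec::with_capacity(batch_size);
    while batch.len() < batch_size {
        match rx.recv() {
            Ok(item) => batch.push(item),
            // Sender dropped: hand back whatever has been collected so far.
            Err(_) => break,
        }
    }
    if batch.is_empty() {
        None
    } else {
        Some(batch)
    }
}
```

Inside main, the loop could then become while let Some(batch) = next_batch(&data_stream, 10) { ... }, mapping each (timestamp, velocity) pair to an IngestWithConfigDataStreamRequest as before and passing the resulting Vec to ingest_with_config_data_stream via tokio_stream::iter(requests). That wiring is an untested assumption. The trade-off is latency: a reading now waits until its batch fills or the data source stops.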

And that's it! In Sift, you should see the asset, NostromoLV426, the run we just created, as well as data for the velocity channel.
