Skip to main content
Sift provides official client libraries for Python, Go, and Rust. All libraries use ingestion-config-based streaming over gRPC. For the full protocol buffers reference, see the gRPC API reference.

Before you begin

  • API key: You need a Sift API key to authenticate requests. To create one, go to Sift > your profile icon > Manage > API Keys > + Create API Key. For full steps, see Create an API key.
  • gRPC URI: Your gRPC endpoint is shown on the API Keys page in Sift, in the gRPC API URL field.
  • REST URI: Your REST endpoint is shown on the same page, in the REST API URL field. Required by the Python client.
In the code examples below, these values are read from environment variables: SIFT_API_KEY (the API key), SIFT_URI (the gRPC URI), and BASE_URI (the REST URI). Set them before running any example.

Python

Install

pip install sift-stack-py

Resources

Imports

import asyncio
import random
from datetime import datetime, timezone

from sift_client import SiftClient, SiftConnectionConfig
from sift_client.sift_types import RunCreate
from sift_stream_bindings import (
    ChannelConfigPy,
    ChannelDataTypePy,
    ChannelValuePy,
    FlowConfigPy,
    FlowPy,
    IngestionConfigFormPy,
    TimeValuePy,
    ValuePy,
)

Data source

async def data_source():
    """Asynchronously yield ``(timestamp, velocity)`` samples for about 60 seconds.

    Each iteration waits half a second, then yields the current UTC time
    together with a random float drawn from ``random.uniform(0, 10)``.
    """
    clock = asyncio.get_event_loop().time
    started_at = clock()
    while True:
        # Stop producing once 60 seconds of event-loop time have elapsed.
        if clock() - started_at >= 60:
            return
        await asyncio.sleep(0.5)
        sampled_at = datetime.now(timezone.utc)
        yield sampled_at, random.uniform(0, 10)

Ingestion config

# Identifiers shared by the ingestion config below and the streaming code.
ASSET_NAME = "NostromoLV426"
FLOW_NAME = "velocity-reading"
CONFIG_KEY = "nostromo-lv-426-config-v1"

# The only channel carried by the flow: a double-valued velocity reading.
_velocity_channel = ChannelConfigPy(
    name="mainmotor.velocity",
    unit="m/s",
    data_type=ChannelDataTypePy.Double,
    description="Main motor velocity",
    enum_types=[],
    bit_field_elements=[],
)

# A flow groups the channels that are sent together under one flow name.
flow_config = FlowConfigPy(name=FLOW_NAME, channels=[_velocity_channel])

# The ingestion config ties the asset, the client key, and the flows together.
ingestion_config = IngestionConfigFormPy(
    asset_name=ASSET_NAME,
    client_key=CONFIG_KEY,
    flows=[flow_config],
)

Connect, create a Run, and stream data

async def main():
    """Connect to Sift, create a Run, and stream every sample from ``data_source``.

    Connection settings are read from the environment variables
    ``SIFT_API_KEY`` (API key), ``SIFT_URI`` (gRPC URL) and ``BASE_URI``
    (REST URL) rather than being hard-coded.

    Raises:
        KeyError: If any of those environment variables is not set.
    """
    import os

    connection_config = SiftConnectionConfig(
        api_key=os.environ["SIFT_API_KEY"],
        grpc_url=os.environ["SIFT_URI"],
        rest_url=os.environ["BASE_URI"],
    )
    client = SiftClient(connection_config=connection_config)

    run = RunCreate(name=f"[{ASSET_NAME}].run", client_key=f"{CONFIG_KEY}.run")

    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        async for timestamp, velocity in data_source():
            # One flow message per sample; the timestamp is sent as epoch milliseconds.
            await ingest_client.send(
                FlowPy(
                    flow_name=FLOW_NAME,
                    timestamp=TimeValuePy.from_timestamp_millis(int(timestamp.timestamp() * 1000)),
                    values=[
                        ChannelValuePy(name="mainmotor.velocity", value=ValuePy.Double(velocity)),
                    ],
                )
            )

if __name__ == "__main__":
    asyncio.run(main())
Backwards compatibility: For a given client_key, adding new flows and channels is a safe and backwards-compatible operation. Existing flows and channels should not be modified; this type of change is not backwards compatible and will lead to unexpected behavior.

Go

Install

go get github.com/sift-stack/sift/go

Resources

Imports

package main

import (
        "context"
        "fmt"
        "log"
        "math/rand"
        "os"
        "time"

        "github.com/sift-stack/sift/go/gen/sift/common/type/v1"
        ingestv1 "github.com/sift-stack/sift/go/gen/sift/ingest/v1"
        "github.com/sift-stack/sift/go/gen/sift/ingestion_configs/v1"
        "github.com/sift-stack/sift/go/gen/sift/runs/v2"
        "github.com/sift-stack/sift/go/grpc"
        "google.golang.org/protobuf/types/known/timestamppb"
)

Data source

// dataPoint is a single timestamped sample produced by dataSource.
type dataPoint struct {
    Timestamp time.Time
    Value     float64
}

// dataSource starts a goroutine that emits a random sample roughly every
// 500 ms for 60 seconds and returns the receive-only channel carrying them.
// The channel is closed once the producer finishes, so a `for ... range`
// loop over it terminates.
func dataSource() <-chan dataPoint {
    dataChannel := make(chan dataPoint)
    go func() {
        // Without this close, a consumer ranging over the channel would block
        // forever after the last sample.
        defer close(dataChannel)

        rng := rand.New(rand.NewSource(time.Now().UnixNano()))
        duration := 60 * time.Second
        start := time.Now()

        for time.Since(start) < duration {
            dataChannel <- dataPoint{
                Timestamp: time.Now(),
                Value:     rng.Float64(),
            }
            time.Sleep(500 * time.Millisecond)
        }
    }()
    return dataChannel
}

Flow and channel config

// config returns the flow configurations used when creating the ingestion
// config: a single "velocity_reading" flow with one double-valued channel.
func config() []*ingestion_configsv1.FlowConfig {
    velocityChannel := &ingestion_configsv1.ChannelConfig{
        Name:        "mainmotor.velocity",
        Unit:        "km/hr",
        Description: "vehicle speed",
        DataType:    typev1.ChannelDataType_CHANNEL_DATA_TYPE_DOUBLE,
    }

    velocityFlow := &ingestion_configsv1.FlowConfig{
        Name:     "velocity_reading",
        Channels: []*ingestion_configsv1.ChannelConfig{velocityChannel},
    }

    return []*ingestion_configsv1.FlowConfig{velocityFlow}
}

Get or create ingestion config

// getOrCreateIngestionConfig returns the first ingestion config whose
// client_key matches clientKey. If the list call finds none, it creates a new
// one for assetName using the flows from config().
// Any error from the list or create call is returned unchanged.
func getOrCreateIngestionConfig(
    ctx context.Context,
    grpcChannel grpc.SiftChannel,
    assetName,
    clientKey string,
) (*ingestion_configsv1.IngestionConfig, error) {
    client := ingestion_configsv1.NewIngestionConfigServiceClient(grpcChannel)

    // Look for an existing config first.
    listReq := &ingestion_configsv1.ListIngestionConfigsRequest{
        Filter: fmt.Sprintf("client_key == '%s'", clientKey),
    }
    existing, err := client.ListIngestionConfigs(ctx, listReq)
    if err != nil {
        return nil, err
    }
    if existing != nil {
        if found := existing.IngestionConfigs; len(found) > 0 {
            return found[0], nil
        }
    }

    // Nothing matched: create the config.
    createReq := &ingestion_configsv1.CreateIngestionConfigRequest{
        AssetName: assetName,
        ClientKey: clientKey,
        Flows:     config(),
    }
    created, err := client.CreateIngestionConfig(ctx, createReq)
    if err != nil {
        return nil, err
    }
    return created.IngestionConfig, nil
}

Create a Run

// createRun creates a run named "[<runName>].<unix seconds>" whose start time
// is the current time, and returns the run from the service response.
func createRun(
    ctx context.Context,
    grpcChannel grpc.SiftChannel,
    runName string,
) (*runsv2.Run, error) {
    startedAt := timestamppb.Now()

    // The seconds component of the start time keeps repeated run names distinct.
    request := &runsv2.CreateRunRequest{
        Name:      fmt.Sprintf("[%s].%d", runName, startedAt.Seconds),
        StartTime: startedAt,
    }

    response, err := runsv2.NewRunServiceClient(grpcChannel).CreateRun(ctx, request)
    if err != nil {
        return nil, err
    }
    return response.Run, nil
}

Main function

// assetName is the asset the ingestion config and the run are created for.
const assetName = "NostromoLV426"

// clientKey is the client key used to look up or create the ingestion config.
const clientKey = "nostromo-lv-426-config-v1"

// main connects to Sift using SIFT_URI and SIFT_API_KEY, obtains an ingestion
// config and a run, then sends one "velocity_reading" flow for every data
// point received from dataSource. Any error terminates the program.
func main() {
    // exitOnError logs err and exits when a step has failed.
    exitOnError := func(err error) {
        if err != nil {
            log.Fatalln(err)
        }
    }

    ctx := context.Background()

    channelConfig := grpc.SiftChannelConfig{
        Uri:    os.Getenv("SIFT_URI"),
        Apikey: os.Getenv("SIFT_API_KEY"),
    }
    grpcChannel, err := grpc.UseSiftChannel(ctx, channelConfig)
    exitOnError(err)

    ingestionConfig, err := getOrCreateIngestionConfig(ctx, grpcChannel, assetName, clientKey)
    exitOnError(err)
    log.Printf("initialized ingestion config %s\n", ingestionConfig.ClientKey)

    run, err := createRun(ctx, grpcChannel, assetName)
    exitOnError(err)
    log.Printf("initialized run %s\n", run.Name)

    ingestClient := ingestv1.NewIngestServiceClient(grpcChannel)
    siftStream, err := ingestClient.IngestWithConfigDataStream(ctx)
    exitOnError(err)

    for data := range dataSource() {
        // The flow has a single channel, so each request carries one value.
        velocity := &ingestv1.IngestWithConfigDataChannelValue{
            Type: &ingestv1.IngestWithConfigDataChannelValue_Double{Double: data.Value},
        }
        req := &ingestv1.IngestWithConfigDataStreamRequest{
            IngestionConfigId:          ingestionConfig.IngestionConfigId,
            RunId:                      run.RunId,
            Flow:                       "velocity_reading",
            Timestamp:                  timestamppb.New(data.Timestamp),
            ChannelValues:              []*ingestv1.IngestWithConfigDataChannelValue{velocity},
            EndStreamOnValidationError: false,
        }
        exitOnError(siftStream.Send(req))
        log.Println("ingested a velocity_reading flow")
    }

    _, err = siftStream.CloseAndRecv()
    exitOnError(err)

    log.Println("done.")
}

Rust

Install

cargo add sift_rs rand pbjson_types chrono tokio_stream
cargo add tokio --features full
Tokio: Tokio is required to work with sift_rs; it is the runtime that tonic relies on. Which Tokio runtime flavor to use depends on your requirements.

Resources

Imports

use chrono::{DateTime, Utc};
use pbjson_types::Timestamp;
use rand::Rng;
use sift_rs::{
    gen::sift::{
        common::r#type::v1::ChannelDataType,
        ingest::v1::{
            ingest_service_client::IngestServiceClient,
            ingest_with_config_data_channel_value::Type, IngestWithConfigDataChannelValue,
            IngestWithConfigDataStreamRequest,
        },
        ingestion_configs::v1::{
            ingestion_config_service_client::IngestionConfigServiceClient, ChannelConfig,
            CreateIngestionConfigRequest, FlowConfig, IngestionConfig, ListIngestionConfigsRequest,
        },
        runs::v2::{run_service_client::RunServiceClient, CreateRunRequest, Run},
    },
    grpc::{use_sift_channel, SiftChannel, SiftChannelConfig},
};
use std::{
    env,
    error::Error,
    sync::mpsc::{channel, Receiver},
    thread,
    time::{Duration, Instant},
};

Data source

/// Spawns a background thread that emits a `(timestamp, value)` sample roughly
/// every 500 ms for 60 seconds and returns the receiving end of the channel.
///
/// Each value is a random `f64` in `0.0..100.0`. The producer stops after 60
/// seconds, or earlier if the receiver has been dropped. The sender is dropped
/// when the thread ends, so `recv()` on the returned receiver then yields `Err`.
pub fn data_source() -> Receiver<(DateTime<Utc>, f64)> {
    let (tx, rx) = channel();

    thread::spawn(move || {
        let duration = Duration::from_secs(60);
        let start = Instant::now();
        let mut rng = rand::thread_rng();

        while start.elapsed() < duration {
            // A send error only means the receiver is gone; stop quietly
            // instead of panicking the producer thread.
            if tx.send((Utc::now(), rng.gen_range(0.0..100.0))).is_err() {
                break;
            }
            thread::sleep(Duration::from_millis(500));
        }
    });
    rx
}

Channel and flow config

/// Returns the flow configurations for this example: a single
/// `velocity_reading` flow containing one double-valued channel.
pub fn channel_configs() -> Vec<FlowConfig> {
    let velocity_channel = ChannelConfig {
        name: "mainmotor.velocity".to_owned(),
        unit: "km/hr".to_owned(),
        description: "vehicle speed".to_owned(),
        data_type: ChannelDataType::Double.into(),
        ..Default::default()
    };

    let velocity_flow = FlowConfig {
        name: "velocity_reading".to_owned(),
        channels: vec![velocity_channel],
    };

    vec![velocity_flow]
}

Get or create ingestion config

async fn get_or_create_ingestion_config(
    grpc_channel: SiftChannel,
    asset_name: &str,
    client_key: &str,
) -> Result<IngestionConfig, Box<dyn Error>> {
    let mut svc = IngestionConfigServiceClient::new(grpc_channel);

    let list_res = svc
        .list_ingestion_configs(ListIngestionConfigsRequest {
            filter: format!("client_key == '{client_key}'"),
            ..Default::default()
        })
        .await?;

    if let Some(ingestion_config) = list_res.into_inner().ingestion_configs.first().cloned() {
        return Ok(ingestion_config);
    }

    let req = CreateIngestionConfigRequest {
        flows,
        asset_name: String::from(asset_name),
        client_key: String::from(client_key),
        ..Default::default()
    };

    let create_res = svc.create_ingestion_config(req).await?;
    let ingestion_conf = create_res
        .into_inner()
        .ingestion_config
        .ok_or("expected ingestion config")?;

    Ok(ingestion_conf)
}

Create a Run

/// Creates a run named `[<run_name>].<unix seconds>` whose start time is the
/// current UTC time, and returns the run from the service response.
///
/// # Errors
///
/// Returns an error if the `create_run` call fails or if the response does
/// not contain a run.
async fn create_run(grpc_channel: SiftChannel, run_name: &str) -> Result<Run, Box<dyn Error>> {
    let mut svc = RunServiceClient::new(grpc_channel);
    let ts = Utc::now();

    let create_req = CreateRunRequest {
        name: format!("[{run_name}].{}", ts.timestamp()),
        start_time: Some(Timestamp::from(ts)),
        ..Default::default()
    };
    let create_res = svc.create_run(create_req).await?;
    // Report a missing run as an error rather than panicking, since this
    // function already returns a `Result`.
    let run = create_res.into_inner().run.ok_or("expected run")?;
    Ok(run)
}
Run metadata: CreateRunRequest supports additional metadata fields such as description, tags, client_key, and stop_time. For the full list of available fields, see the CreateRunRequest documentation in sift_rs. These fields are also documented in the Run API reference.

Main function

/// Asset the ingestion config and the run are created for.
pub const ASSET_NAME: &str = "NostromoLV426";
/// Client key used to look up or create the ingestion config.
pub const CLIENT_KEY: &str = "nostromo-lv-426-config-v1";

/// Connects to Sift using `SIFT_URI` and `SIFT_API_KEY`, obtains an ingestion
/// config and a run, then sends one `velocity_reading` flow for every sample
/// received from `data_source`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let channel_config = SiftChannelConfig {
        uri: env::var("SIFT_URI")?,
        apikey: env::var("SIFT_API_KEY")?,
    };
    let grpc_channel = use_sift_channel(channel_config)?;

    let ingestion_config =
        get_or_create_ingestion_config(grpc_channel.clone(), ASSET_NAME, CLIENT_KEY).await?;
    println!("initialized ingestion config {}", ingestion_config.client_key);

    let run = create_run(grpc_channel.clone(), ASSET_NAME).await?;
    println!("initialized run {}", run.name);

    let mut ingest_client = IngestServiceClient::new(grpc_channel);

    // Iterating the receiver ends once the sending side has hung up.
    for (timestamp, velocity) in data_source() {
        let velocity_value = IngestWithConfigDataChannelValue {
            r#type: Some(Type::Double(velocity)),
        };
        let request = IngestWithConfigDataStreamRequest {
            run_id: run.run_id.clone(),
            ingestion_config_id: ingestion_config.ingestion_config_id.clone(),
            flow: "velocity_reading".to_owned(),
            timestamp: Some(Timestamp::from(timestamp)),
            channel_values: vec![velocity_value],
            end_stream_on_validation_error: false,
            ..Default::default()
        };

        // Each sample is sent as its own single-item stream.
        ingest_client
            .ingest_with_config_data_stream(tokio_stream::once(request))
            .await?;
        println!("ingested a velocity_reading flow");
    }

    println!("done.");
    Ok(())
}