Documentation Index
Fetch the complete documentation index at: https://docs.siftstack.com/llms.txt
Use this file to discover all available pages before exploring further.
Sift provides official client libraries for:
All libraries use ingestion-config-based streaming over gRPC.
For the full protocol buffers reference, see the gRPC API reference.
Before you begin
- API key: You need a Sift API key to authenticate requests. To create one, go to Sift > your profile icon > Manage > API Keys > + Create API Key. For full steps, see Create an API key.
- gRPC URI: Your gRPC endpoint is shown on the API Keys page in Sift, in the gRPC API URL field.
- REST URI: Your REST endpoint is shown on the same page, in the REST API URL field. Required by the Python client.
In the code examples below, these values are read from environment variables (SIFT_API_KEY, SIFT_URI, BASE_URI). Set them before running any example.
Python
Install
pip install sift-stack-py
Resources
Imports
import asyncio
import random
from datetime import datetime, timezone
from sift_client import SiftClient, SiftConnectionConfig
from sift_client.sift_types import RunCreate
from sift_stream_bindings import (
ChannelConfigPy,
ChannelDataTypePy,
ChannelValuePy,
FlowConfigPy,
FlowPy,
IngestionConfigFormPy,
TimeValuePy,
ValuePy,
)
Data source
async def data_source():
start = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start < 60:
await asyncio.sleep(0.5)
yield datetime.now(timezone.utc), random.uniform(0, 10)
Ingestion config
ASSET_NAME = "NostromoLV426"
FLOW_NAME = "velocity-reading"
CONFIG_KEY = "nostromo-lv-426-config-v1"
flow_config = FlowConfigPy(
name=FLOW_NAME,
channels=[
ChannelConfigPy(
name="mainmotor.velocity",
unit="m/s",
data_type=ChannelDataTypePy.Double,
description="Main motor velocity",
enum_types=[],
bit_field_elements=[],
),
],
)
ingestion_config = IngestionConfigFormPy(
asset_name=ASSET_NAME,
client_key=CONFIG_KEY,
flows=[flow_config],
)
Connect, create a Run, and stream data
async def main():
connection_config = SiftConnectionConfig(
api_key="your-api-key",
grpc_url="your-grpc-url",
rest_url="your-rest-url",
)
client = SiftClient(connection_config=connection_config)
run = RunCreate(name=f"[{ASSET_NAME}].run", client_key=f"{CONFIG_KEY}.run")
async with await client.async_.ingestion.create_ingestion_config_streaming_client(
ingestion_config=ingestion_config,
run=run,
) as ingest_client:
async for timestamp, velocity in data_source():
await ingest_client.send(
FlowPy(
flow_name=FLOW_NAME,
timestamp=TimeValuePy.from_timestamp_millis(int(timestamp.timestamp() * 1000)),
values=[
ChannelValuePy(name="mainmotor.velocity", value=ValuePy.Double(velocity)),
],
)
)
if __name__ == "__main__":
asyncio.run(main())
Backwards compatibilityFor a given client_key, adding new flows and channels is a safe and backwards compatible operation. Existing flows and channels should not be modified; this type of change is not backwards compatible and will lead to unexpected behavior.
Install
go get github.com/sift-stack/sift/go
Resources
Imports
package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"time"
"github.com/sift-stack/sift/go/gen/sift/common/type/v1"
ingestv1 "github.com/sift-stack/sift/go/gen/sift/ingest/v1"
"github.com/sift-stack/sift/go/gen/sift/ingestion_configs/v1"
"github.com/sift-stack/sift/go/gen/sift/runs/v2"
"github.com/sift-stack/sift/go/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
Data source
func dataSource() <-chan dataPoint {
dataChannel := make(chan dataPoint)
go func() {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
duration := 60 * time.Second
start := time.Now()
for time.Since(start) < duration {
dataChannel <- dataPoint{
Timestamp: time.Now(),
Value: rng.Float64(),
}
time.Sleep(500 * time.Millisecond)
}
}()
return dataChannel
}
Flow and channel config
func config() []*ingestion_configsv1.FlowConfig {
return []*ingestion_configsv1.FlowConfig{
{
Name: "velocity_reading",
Channels: []*ingestion_configsv1.ChannelConfig{
{
Name: "mainmotor.velocity",
Unit: "km/hr",
Description: "vehicle speed",
DataType: typev1.ChannelDataType_CHANNEL_DATA_TYPE_DOUBLE,
},
},
},
}
}
Get or create ingestion config
func getOrCreateIngestionConfig(
ctx context.Context,
grpcChannel grpc.SiftChannel,
assetName,
clientKey string,
) (*ingestion_configsv1.IngestionConfig, error) {
svc := ingestion_configsv1.NewIngestionConfigServiceClient(grpcChannel)
listRes, err := svc.ListIngestionConfigs(ctx, &ingestion_configsv1.ListIngestionConfigsRequest{
Filter: fmt.Sprintf("client_key == '%s'", clientKey),
})
if err != nil {
return nil, err
}
if listRes != nil && len(listRes.IngestionConfigs) > 0 {
return listRes.IngestionConfigs[0], nil
}
createRes, err := svc.CreateIngestionConfig(ctx, &ingestion_configsv1.CreateIngestionConfigRequest{
AssetName: assetName,
ClientKey: clientKey,
Flows: config(),
})
if err != nil {
return nil, err
}
return createRes.IngestionConfig, nil
}
Create a Run
func createRun(
ctx context.Context,
grpcChannel grpc.SiftChannel,
runName string,
) (*runsv2.Run, error) {
svc := runsv2.NewRunServiceClient(grpcChannel)
ts := timestamppb.Now()
createRes, err := svc.CreateRun(ctx, &runsv2.CreateRunRequest{
Name: fmt.Sprintf("[%s].%d", runName, ts.Seconds),
StartTime: ts,
})
if err != nil {
return nil, err
}
return createRes.Run, nil
}
Main function
const (
assetName = "NostromoLV426"
clientKey = "nostromo-lv-426-config-v1"
)
func main() {
ctx := context.Background()
grpcChannel, err := grpc.UseSiftChannel(ctx, grpc.SiftChannelConfig{
Uri: os.Getenv("SIFT_URI"),
Apikey: os.Getenv("SIFT_API_KEY"),
})
if err != nil {
log.Fatalln(err)
}
ingestionConfig, err := getOrCreateIngestionConfig(ctx, grpcChannel, assetName, clientKey)
if err != nil {
log.Fatalln(err)
}
log.Printf("initialized ingestion config %s\n", ingestionConfig.ClientKey)
run, err := createRun(ctx, grpcChannel, assetName)
if err != nil {
log.Fatalln(err)
}
log.Printf("initialized run %s\n", run.Name)
siftStream, err := ingestv1.NewIngestServiceClient(grpcChannel).IngestWithConfigDataStream(ctx)
if err != nil {
log.Fatalln(err)
}
dataStream := dataSource()
for data := range dataStream {
req := &ingestv1.IngestWithConfigDataStreamRequest{
IngestionConfigId: ingestionConfig.IngestionConfigId,
RunId: run.RunId,
Flow: "velocity_reading",
Timestamp: timestamppb.New(data.Timestamp),
ChannelValues: []*ingestv1.IngestWithConfigDataChannelValue{
{Type: &ingestv1.IngestWithConfigDataChannelValue_Double{Double: data.Value}},
},
EndStreamOnValidationError: false,
}
if err := siftStream.Send(req); err != nil {
log.Fatalln(err)
}
log.Println("ingested a velocity_reading flow")
}
if _, err := siftStream.CloseAndRecv(); err != nil {
log.Fatalln(err)
}
log.Println("done.")
}
Rust
Install
cargo add sift_rs rand pbjson_types chrono tokio_stream
cargo add tokio --features full
TokioTokio is required to work with sift_rs; it is the runtime that tonic relies on. The Tokio runtime flavor to use is dependent on your requirements.
Resources
Imports
use chrono::{DateTime, Utc};
use pbjson_types::Timestamp;
use rand::Rng;
use sift_rs::{
gen::sift::{
common::r#type::v1::ChannelDataType,
ingest::v1::{
ingest_service_client::IngestServiceClient,
ingest_with_config_data_channel_value::Type, IngestWithConfigDataChannelValue,
IngestWithConfigDataStreamRequest,
},
ingestion_configs::v1::{
ingestion_config_service_client::IngestionConfigServiceClient, ChannelConfig,
CreateIngestionConfigRequest, FlowConfig, IngestionConfig, ListIngestionConfigsRequest,
},
runs::v2::{run_service_client::RunServiceClient, CreateRunRequest, Run},
},
grpc::{use_sift_channel, SiftChannel, SiftChannelConfig},
};
use std::{
env,
error::Error,
sync::mpsc::{channel, Receiver},
thread,
time::{Duration, Instant},
};
Data source
pub fn data_source() -> Receiver<(DateTime<Utc>, f64)> {
let (tx, rx) = channel();
thread::spawn(move || {
let duration = Duration::from_secs(60);
let start = Instant::now();
let mut rng = rand::thread_rng();
while Instant::now().duration_since(start) < duration {
tx.send((Utc::now(), rng.gen_range(0.0..100.0))).unwrap();
thread::sleep(Duration::from_millis(500));
}
});
rx
}
Channel and flow config
pub fn channel_configs() -> Vec<FlowConfig> {
vec![FlowConfig {
name: String::from("velocity_reading"),
channels: vec![ChannelConfig {
name: String::from("mainmotor.velocity"),
unit: String::from("km/hr"),
description: String::from("vehicle speed"),
data_type: ChannelDataType::Double.into(),
..Default::default()
}],
}]
}
Get or create ingestion config
async fn get_or_create_ingestion_config(
grpc_channel: SiftChannel,
asset_name: &str,
client_key: &str,
) -> Result<IngestionConfig, Box<dyn Error>> {
let mut svc = IngestionConfigServiceClient::new(grpc_channel);
let list_res = svc
.list_ingestion_configs(ListIngestionConfigsRequest {
filter: format!("client_key == '{client_key}'"),
..Default::default()
})
.await?;
if let Some(ingestion_config) = list_res.into_inner().ingestion_configs.first().cloned() {
return Ok(ingestion_config);
}
let req = CreateIngestionConfigRequest {
flows,
asset_name: String::from(asset_name),
client_key: String::from(client_key),
..Default::default()
};
let create_res = svc.create_ingestion_config(req).await?;
let ingestion_conf = create_res
.into_inner()
.ingestion_config
.ok_or("expected ingestion config")?;
Ok(ingestion_conf)
}
Create a Run
async fn create_run(grpc_channel: SiftChannel, run_name: &str) -> Result<Run, Box<dyn Error>> {
let mut svc = RunServiceClient::new(grpc_channel);
let ts = Utc::now();
let create_req = CreateRunRequest {
name: format!("[{}].{}", run_name.to_string(), ts.timestamp()),
start_time: Some(Timestamp::from(ts)),
..Default::default()
};
let create_res = svc.create_run(create_req).await?;
let run = create_res.into_inner().run.expect("expected run");
Ok(run)
}
Main function
pub const ASSET_NAME: &str = "NostromoLV426";
pub const CLIENT_KEY: &str = "nostromo-lv-426-config-v1";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let grpc_channel = use_sift_channel(SiftChannelConfig {
uri: env::var("SIFT_URI")?,
apikey: env::var("SIFT_API_KEY")?,
})?;
let ingestion_config =
get_or_create_ingestion_config(grpc_channel.clone(), ASSET_NAME, CLIENT_KEY).await?;
println!(
"initialized ingestion config {}",
ingestion_config.client_key
);
let run = create_run(grpc_channel.clone(), ASSET_NAME).await?;
println!("initialized run {}", &run.name);
let mut ingestion_service = IngestServiceClient::new(grpc_channel);
let data_stream = data_source();
while let Ok((timestamp, velocity)) = data_stream.recv() {
let req = IngestWithConfigDataStreamRequest {
run_id: run.run_id.clone(),
ingestion_config_id: String::from(&ingestion_config.ingestion_config_id),
flow: String::from("velocity_reading"),
timestamp: Some(Timestamp::from(timestamp)),
channel_values: vec![IngestWithConfigDataChannelValue {
r#type: Some(Type::Double(velocity)),
}],
end_stream_on_validation_error: false,
..Default::default()
};
ingestion_service
.ingest_with_config_data_stream(tokio_stream::once(req))
.await?;
println!("ingested a velocity_reading flow");
}
println!("done.");
Ok(())
}