Before you begin
- API key: You need a Sift API key to authenticate requests. To create one, go to Sift > your profile icon > Manage > API Keys > + Create API Key. For full steps, see Create an API key.
- gRPC URI: Your gRPC endpoint is shown on the API Keys page in Sift, in the gRPC API URL field.
- REST URI: Your REST endpoint is shown on the same page, in the REST API URL field. Required by the Python client.
SIFT_API_KEY, SIFT_URI, BASE_URI). Set them before running any example.
Python
Install
pip install sift-stack-py
Resources
Imports
import asyncio
import random
from datetime import datetime, timezone
from sift_client import SiftClient, SiftConnectionConfig
from sift_client.sift_types import RunCreate
from sift_stream_bindings import (
ChannelConfigPy,
ChannelDataTypePy,
ChannelValuePy,
FlowConfigPy,
FlowPy,
IngestionConfigFormPy,
TimeValuePy,
ValuePy,
)
Data source
async def data_source():
start = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start < 60:
await asyncio.sleep(0.5)
yield datetime.now(timezone.utc), random.uniform(0, 10)
Ingestion config
ASSET_NAME = "NostromoLV426"
FLOW_NAME = "velocity-reading"
CONFIG_KEY = "nostromo-lv-426-config-v1"
flow_config = FlowConfigPy(
name=FLOW_NAME,
channels=[
ChannelConfigPy(
name="mainmotor.velocity",
unit="m/s",
data_type=ChannelDataTypePy.Double,
description="Main motor velocity",
enum_types=[],
bit_field_elements=[],
),
],
)
ingestion_config = IngestionConfigFormPy(
asset_name=ASSET_NAME,
client_key=CONFIG_KEY,
flows=[flow_config],
)
Connect, create a Run, and stream data
async def main():
connection_config = SiftConnectionConfig(
api_key="your-api-key",
grpc_url="your-grpc-url",
rest_url="your-rest-url",
)
client = SiftClient(connection_config=connection_config)
run = RunCreate(name=f"[{ASSET_NAME}].run", client_key=f"{CONFIG_KEY}.run")
async with await client.async_.ingestion.create_ingestion_config_streaming_client(
ingestion_config=ingestion_config,
run=run,
) as ingest_client:
async for timestamp, velocity in data_source():
await ingest_client.send(
FlowPy(
flow_name=FLOW_NAME,
timestamp=TimeValuePy.from_timestamp_millis(int(timestamp.timestamp() * 1000)),
values=[
ChannelValuePy(name="mainmotor.velocity", value=ValuePy.Double(velocity)),
],
)
)
if __name__ == "__main__":
asyncio.run(main())
Backwards compatibilityFor a given
client_key, adding new flows and channels is a safe and backwards compatible operation. Existing flows and channels should not be modified; this type of change is not backwards compatible and will lead to unexpected behavior.Go
Install
go get github.com/sift-stack/sift/go
Resources
Imports
package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"time"
"github.com/sift-stack/sift/go/gen/sift/common/type/v1"
ingestv1 "github.com/sift-stack/sift/go/gen/sift/ingest/v1"
"github.com/sift-stack/sift/go/gen/sift/ingestion_configs/v1"
"github.com/sift-stack/sift/go/gen/sift/runs/v2"
"github.com/sift-stack/sift/go/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
Data source
func dataSource() <-chan dataPoint {
dataChannel := make(chan dataPoint)
go func() {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
duration := 60 * time.Second
start := time.Now()
for time.Since(start) < duration {
dataChannel <- dataPoint{
Timestamp: time.Now(),
Value: rng.Float64(),
}
time.Sleep(500 * time.Millisecond)
}
}()
return dataChannel
}
Flow and channel config
func config() []*ingestion_configsv1.FlowConfig {
return []*ingestion_configsv1.FlowConfig{
{
Name: "velocity_reading",
Channels: []*ingestion_configsv1.ChannelConfig{
{
Name: "mainmotor.velocity",
Unit: "km/hr",
Description: "vehicle speed",
DataType: typev1.ChannelDataType_CHANNEL_DATA_TYPE_DOUBLE,
},
},
},
}
}
Get or create ingestion config
func getOrCreateIngestionConfig(
ctx context.Context,
grpcChannel grpc.SiftChannel,
assetName,
clientKey string,
) (*ingestion_configsv1.IngestionConfig, error) {
svc := ingestion_configsv1.NewIngestionConfigServiceClient(grpcChannel)
listRes, err := svc.ListIngestionConfigs(ctx, &ingestion_configsv1.ListIngestionConfigsRequest{
Filter: fmt.Sprintf("client_key == '%s'", clientKey),
})
if err != nil {
return nil, err
}
if listRes != nil && len(listRes.IngestionConfigs) > 0 {
return listRes.IngestionConfigs[0], nil
}
createRes, err := svc.CreateIngestionConfig(ctx, &ingestion_configsv1.CreateIngestionConfigRequest{
AssetName: assetName,
ClientKey: clientKey,
Flows: config(),
})
if err != nil {
return nil, err
}
return createRes.IngestionConfig, nil
}
Create a Run
func createRun(
ctx context.Context,
grpcChannel grpc.SiftChannel,
runName string,
) (*runsv2.Run, error) {
svc := runsv2.NewRunServiceClient(grpcChannel)
ts := timestamppb.Now()
createRes, err := svc.CreateRun(ctx, &runsv2.CreateRunRequest{
Name: fmt.Sprintf("[%s].%d", runName, ts.Seconds),
StartTime: ts,
})
if err != nil {
return nil, err
}
return createRes.Run, nil
}
Main function
const (
assetName = "NostromoLV426"
clientKey = "nostromo-lv-426-config-v1"
)
func main() {
ctx := context.Background()
grpcChannel, err := grpc.UseSiftChannel(ctx, grpc.SiftChannelConfig{
Uri: os.Getenv("SIFT_URI"),
Apikey: os.Getenv("SIFT_API_KEY"),
})
if err != nil {
log.Fatalln(err)
}
ingestionConfig, err := getOrCreateIngestionConfig(ctx, grpcChannel, assetName, clientKey)
if err != nil {
log.Fatalln(err)
}
log.Printf("initialized ingestion config %s\n", ingestionConfig.ClientKey)
run, err := createRun(ctx, grpcChannel, assetName)
if err != nil {
log.Fatalln(err)
}
log.Printf("initialized run %s\n", run.Name)
siftStream, err := ingestv1.NewIngestServiceClient(grpcChannel).IngestWithConfigDataStream(ctx)
if err != nil {
log.Fatalln(err)
}
dataStream := dataSource()
for data := range dataStream {
req := &ingestv1.IngestWithConfigDataStreamRequest{
IngestionConfigId: ingestionConfig.IngestionConfigId,
RunId: run.RunId,
Flow: "velocity_reading",
Timestamp: timestamppb.New(data.Timestamp),
ChannelValues: []*ingestv1.IngestWithConfigDataChannelValue{
{Type: &ingestv1.IngestWithConfigDataChannelValue_Double{Double: data.Value}},
},
EndStreamOnValidationError: false,
}
if err := siftStream.Send(req); err != nil {
log.Fatalln(err)
}
log.Println("ingested a velocity_reading flow")
}
if _, err := siftStream.CloseAndRecv(); err != nil {
log.Fatalln(err)
}
log.Println("done.")
}
Rust
Install
cargo add sift_rs rand pbjson_types chrono tokio_stream
cargo add tokio --features full
TokioTokio is required to work with
sift_rs; it is the runtime that tonic relies on. The Tokio runtime flavor to use is dependent on your requirements.Resources
Imports
use chrono::{DateTime, Utc};
use pbjson_types::Timestamp;
use rand::Rng;
use sift_rs::{
gen::sift::{
common::r#type::v1::ChannelDataType,
ingest::v1::{
ingest_service_client::IngestServiceClient,
ingest_with_config_data_channel_value::Type, IngestWithConfigDataChannelValue,
IngestWithConfigDataStreamRequest,
},
ingestion_configs::v1::{
ingestion_config_service_client::IngestionConfigServiceClient, ChannelConfig,
CreateIngestionConfigRequest, FlowConfig, IngestionConfig, ListIngestionConfigsRequest,
},
runs::v2::{run_service_client::RunServiceClient, CreateRunRequest, Run},
},
grpc::{use_sift_channel, SiftChannel, SiftChannelConfig},
};
use std::{
env,
error::Error,
sync::mpsc::{channel, Receiver},
thread,
time::{Duration, Instant},
};
Data source
pub fn data_source() -> Receiver<(DateTime<Utc>, f64)> {
let (tx, rx) = channel();
thread::spawn(move || {
let duration = Duration::from_secs(60);
let start = Instant::now();
let mut rng = rand::thread_rng();
while Instant::now().duration_since(start) < duration {
tx.send((Utc::now(), rng.gen_range(0.0..100.0))).unwrap();
thread::sleep(Duration::from_millis(500));
}
});
rx
}
Channel and flow config
pub fn channel_configs() -> Vec<FlowConfig> {
vec![FlowConfig {
name: String::from("velocity_reading"),
channels: vec![ChannelConfig {
name: String::from("mainmotor.velocity"),
unit: String::from("km/hr"),
description: String::from("vehicle speed"),
data_type: ChannelDataType::Double.into(),
..Default::default()
}],
}]
}
Get or create ingestion config
async fn get_or_create_ingestion_config(
grpc_channel: SiftChannel,
asset_name: &str,
client_key: &str,
) -> Result<IngestionConfig, Box<dyn Error>> {
let mut svc = IngestionConfigServiceClient::new(grpc_channel);
let list_res = svc
.list_ingestion_configs(ListIngestionConfigsRequest {
filter: format!("client_key == '{client_key}'"),
..Default::default()
})
.await?;
if let Some(ingestion_config) = list_res.into_inner().ingestion_configs.first().cloned() {
return Ok(ingestion_config);
}
let req = CreateIngestionConfigRequest {
flows,
asset_name: String::from(asset_name),
client_key: String::from(client_key),
..Default::default()
};
let create_res = svc.create_ingestion_config(req).await?;
let ingestion_conf = create_res
.into_inner()
.ingestion_config
.ok_or("expected ingestion config")?;
Ok(ingestion_conf)
}
Create a Run
async fn create_run(grpc_channel: SiftChannel, run_name: &str) -> Result<Run, Box<dyn Error>> {
let mut svc = RunServiceClient::new(grpc_channel);
let ts = Utc::now();
let create_req = CreateRunRequest {
name: format!("[{}].{}", run_name.to_string(), ts.timestamp()),
start_time: Some(Timestamp::from(ts)),
..Default::default()
};
let create_res = svc.create_run(create_req).await?;
let run = create_res.into_inner().run.expect("expected run");
Ok(run)
}
Run metadata
CreateRunRequest supports additional metadata fields such as description, tags, client_key, and stop_time. For the full list of available fields, see the CreateRunRequest documentation in sift_rs. These fields are also documented in the Run API reference.Main function
pub const ASSET_NAME: &str = "NostromoLV426";
pub const CLIENT_KEY: &str = "nostromo-lv-426-config-v1";
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let grpc_channel = use_sift_channel(SiftChannelConfig {
uri: env::var("SIFT_URI")?,
apikey: env::var("SIFT_API_KEY")?,
})?;
let ingestion_config =
get_or_create_ingestion_config(grpc_channel.clone(), ASSET_NAME, CLIENT_KEY).await?;
println!(
"initialized ingestion config {}",
ingestion_config.client_key
);
let run = create_run(grpc_channel.clone(), ASSET_NAME).await?;
println!("initialized run {}", &run.name);
let mut ingestion_service = IngestServiceClient::new(grpc_channel);
let data_stream = data_source();
while let Ok((timestamp, velocity)) = data_stream.recv() {
let req = IngestWithConfigDataStreamRequest {
run_id: run.run_id.clone(),
ingestion_config_id: String::from(&ingestion_config.ingestion_config_id),
flow: String::from("velocity_reading"),
timestamp: Some(Timestamp::from(timestamp)),
channel_values: vec![IngestWithConfigDataChannelValue {
r#type: Some(Type::Double(velocity)),
}],
end_stream_on_validation_error: false,
..Default::default()
};
ingestion_service
.ingest_with_config_data_stream(tokio_stream::once(req))
.await?;
println!("ingested a velocity_reading flow");
}
println!("done.");
Ok(())
}