Go Streaming sensor data into Sift using Go
Credentials
Before starting this section be sure to retrieve your API key and the appropriate Sift URL for your provisioned environment. Instructions on how to retrieve the API key and URL and can be found in the authentication section of the documentation.
This exercise demonstrates how to stream data into Sift using the Go client library.
The goal is to read in data from a mock data source representing sensor data from our asset,
NostromoLV426
, and then stream that data to Sift.
NostromoLV426
will contain a single velocity
channel whose
values are of type double
; this channel will belong to the mainmotor
component.
For full code examples, documentation, as well as the library's source code, refer to the following:
For this exercise we just need to install the Sift Go module inside of a new Go project:
go get github.com/sift-stack/sift/go
With our Sift module installed, we're going to begin by importing everything we will use for this example in advance. Inside of main.go
:
package main
import (
" context "
" fmt "
" log "
" math/rand "
" os "
" time "
" github.com/sift-stack/sift/go/gen/sift/common/type/v1 "
ingestv1 " github.com/sift-stack/sift/go/gen/sift/ingest/v1 "
" github.com/sift-stack/sift/go/gen/sift/ingestion_configs/v1 "
" github.com/sift-stack/sift/go/gen/sift/runs/v2 "
" github.com/sift-stack/sift/go/grpc "
" google.golang.org/protobuf/types/known/timestamppb "
)
Next we're going to define a data source which will provide us with mock sensor data. The data source will simply send a timestamp and a float
over the course of a minute and sleep for 0.5s
between each send. This will represent the time-series coming from the velocity
channel:
// Simulate sensor data
func dataSource () <-chan dataPoint {
dataChannel := make ( chan dataPoint )
go func () {
rng := rand. New (rand. NewSource (time. Now (). UnixNano ()))
duration := 60 * time.Second
start := time. Now ()
for time. Since (start) < duration {
dataChannel <- dataPoint {
Timestamp: time. Now (),
Value: rng. Float64 (),
}
time. Sleep ( 500 * time.Millisecond)
}
}()
return dataChannel
}
Now we will define our channels in a function called configs
. Right now we only have one velocity
channel but
we are free to add more later.
// Flow and channel configuration
func config () [] * ingestion_configsv1 . FlowConfig {
return [] * ingestion_configsv1 . FlowConfig {
{
Name: "velocity_reading" ,
Channels: [] * ingestion_configsv1 . ChannelConfig {
{
Name: "velocity" ,
Component: "mainmotor" ,
Unit: "km/hr" ,
Description: "vehicle speed" ,
DataType: typev1.ChannelDataType_CHANNEL_DATA_TYPE_DOUBLE,
},
},
},
}
}
Next, we will define a function called get_or_create_ingestion
config which retrieves an existing
ingestion config
by client key or creates it if it does not exist. This function will use the configs
function that we created
in the previous step:
// Retrieves an existing ingestion config or create it.
func getOrCreateIngestionConfig (
ctx context . Context ,
grpcChannel grpc . SiftChannel ,
assetName ,
clientKey string ,
) ( * ingestion_configsv1 . IngestionConfig , error ) {
svc := ingestion_configsv1. NewIngestionConfigServiceClient (grpcChannel)
listRes, err := svc. ListIngestionConfigs (ctx, & ingestion_configsv1 . ListIngestionConfigsRequest {
Filter: fmt. Sprintf ( "client_key == ' %s '" , clientKey),
})
if err != nil {
return nil , err
}
if listRes != nil && len (listRes.IngestionConfigs) > 0 {
return listRes.IngestionConfigs[ 0 ], nil
}
createRes, err := svc. CreateIngestionConfig (ctx, & ingestion_configsv1 . CreateIngestionConfigRequest {
AssetName: assetName,
ClientKey: clientKey,
Flows: config (),
})
if err != nil {
return nil , err
}
return createRes.IngestionConfig, nil
}
Client Keys
As mentioned in the ingestion config section , specifying a client key is highly recommended as it simplifies lookups for users after creation as shown above.
Now we will make a create_run
function so we can group together our asset data.
// Create a run to use to group all the data ingested during this period.
func createRun (
ctx context . Context ,
grpcChannel grpc . SiftChannel ,
runName string ,
) ( * runsv2 . Run , error ) {
svc := runsv2. NewRunServiceClient (grpcChannel)
ts := timestamppb. Now ()
createRes, err := svc. CreateRun (ctx, & runsv2 . CreateRunRequest {
Name: fmt. Sprintf ( "[ %s ]. %d " , runName, ts.Seconds),
StartTime: ts,
})
if err != nil {
return nil , err
}
return createRes.Run, nil
}
Runs are Optional
Creating a run is not required to send data to Sift. It's a mechanism to group together data for a single asset or multiple assets. Read more .
Now all that's left to do is to assemble our main
function:
const (
// Name of the asset that we want to ingest data for.
assetName = "NostromoLV426"
// Unique client-chosen identifier used to identify an ingestion config.
clientKey = "nostromo-lv-426-config-v1"
)
func main () {
ctx := context. Background ()
// Connect to Sift
grpcChannel, err := grpc. UseSiftChannel (ctx, grpc . SiftChannelConfig {
Uri: os. Getenv ( "SIFT_URI" ),
Apikey: os. Getenv ( "SIFT_API_KEY" ),
})
if err != nil {
log. Fatalln (err)
}
// Define the schema of our telemetry
ingestionConfig, err := getOrCreateIngestionConfig (ctx, grpcChannel, assetName, clientKey)
if err != nil {
log. Fatalln (err)
}
log. Printf ( "initialized ingestion config %s\n " , ingestionConfig.ClientKey)
// Create a run to group all the data ingested during this period.
run, err := createRun (ctx, grpcChannel, assetName)
if err != nil {
log. Fatalln (err)
}
log. Printf ( "initialized run %s\n " , run.Name)
// Initialize a gRPC stream to Sift
siftStream, err := ingestv1. NewIngestServiceClient (grpcChannel). IngestWithConfigDataStream (ctx)
if err != nil {
log. Fatalln (err)
}
dataStream := dataSource ()
// Stream sensor data
for data := range dataStream {
req := & ingestv1 . IngestWithConfigDataStreamRequest {
IngestionConfigId: ingestionConfig.IngestionConfigId,
RunId: run.RunId,
Flow: "velocity_reading" ,
Timestamp: timestamppb. New (data.Timestamp),
ChannelValues: [] * ingestv1 . IngestWithConfigDataChannelValue {
{Type: & ingestv1 . IngestWithConfigDataChannelValue_Double {Double: data.Value}},
},
// Set this flag to 'true' only for debugging purposes to get real-time data validation from
// the Sift API. Do not use in production as it will hurt performance.
EndStreamOnValidationError: false ,
}
if err := siftStream. Send (req); err != nil {
log. Fatalln (err)
}
log. Println ( "ingested a velocity_reading flow" )
}
// Close the stream when finished and check if there are any errors
if _, err := siftStream. CloseAndRecv (); err != nil {
log. Fatalln (err)
}
log. Println ( "done." )
}
And that's it! In Sift, you should see the asset, NostromoLV426
, the run we just created, as well as
data for the velocity
channel.