Go

Streaming sensor data into Sift using Go

Credentials

Before starting this section, be sure to retrieve your API key and the appropriate Sift URL for your provisioned environment. Instructions on how to retrieve the API key and URL can be found in the authentication section of the documentation.

This exercise demonstrates how to stream data into Sift using the Go client library. The goal is to read in data from a mock data source representing sensor data from our asset, NostromoLV426, and then stream that data to Sift.

NostromoLV426 will contain a single velocity channel whose values are of type double; this channel will belong to the mainmotor component.

For full code examples, documentation, as well as the library's source code, refer to the following:

Installing Dependencies

For this exercise we just need to install the Sift Go module inside of a new Go project:

go get github.com/sift-stack/sift/go

Imports

With our Sift module installed, we're going to begin by importing everything we will use for this example in advance. Inside of main.go:

package main
 
import (
        "context"
        "fmt"
        "log"
        "math/rand"
        "os"
        "time"

        // Explicit aliases match the identifiers used in the rest of this example.
        typev1 "github.com/sift-stack/sift/go/gen/sift/common/type/v1"
        ingestv1 "github.com/sift-stack/sift/go/gen/sift/ingest/v1"
        ingestion_configsv1 "github.com/sift-stack/sift/go/gen/sift/ingestion_configs/v1"
        runsv2 "github.com/sift-stack/sift/go/gen/sift/runs/v2"
        "github.com/sift-stack/sift/go/grpc"
        "google.golang.org/protobuf/types/known/timestamppb"
)

Data Source

Next we're going to define a data source which will provide us with mock sensor data. The data source will simply send a timestamp and a float over the course of a minute and sleep for 0.5s between each send. This will represent the time-series coming from the velocity channel:

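// dataPoint is a single timestamped sensor reading.
type dataPoint struct {
    Timestamp time.Time
    Value     float64
}

// Simulate sensor data
func dataSource() <-chan dataPoint {
    dataChannel := make(chan dataPoint)
    go func() {
        // Close the channel when done so that a consumer ranging over it terminates.
        defer close(dataChannel)

        rng := rand.New(rand.NewSource(time.Now().UnixNano()))
        duration := 60 * time.Second
        start := time.Now()

        for time.Since(start) < duration {
            dataChannel <- dataPoint{
                Timestamp: time.Now(),
                Value:     rng.Float64(),
            }
            time.Sleep(500 * time.Millisecond)
        }
    }()
    return dataChannel
}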
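
A consumer that ranges over a channel only stops once the producer closes that channel, so the producer goroutine is responsible for signalling the end of the data. The following is a minimal sketch of that contract in isolation. The names sample, produce, and drain are hypothetical and exist only for this illustration; they are not part of the Sift module.

```go
package main

import (
	"fmt"
	"time"
)

// sample is a hypothetical stand-in for one timestamped sensor reading.
type sample struct {
	Timestamp time.Time
	Value     float64
}

// produce emits n samples spaced by interval and then closes the channel,
// which is what lets a consumer's range loop terminate.
func produce(n int, interval time.Duration) <-chan sample {
	out := make(chan sample)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- sample{Timestamp: time.Now(), Value: float64(i)}
			time.Sleep(interval)
		}
	}()
	return out
}

// drain counts the samples received until the channel is closed.
func drain(in <-chan sample) int {
	count := 0
	for range in {
		count++
	}
	return count
}

func main() {
	// Three samples, 10ms apart; drain returns once produce closes the channel.
	fmt.Println(drain(produce(3, 10*time.Millisecond)))
}
```

Without the deferred close, drain would block forever after the last sample was received.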

Config

Now we will define our channels in a function called config. Right now we only have one velocity channel, but we are free to add more later.

// Flow and channel configuration
func config() []*ingestion_configsv1.FlowConfig {
    return []*ingestion_configsv1.FlowConfig{
        {
            Name: "velocity_reading",
            Channels: []*ingestion_configsv1.ChannelConfig{
                {
                    Name:        "velocity",
                    Component:   "mainmotor",
                    Unit:        "km/hr",
                    Description: "vehicle speed",
                    DataType:    typev1.ChannelDataType_CHANNEL_DATA_TYPE_DOUBLE,
                },
            },
        },
    }
}
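
When a flow grows to more than one channel, the values sent for that flow need to line up with the channels it declares. The sketch below assumes that values are matched to channels by position, which is our reading of the ingestion config documentation and should be confirmed there. The channelDef and flowDef types, the temperature channel, and the orderedValues helper are all hypothetical stand-ins for illustration, not types from the Sift module.

```go
package main

import "fmt"

// channelDef and flowDef are hypothetical stand-ins for the generated
// ChannelConfig and FlowConfig types; they are not part of the Sift module.
type channelDef struct {
	Name, Component, Unit string
}

type flowDef struct {
	Name     string
	Channels []channelDef
}

// orderedValues arranges readings (keyed by channel name) in the order the
// flow declares its channels. It returns an error if a reading is missing.
func orderedValues(flow flowDef, readings map[string]float64) ([]float64, error) {
	values := make([]float64, 0, len(flow.Channels))
	for _, ch := range flow.Channels {
		v, ok := readings[ch.Name]
		if !ok {
			return nil, fmt.Errorf("flow %q: no reading for channel %q", flow.Name, ch.Name)
		}
		values = append(values, v)
	}
	return values, nil
}

func main() {
	flow := flowDef{
		Name: "motor_reading",
		Channels: []channelDef{
			{Name: "velocity", Component: "mainmotor", Unit: "km/hr"},
			{Name: "temperature", Component: "mainmotor", Unit: "C"}, // hypothetical second channel
		},
	}
	values, err := orderedValues(flow, map[string]float64{"temperature": 71.5, "velocity": 12.0})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(values) // velocity first, then temperature
}
```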

Next, we will define a function called getOrCreateIngestionConfig, which retrieves an existing ingestion config by client key or creates it if it does not exist. This function will use the config function that we created in the previous step:

// Retrieves an existing ingestion config by client key, or creates it if none exists.
func getOrCreateIngestionConfig(
    ctx context.Context,
    grpcChannel grpc.SiftChannel,
    assetName,
    clientKey string,
) (*ingestion_configsv1.IngestionConfig, error) {
    svc := ingestion_configsv1.NewIngestionConfigServiceClient(grpcChannel)
 
    listRes, err := svc.ListIngestionConfigs(ctx, &ingestion_configsv1.ListIngestionConfigsRequest{
        Filter: fmt.Sprintf("client_key == '%s'", clientKey),
    })
    if err != nil {
        return nil, err
    }
    if listRes != nil && len(listRes.IngestionConfigs) > 0 {
        return listRes.IngestionConfigs[0], nil
    }
 
    createRes, err := svc.CreateIngestionConfig(ctx, &ingestion_configsv1.CreateIngestionConfigRequest{
        AssetName: assetName,
        ClientKey: clientKey,
        Flows:     config(),
    })
    if err != nil {
        return nil, err
    }
    return createRes.IngestionConfig, nil
}
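
The lookup-then-create pattern is what makes the program safe to run repeatedly: the second run finds the config the first run created instead of creating a duplicate. Here is a minimal sketch of that behavior using an in-memory map in place of the remote service. The configStore type and its generated IDs are hypothetical and only illustrate the control flow.

```go
package main

import "fmt"

// configStore is a hypothetical in-memory stand-in for a remote service that
// stores configs by client key. It is not part of the Sift module.
type configStore struct {
	idsByKey map[string]string
	creates  int // number of creations performed so far
}

// getOrCreate returns the ID stored under clientKey, creating one only if
// no entry exists yet.
func (s *configStore) getOrCreate(clientKey string) string {
	if id, ok := s.idsByKey[clientKey]; ok {
		return id
	}
	s.creates++
	id := fmt.Sprintf("config-%d", s.creates)
	s.idsByKey[clientKey] = id
	return id
}

func main() {
	store := &configStore{idsByKey: map[string]string{}}
	first := store.getOrCreate("nostromo-lv-426-config-v1")
	second := store.getOrCreate("nostromo-lv-426-config-v1")
	fmt.Println(first == second, store.creates) // prints "true 1"
}
```

Note that this sketch is not safe for concurrent use; two clients racing on the same key against a real service would need the service itself to enforce uniqueness.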

Client Keys

As mentioned in the ingestion config section, specifying a client key is highly recommended: it gives you a stable identifier for looking up the ingestion config after creation, as shown above.

Creating a Run

Now we will make a createRun function so we can group together our asset data.

// Create a run to use to group all the data ingested during this period.
func createRun(
    ctx context.Context,
    grpcChannel grpc.SiftChannel,
    runName string,
) (*runsv2.Run, error) {
    svc := runsv2.NewRunServiceClient(grpcChannel)
    ts := timestamppb.Now()
 
    createRes, err := svc.CreateRun(ctx, &runsv2.CreateRunRequest{
        Name:      fmt.Sprintf("[%s].%d", runName, ts.Seconds),
        StartTime: ts,
    })
    if err != nil {
        return nil, err
    }
    return createRes.Run, nil
}
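
The run name combines the given name with the start time in Unix seconds, so each execution of the program produces a distinct run name. A small sketch of just that formatting step follows; the runName helper is a hypothetical name introduced here, and it takes the seconds value directly so the result is easy to check.

```go
package main

import (
	"fmt"
	"time"
)

// runName mirrors the "[name].<unix seconds>" format used when creating a run.
// It is a hypothetical helper for illustration only.
func runName(base string, unixSeconds int64) string {
	return fmt.Sprintf("[%s].%d", base, unixSeconds)
}

func main() {
	// A fixed timestamp gives a predictable name.
	fmt.Println(runName("NostromoLV426", 1700000000)) // prints "[NostromoLV426].1700000000"

	// In practice the current time would be used.
	fmt.Println(runName("NostromoLV426", time.Now().Unix()))
}
```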

Runs are Optional

Creating a run is not required to send data to Sift. It's a mechanism to group together data for a single asset or multiple assets.

Sending Data

Now all that's left to do is to assemble our main function:

const (
    // Name of the asset that we want to ingest data for.
    assetName = "NostromoLV426"
 
    // Unique client-chosen identifier used to identify an ingestion config.
    clientKey = "nostromo-lv-426-config-v1"
)
 
func main() {
    ctx := context.Background()
 
    // Connect to Sift
    grpcChannel, err := grpc.UseSiftChannel(ctx, grpc.SiftChannelConfig{
        Uri:    os.Getenv("SIFT_URI"),
        Apikey: os.Getenv("SIFT_API_KEY"),
    })
    if err != nil {
        log.Fatalln(err)
    }
 
    // Define the schema of our telemetry
    ingestionConfig, err := getOrCreateIngestionConfig(ctx, grpcChannel, assetName, clientKey)
    if err != nil {
        log.Fatalln(err)
    }
    log.Printf("initialized ingestion config %s\n", ingestionConfig.ClientKey)
 
    // Create a run to group all the data ingested during this period.
    run, err := createRun(ctx, grpcChannel, assetName)
    if err != nil {
        log.Fatalln(err)
    }
    log.Printf("initialized run %s\n", run.Name)
 
    // Initialize a gRPC stream to Sift
    siftStream, err := ingestv1.NewIngestServiceClient(grpcChannel).IngestWithConfigDataStream(ctx)
    if err != nil {
        log.Fatalln(err)
    }
 
    dataStream := dataSource()
 
    // Stream sensor data
    for data := range dataStream {
        req := &ingestv1.IngestWithConfigDataStreamRequest{
            IngestionConfigId: ingestionConfig.IngestionConfigId,
            RunId:             run.RunId,
            Flow:              "velocity_reading",
            Timestamp:         timestamppb.New(data.Timestamp),
            ChannelValues: []*ingestv1.IngestWithConfigDataChannelValue{
                {Type: &ingestv1.IngestWithConfigDataChannelValue_Double{Double: data.Value}},
            },
            // Set this flag to 'true' only for debugging purposes to get real-time data validation from
            // the Sift API. Do not use in production as it will hurt performance.
            EndStreamOnValidationError: false,
        }
        if err := siftStream.Send(req); err != nil {
            log.Fatalln(err)
        }
        log.Println("ingested a velocity_reading flow")
    }
 
    // Close the stream when finished and check if there are any errors
    if _, err := siftStream.CloseAndRecv(); err != nil {
        log.Fatalln(err)
    }
 
    log.Println("done.")
}

And that's it! In Sift, you should see the asset, NostromoLV426, the run we just created, as well as data for the velocity channel.
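
The main function reads SIFT_URI and SIFT_API_KEY from the environment, and an unset variable would only surface later as a connection or authentication error. One way to fail earlier with a clearer message is to validate the variables up front. The requireEnv helper below is a hypothetical sketch, not part of the Sift module; it takes the lookup function as a parameter so it can be exercised without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
)

// requireEnv returns the value of every named variable, or an error naming
// the first one that is unset or empty. lookup is typically os.LookupEnv.
func requireEnv(lookup func(string) (string, bool), names ...string) (map[string]string, error) {
	values := make(map[string]string, len(names))
	for _, name := range names {
		v, ok := lookup(name)
		if !ok || v == "" {
			return nil, fmt.Errorf("environment variable %s is not set", name)
		}
		values[name] = v
	}
	return values, nil
}

func main() {
	env, err := requireEnv(os.LookupEnv, "SIFT_URI", "SIFT_API_KEY")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Avoid printing the API key; the URI is enough to confirm the target.
	fmt.Println("connecting to", env["SIFT_URI"])
}
```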
