Python

Streaming sensor data into Sift using Python

Credentials

Before starting this section, be sure to retrieve your API key and the appropriate Sift URL for your provisioned environment. Instructions on how to retrieve the API key and URL can be found in the authentication section of the documentation.

This exercise demonstrates how to stream data into Sift using the Python client library. The objective is to read in data from a mock data source representing sensor data from our asset, NostromoLV426, and then stream that data to Sift.

NostromoLV426 will contain a single velocity channel whose values are of type double; this channel will belong to the mainmotor component.

For full code examples, documentation, as well as the library's source code, refer to the following:

Installing Dependencies

Before we begin streaming data to Sift, we need to install sift-stack-py. One way to do that is with pip:

pip install sift-stack-py

Imports

With sift-stack-py installed, we'll begin by importing the necessary modules for this example. Inside of your main.py:

import os
 
from datetime import datetime, timezone
from random import random
from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import ChannelConfig, ChannelDataType, double_value
from sift_py.ingestion.config.telemetry import TelemetryConfig
from sift_py.ingestion.flow import FlowConfig
from sift_py.ingestion.service import IngestionService
from time import sleep, time
from typing import Iterator, Tuple

Data Source

Next, we'll define a data source that provides us with mock sensor data. For one minute, the data source yields a timestamp and a float, sleeping for 0.5s before each yield. This represents the time series coming from the velocity channel:

# Simulate sensor data
def data_source() -> Iterator[Tuple[datetime, float]]:
    start = time()
    while time() - start < 60:
        sleep(0.5)
        yield datetime.now(timezone.utc), random()
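For quick local checks, waiting a full minute can be inconvenient. The following is a minimal sketch of the same generator with the duration and interval exposed as parameters. The name `bounded_data_source` and its parameters `duration_s` and `interval_s` are illustrative assumptions, not part of sift-stack-py.

```python
from datetime import datetime, timezone
from random import random
from time import monotonic, sleep
from typing import Iterator, Tuple


# Hypothetical variant of data_source() with configurable timing
def bounded_data_source(
    duration_s: float = 60.0, interval_s: float = 0.5
) -> Iterator[Tuple[datetime, float]]:
    """Yield (UTC timestamp, value in [0, 1)) pairs until duration_s has elapsed."""
    start = monotonic()  # monotonic clock is unaffected by system clock changes
    while monotonic() - start < duration_s:
        sleep(interval_s)
        yield datetime.now(timezone.utc), random()


# Short run: roughly 0.2s of samples, spaced 0.05s apart
samples = list(bounded_data_source(duration_s=0.2, interval_s=0.05))
```

With the default arguments, this behaves like the one-minute source above, so it could be swapped in when streaming.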

Config

Now we define the schema of our telemetry in a TelemetryConfig, which is a simple wrapper around an ingestion config:

# The name of our asset
ASSET_NAME = "NostromoLV426"
 
# An arbitrary key used to identify our config
CONFIG_KEY = "nostromo-lv-426-config-v1"
 
# Define the schema of your telemetry
config = TelemetryConfig(
    asset_name=ASSET_NAME,
    ingestion_client_key=CONFIG_KEY,
    flows=[
        FlowConfig(
            name="velocity-reading",
            channels=[
                ChannelConfig(
                    component="mainmotor",
                    name="velocity",
                    data_type=ChannelDataType.DOUBLE,
                    unit="m/s",
                ),
            ],
        ),
    ],
)

Client Keys

As mentioned in the ingestion config section, specifying a client key (CONFIG_KEY in the example above) is highly recommended as it simplifies lookups for users after creation.

Sending Data

Now that we have a TelemetryConfig, we will establish a connection to Sift, create an ingestion service, and initialize a run.

credentials: SiftChannelConfig = {
    "apikey": os.getenv("SIFT_API_KEY"),
    "uri": os.getenv("BASE_URI"),
}
 
with use_sift_channel(credentials) as grpc_channel:
    # Create an ingestion service
    ingestion_service = IngestionService(grpc_channel, config)
 
    # Create a run to use during this period of ingestion
    run_name = f"[{ASSET_NAME}].{time():.0f}"
    ingestion_service.attach_run(grpc_channel, run_name)
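Because `os.getenv` returns `None` when a variable is unset, a missing key or URI would only surface once the connection is attempted. The following is a minimal sketch of failing fast instead. It assumes the same environment variable names used above (`SIFT_API_KEY` and `BASE_URI`); the helper `load_credentials` is hypothetical and not part of sift-stack-py.

```python
import os
from typing import Dict, Mapping


# Hypothetical helper: validate the environment before opening a channel
def load_credentials(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the API key and URI, raising if either is missing or empty."""
    missing = [name for name in ("SIFT_API_KEY", "BASE_URI") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {"apikey": env["SIFT_API_KEY"], "uri": env["BASE_URI"]}


# Example with an explicit mapping of placeholder values instead of the real environment
example = load_credentials({"SIFT_API_KEY": "example-key", "BASE_URI": "example.invalid"})
```

The returned dictionary has the same two keys as the `credentials` mapping shown above, so it could be passed to `use_sift_channel` in its place.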

Runs are Optional

Creating a run is not required to send data to Sift. It's a mechanism to group together data for a single asset or multiple assets.

Once ingestion_service has been instantiated successfully, Sift will have a record of your telemetry config, also known as your ingestion config. From then on, whenever you provide the same TelemetryConfig with the same ingestion_client_key, Sift will reuse that config for future streams for the specified asset. Additional flows and channels may be added over time.

Backwards Compatibility

Note that for a given ingestion_client_key, adding new flows and channels over time is a safe and backwards compatible operation. Existing flows and channels, however, should not be modified; this type of change is not considered backwards compatible and will lead to unexpected behavior.
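To make the rule concrete, here is a minimal sketch that compares two versions of a schema and reports any removal or modification of an existing flow or channel, while ignoring additions. It works on plain dictionaries rather than on TelemetryConfig objects. The `Schema` shape, the `incompatible_changes` helper, and the `temperature-reading` flow are all hypothetical and exist only for illustration; Sift's own validation may differ.

```python
from typing import Dict, List, Tuple

# Hypothetical plain-data view of a schema:
# flow name -> {(component, channel name): (data type, unit)}
Schema = Dict[str, Dict[Tuple[str, str], Tuple[str, str]]]


def incompatible_changes(old: Schema, new: Schema) -> List[str]:
    """List removed or modified flows and channels; additions are ignored."""
    problems: List[str] = []
    for flow_name, old_channels in old.items():
        if flow_name not in new:
            problems.append(f"flow removed: {flow_name}")
            continue
        for key, old_def in old_channels.items():
            new_def = new[flow_name].get(key)
            if new_def is None:
                problems.append(f"channel removed: {flow_name}/{key[0]}.{key[1]}")
            elif new_def != old_def:
                problems.append(f"channel modified: {flow_name}/{key[0]}.{key[1]}")
    return problems


v1: Schema = {"velocity-reading": {("mainmotor", "velocity"): ("double", "m/s")}}
# Adding a new flow leaves the existing definitions untouched
v2: Schema = {**v1, "temperature-reading": {("mainmotor", "temperature"): ("double", "C")}}
# Changing the unit of an existing channel modifies it
v3: Schema = {"velocity-reading": {("mainmotor", "velocity"): ("double", "km/h")}}

additive = incompatible_changes(v1, v2)
breaking = incompatible_changes(v1, v3)
```

Here `additive` is empty because only a flow was added, whereas `breaking` contains one entry for the modified velocity channel. A change like the one in `v3` would be a reason to choose a new ingestion_client_key rather than reuse the old one.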

Now that we have a fully configured ingestion service, let's read data from the data source we created earlier and stream it into Sift:

    # Stream sensor data to Sift. This loop belongs inside the
    # `with use_sift_channel(...)` block above so the channel is still open.
    for timestamp, velocity in data_source():
        ingestion_service.ingest_flows({
            "flow_name": "velocity-reading",
            "timestamp": timestamp,
            "channel_values": [double_value(velocity)],
        })

And that's it! In Sift, you should see the asset, NostromoLV426, the run we just created, as well as data for the velocity channel.
