Flows

A flow is a named group of channels whose data are often sent together. Flows offer a way to send more data per request over a gRPC stream.

The formal definition of a flow can be found in its protocol buffer documentation.

For illustrative purposes, the definition is provided below, along with ChannelConfig:

message FlowConfig {
  string name = 1 [(google.api.field_behavior) = REQUIRED];
  repeated ChannelConfig channels = 2;
}
 
message ChannelConfig {
  string name = 1 [(google.api.field_behavior) = REQUIRED];
  string component = 2 [(google.api.field_behavior) = OPTIONAL];
  string unit = 3 [(google.api.field_behavior) = OPTIONAL];
  string description = 4 [(google.api.field_behavior) = OPTIONAL];
  sift.common.type.v1.ChannelDataType data_type = 5 [(google.api.field_behavior) = REQUIRED];
  repeated sift.common.type.v1.ChannelEnumType enum_types = 6;
  repeated sift.common.type.v1.ChannelBitFieldElement bit_field_elements = 7;
}

A FlowConfig defines a flow to be created, while each ChannelConfig defines a channel to be created within that flow.

Channel Ordering

When defining the flow configuration for your ingestion config, pay attention to the order of the channels in the channels list. Data values supplied in an IngestWithConfigDataStreamRequest must follow that exact order; otherwise, errors may occur during streaming.

Example

Consider the following pseudo-code that creates a flow called reading with one double channel and one string channel:

fn flow_configs() -> Vec<FlowConfig> {
    return [
        FlowConfig {
            name: "reading",
            channels: [
                ChannelConfig {
                    name: "velocity",
                    component: "mainmotor",
                    unit: "km/hr",
                    description: "vehicle speed",
                    data_type: ChannelDataType::Double,
                },
                ChannelConfig {
                    name: "log",
                    description: "logs",
                    data_type: ChannelDataType::String,
                },
            ],
        },
    ]
}

When you create an IngestWithConfigDataStreamRequest for the reading flow, Sift attributes each data point to a channel based on the order in which the values appear in the request.

IngestWithConfigDataStreamRequest {
    ingestion_config_id: ingestion_config.ingestion_config_id,
    run_id: run.run_id,
    flow: "reading",
    timestamp: now(),
    channel_values: [
        // velocity channel
        IngestWithConfigDataChannelValue {
            type: Some(Type::Double(10.0)),
        },
        // log channel
        IngestWithConfigDataChannelValue {
            type: Some(Type::String("example log")),
        },
    ],
}

Failure to respect the ordering may result in either an error or data being attributed to the wrong channel.
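
One way to guard against ordering mistakes is to derive the value list from the flow's channel list instead of writing it by hand. The following is a minimal Python sketch of that idea. It uses plain dataclasses as stand-ins for the generated protobuf classes; Channel, Flow, and ordered_values are hypothetical names for illustration and are not part of the Sift API.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Channel:
    """Hypothetical stand-in for ChannelConfig (only the fields used here)."""
    name: str
    data_type: str


@dataclass(frozen=True)
class Flow:
    """Hypothetical stand-in for FlowConfig."""
    name: str
    channels: List[Channel]


def ordered_values(flow: Flow, readings: Dict[str, Any]) -> List[Any]:
    """Return the readings arranged in the same order as flow.channels.

    Raises ValueError if readings names a channel the flow does not have,
    and KeyError if a channel of the flow has no reading.
    """
    known = {channel.name for channel in flow.channels}
    unknown = set(readings) - known
    if unknown:
        raise ValueError(
            f"unknown channels for flow {flow.name!r}: {sorted(unknown)}"
        )
    return [readings[channel.name] for channel in flow.channels]


reading = Flow(
    name="reading",
    channels=[Channel("velocity", "double"), Channel("log", "string")],
)

# The key order of the dict does not matter; the flow's channel order decides.
values = ordered_values(reading, {"log": "example log", "velocity": 10.0})
print(values)
```

In real client code, each element of the resulting list would be wrapped in an IngestWithConfigDataChannelValue before being placed in channel_values.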

Empty Values

Continuing the example above, if there is data for the velocity channel but none for the log channel, you can still send data for the reading flow by sending a google.protobuf.Empty value in the log channel's position.
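
As a rough illustration, the sketch below fills every channel that has no data with an empty placeholder, so the value list keeps the flow's length and order. It uses only the Python standard library. EMPTY and values_with_gaps are hypothetical names: EMPTY stands in for a google.protobuf.Empty value, and the helper is an assumption about how client code might be organized, not a Sift function.

```python
from typing import Any, Dict, List, Sequence

# Hypothetical sentinel standing in for a google.protobuf.Empty value.
EMPTY = object()


def values_with_gaps(
    channel_names: Sequence[str], readings: Dict[str, Any]
) -> List[Any]:
    """Return one value per channel, in order, using EMPTY where no reading exists."""
    return [readings.get(name, EMPTY) for name in channel_names]


# Channel order of the "reading" flow from the example above.
reading_channels = ["velocity", "log"]

# Only the velocity channel has data for this message.
values = values_with_gaps(reading_channels, {"velocity": 10.0})
```

The list still has two entries: the first holds the velocity reading and the second holds the placeholder, so the positions continue to line up with the flow's channels.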
