Create a data pipeline
Create a pipeline that runs a scheduled MQL aggregation against your captured data and stores the results as precomputed summaries. For an overview of how pipelines work, see Data pipelines overview.
Prerequisites
- Captured tabular data in the cloud (see Start data capture)
- Your organization ID (find it in the Viam app under Settings)
- The Viam CLI, or the Viam Python or Go SDK, installed
Create with the CLI
This example creates a pipeline that computes hourly temperature averages grouped by location:
viam datapipelines create \
  --org-id=<org-id> \
  --name=hourly-temp-avg \
  --schedule="0 * * * *" \
  --data-source-type=standard \
  --mql='[{"$match": {"component_name": "temperature-sensor"}}, {"$group": {"_id": "$location_id", "avg_temp": {"$avg": "$data.readings.temperature"}, "count": {"$sum": 1}}}, {"$project": {"location": "$_id", "avg_temp": 1, "count": 1, "_id": 0}}]' \
  --enable-backfill=true
The CLI prints the pipeline ID on success. Save this ID: you need it to query results and manage the pipeline.
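To make the three stages of the example query concrete, here is a minimal plain-Python sketch of what $match, $group, and $project compute. It does not call Viam or MongoDB. The sample documents and the summarize helper are hypothetical, and the sort by location is added only to make the output deterministic (MQL does not guarantee group order).

```python
from collections import defaultdict

# Hypothetical sample documents; the field names mirror those the example query references.
docs = [
    {"component_name": "temperature-sensor", "location_id": "loc-a",
     "data": {"readings": {"temperature": 20.0}}},
    {"component_name": "temperature-sensor", "location_id": "loc-a",
     "data": {"readings": {"temperature": 22.0}}},
    {"component_name": "temperature-sensor", "location_id": "loc-b",
     "data": {"readings": {"temperature": 30.0}}},
    {"component_name": "camera", "location_id": "loc-a",
     "data": {"readings": {"temperature": 99.0}}},
]


def summarize(documents):
    """Emulate the example's $match -> $group -> $project stages in plain Python."""
    # $match: keep only documents from the temperature sensor.
    matched = [d for d in documents if d["component_name"] == "temperature-sensor"]

    # $group: collect temperature readings per location_id.
    groups = defaultdict(list)
    for d in matched:
        groups[d["location_id"]].append(d["data"]["readings"]["temperature"])

    # $project: rename _id to location and keep avg_temp and count.
    return [
        {"location": loc, "avg_temp": sum(temps) / len(temps), "count": len(temps)}
        for loc, temps in sorted(groups.items())
    ]


summary = summarize(docs)
print(summary)
```

The camera document is dropped by the match step, so loc-a averages only its two sensor readings.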
CLI flags
| Flag | Required | Description |
|---|---|---|
| --org-id | Yes | Your organization ID. |
| --name | Yes | A descriptive name. Must be unique within the organization. |
| --schedule | Yes | A cron expression in UTC. Also determines the query time window. See Cron schedule. |
| --mql | One of --mql or --mql-path | The MQL aggregation pipeline as a JSON string. |
| --mql-path | One of --mql or --mql-path | Path to a file containing the MQL aggregation pipeline as JSON. |
| --enable-backfill | Yes | Whether to process historical time windows. true or false. |
| --data-source-type | Yes | standard or hotstorage. |
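The --schedule row says the cron expression also determines the query time window. As an illustration only, the sketch below assumes that a run of the hourly schedule "0 * * * *" covers the full UTC hour that ended at the most recent tick. This page does not spell out that rule, so treat the hourly_window helper as hypothetical and check Cron schedule for the actual behavior.

```python
from datetime import datetime, timedelta, timezone


def hourly_window(run_time):
    """Return the assumed [start, end) UTC window for a "0 * * * *" run.

    Assumption: each run processes the hour that ended at the latest tick.
    run_time must be a timezone-aware datetime.
    """
    # Round down to the top of the hour in UTC to find the tick that fired.
    end = run_time.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
    return end - timedelta(hours=1), end


# A run that fires a few seconds after 15:00 UTC.
start, end = hourly_window(datetime(2025, 3, 1, 15, 0, 7, tzinfo=timezone.utc))
print(start.isoformat(), end.isoformat())
```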
For complex queries, use --mql-path to read from a file:
viam datapipelines create \
  --org-id=<org-id> \
  --name=hourly-temp-avg \
  --schedule="0 * * * *" \
  --data-source-type=standard \
  --mql-path=./my-pipeline.json \
  --enable-backfill=true
Where my-pipeline.json contains:
[
  { "$match": { "component_name": "temperature-sensor" } },
  {
    "$group": {
      "_id": "$location_id",
      "avg_temp": { "$avg": "$data.readings.temperature" },
      "count": { "$sum": 1 }
    }
  },
  {
    "$project": {
      "location": "$_id",
      "avg_temp": 1,
      "count": 1,
      "_id": 0
    }
  }
]
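If you maintain the query in code, one option is to generate the JSON file rather than hand-edit it. The sketch below writes the same stages to my-pipeline.json in a temporary directory, reloads the file, and checks that it is a JSON array of single-operator stages. It also builds a shell-safe value for the inline --mql flag. The load_stages helper and its checks are hypothetical conveniences, not something the CLI provides.

```python
import json
import os
import shlex
import tempfile

# The same stages as in the example file above.
stages = [
    {"$match": {"component_name": "temperature-sensor"}},
    {"$group": {
        "_id": "$location_id",
        "avg_temp": {"$avg": "$data.readings.temperature"},
        "count": {"$sum": 1},
    }},
    {"$project": {"location": "$_id", "avg_temp": 1, "count": 1, "_id": 0}},
]


def load_stages(path):
    """Load a pipeline file and check it is a list of single-operator stages."""
    with open(path, encoding="utf-8") as f:
        loaded = json.load(f)
    if not isinstance(loaded, list):
        raise ValueError("pipeline file must contain a JSON array")
    for stage in loaded:
        if not isinstance(stage, dict) or len(stage) != 1:
            raise ValueError(f"each stage must be an object with one key: {stage!r}")
        if not next(iter(stage)).startswith("$"):
            raise ValueError(f"stage operator must start with '$': {stage!r}")
    return loaded


# Write to a temporary directory so the sketch leaves the working tree untouched.
path = os.path.join(tempfile.mkdtemp(), "my-pipeline.json")
with open(path, "w", encoding="utf-8") as f:
    json.dump(stages, f, indent=2)

checked = load_stages(path)

# Single-quoted form suitable for pasting after --mql= in a POSIX shell.
inline = shlex.quote(json.dumps(stages))
print(f"--mql={inline}")
```

Quoting with shlex.quote keeps the shell from expanding the $ in field paths and operators, which is why the CLI example wraps the --mql value in single quotes.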
Create with the SDK
Python

import asyncio

from viam.rpc.dial import DialOptions
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

API_KEY = "YOUR-API-KEY"
API_KEY_ID = "YOUR-API-KEY-ID"
ORG_ID = "YOUR-ORGANIZATION-ID"


async def main():
    # Authenticate with an API key and open a client connection.
    dial_options = DialOptions.with_api_key(
        api_key=API_KEY,
        api_key_id=API_KEY_ID,
    )
    client = await ViamClient.create_from_dial_options(dial_options)
    data_client = client.data_client

    # Returns the pipeline ID
    pipeline_id = await data_client.create_data_pipeline(
        organization_id=ORG_ID,
        name="hourly-temp-avg",
        mql_binary=[
            {"$match": {"component_name": "temperature-sensor"}},
            {"$group": {
                "_id": "$location_id",
                "avg_temp": {"$avg": "$data.readings.temperature"},
                "count": {"$sum": 1},
            }},
            {"$project": {
                "location": "$_id",
                "avg_temp": 1,
                "count": 1,
                "_id": 0,
            }},
        ],
        schedule="0 * * * *",
        data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD,
        enable_backfill=False,
    )
    print(f"Created pipeline: {pipeline_id}")

    client.close()


if __name__ == "__main__":
    asyncio.run(main())
Go

package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

func main() {
	ctx := context.Background()
	logger := logging.NewDebugLogger("pipeline")

	// Authenticate with an API key and open a client connection.
	viamClient, err := app.CreateViamClientWithAPIKey(
		ctx, app.Options{}, "YOUR-API-KEY", "YOUR-API-KEY-ID", logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer viamClient.Close()

	dataClient := viamClient.DataClient()

	// The same three MQL stages as in the CLI example.
	mqlStages := []map[string]interface{}{
		{"$match": map[string]interface{}{
			"component_name": "temperature-sensor",
		}},
		{"$group": map[string]interface{}{
			"_id":      "$location_id",
			"avg_temp": map[string]interface{}{"$avg": "$data.readings.temperature"},
			"count":    map[string]interface{}{"$sum": 1},
		}},
		{"$project": map[string]interface{}{
			"location": "$_id",
			"avg_temp": 1,
			"count":    1,
			"_id":      0,
		}},
	}

	// The boolean argument is the enable-backfill setting.
	pipelineID, err := dataClient.CreateDataPipeline(
		ctx, "YOUR-ORGANIZATION-ID", "hourly-temp-avg",
		mqlStages, "0 * * * *", false,
		&app.CreateDataPipelineOptions{
			TabularDataSourceType: app.TabularDataSourceTypeStandard,
		},
	)
	if err != nil {
		logger.Fatal(err)
	}
	fmt.Printf("Created pipeline: %s\n", pipelineID)
}
To get your credentials:
- Go to your machine’s page in the Viam app.
- Click the CONNECT tab.
- Select API keys.
- Copy the API key and API key ID.
Find your organization ID in the Viam app by clicking your organization name and selecting Settings.
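Rather than pasting these values into source files, you may prefer to read them from the environment. The sketch below is one way to do that. The variable names VIAM_API_KEY, VIAM_API_KEY_ID, and VIAM_ORG_ID and the load_credentials helper are hypothetical choices for illustration, not names the SDK or CLI defines.

```python
import os

# Hypothetical environment variable names chosen for this sketch.
REQUIRED_VARS = ("VIAM_API_KEY", "VIAM_API_KEY_ID", "VIAM_ORG_ID")


def load_credentials(env=os.environ):
    """Return (api_key, api_key_id, org_id), failing early if any value is unset."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in REQUIRED_VARS)


# Demonstration with a stand-in mapping; in real use, call load_credentials() with no argument.
example_env = {
    "VIAM_API_KEY": "YOUR-API-KEY",
    "VIAM_API_KEY_ID": "YOUR-API-KEY-ID",
    "VIAM_ORG_ID": "YOUR-ORGANIZATION-ID",
}
api_key, api_key_id, org_id = load_credentials(example_env)
```

The returned values can then replace the API_KEY, API_KEY_ID, and ORG_ID constants in the Python example.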
After creating a pipeline, see Query pipeline results to access the output, and Examples and tips for MQL patterns for common robotics use cases.