Manage data pipelines
Monitor and manage your data pipelines after creation. For creating pipelines, see Create a pipeline.
List pipelines
viam datapipelines list --org-id=<org-id>
Example output (one line per pipeline):
hourly-temp-avg (ID: 64f3a1b2c4d5e6f7a8b9c0d1) [Enabled] [Data Source Type: Standard]
daily-summary (ID: 64f3a1b2c4d5e6f7a8b9c0d2) [Disabled] [Data Source Type: Hot Storage]
If the command prints nothing, the organization has no pipelines. This is not an error.
pipelines = await data_client.list_data_pipelines(organization_id=ORG_ID)
for p in pipelines:
print(f"{p.id}: {p.name} (enabled={p.enabled}, schedule={p.schedule})")
pipelines, err := dataClient.ListDataPipelines(ctx, orgID)
if err != nil {
logger.Fatal(err)
}
for _, p := range pipelines {
fmt.Printf("%s: %s (enabled=%v, schedule=%s)\n", p.ID, p.Name, p.Enabled, p.Schedule)
}
Get pipeline details
viam datapipelines describe --id=<pipeline-id>
Example output:
ID: 64f3a1b2c4d5e6f7a8b9c0d1
Name: hourly-temp-avg
Enabled: true
Schedule: 0 * * * *
MQL query: [
{
"$match": {
"component_name": "temperature-sensor"
}
},
...
]
DataSourceType: TABULAR_DATA_SOURCE_TYPE_STANDARD
Last run:
Status: Success
Started: 2026-03-15T15:02:13Z
Data range: [2026-03-15T14:00:00Z, 2026-03-15T15:00:00Z]
Ended: 2026-03-15T15:02:18Z
If the pipeline has never run, the last section reads Has not run yet. instead.
pipeline = await data_client.get_data_pipeline(id="YOUR-PIPELINE-ID")
print(f"Name: {pipeline.name}")
print(f"Schedule: {pipeline.schedule}")
print(f"Enabled: {pipeline.enabled}")
print(f"Data source: {pipeline.data_source_type}")
print(f"Created: {pipeline.created_on}")
pipeline, err := dataClient.GetDataPipeline(ctx, "YOUR-PIPELINE-ID")
if err != nil {
logger.Fatal(err)
}
fmt.Printf("Name: %s\nSchedule: %s\nEnabled: %v\n", pipeline.Name, pipeline.Schedule, pipeline.Enabled)
Monitor pipeline runs
Each pipeline run has a status and an associated time window showing which data it processed.
# Returns a page of runs (default page size: 10)
page = await data_client.list_data_pipeline_runs(id="YOUR-PIPELINE-ID")
for run in page.runs:
print(f"Run {run.id}: {run.status}")
print(f" Data window: {run.data_start_time} to {run.data_end_time}")
if run.error_message:
print(f" Error: {run.error_message}")
# Get the next page if there are more runs
if page.next_page_token:
next_page = await page.next_page()
// Returns a page of runs (default page size: 10)
page, err := dataClient.ListDataPipelineRuns(ctx, "YOUR-PIPELINE-ID", 10)
if err != nil {
logger.Fatal(err)
}
for _, run := range page.Runs {
fmt.Printf("Run %s: %d\n", run.ID, run.Status)
fmt.Printf(" Data window: %s to %s\n", run.DataStartTime, run.DataEndTime)
if run.ErrorMessage != "" {
fmt.Printf(" Error: %s\n", run.ErrorMessage)
}
}
// Get the next page
nextPage, err := page.NextPage(ctx)
Run statuses:
| SDK status | CLI label | Meaning |
|---|---|---|
SCHEDULED | Scheduled | The run is queued and waiting to execute (2-minute delay before execution starts). |
STARTED | Running | The run is executing the MQL aggregation against the data source. |
COMPLETED | Success | The run finished and results are in the pipeline sink. |
FAILED | Failed | The run encountered an error. Check the error_message field. |
SDK methods return the enum Status value on the left. The viam datapipelines describe CLI output uses the label on the right.
If a run stays in STARTED for more than 10 minutes, it is automatically marked as failed and a new run is created for that time window.
Enable a pipeline
viam datapipelines enable --id=<pipeline-id>
err = dataClient.EnableDataPipeline(ctx, "YOUR-PIPELINE-ID")
Note
The Python SDK does not currently have enable_data_pipeline or disable_data_pipeline methods. Use the CLI or Go SDK.
Disable a pipeline
viam datapipelines disable --id=<pipeline-id>
err = dataClient.DisableDataPipeline(ctx, "YOUR-PIPELINE-ID")
Disabling a pipeline stops future scheduled runs but does not delete existing results. When you re-enable a pipeline, it resumes from the next scheduled time window. It does not backfill windows it missed while disabled.
Rename a pipeline
viam datapipelines rename --id=<pipeline-id> --name=new-name
await data_client.rename_data_pipeline(id="YOUR-PIPELINE-ID", name="new-name")
err = dataClient.RenameDataPipeline(ctx, "YOUR-PIPELINE-ID", "new-name")
Delete a pipeline
viam datapipelines delete --id=<pipeline-id>
await data_client.delete_data_pipeline(id="YOUR-PIPELINE-ID")
err = dataClient.DeleteDataPipeline(ctx, "YOUR-PIPELINE-ID")
Deleting a pipeline is irreversible
Deleting a pipeline removes the pipeline configuration, its execution history, and all output data in the pipeline sink. If you need to preserve pipeline results, export them first.
Troubleshooting
Was this page helpful?
Glad to hear it! If you have any other feedback please let us know:
We're sorry about that. To help us improve, please tell us what we can do better:
Thank you!