数据流

数据流 3.0(组件类型 DPL,Python SDK:DeepPipelineAPI / 元素类 DeepPipeline)用于触发 DeepFOS 数据流运行、轮询运行结果。本文档依据 deepfos.api.deep_pipeline 中封装接口整理 HTTP 路径与请求要点,并与服务 OpenAPI 3.1(服务名示例 deep-pipeline-server3-0)对齐,细化请求参数与响应结构。

  1. 执行前:应保证目标数据流元素存在 已审批通过且启用中 的发布版(与 SDK has_approved_release 语义一致)。

  2. 异步执行:先 POST /run 获取运行 ID,再 GET /run/{runId}/result 轮询直至结束或超时。

  3. 同进程直连:请求体中的 inProcess 控制在服务端是否「同进程」执行 Dagster Job;具体行为由各部署环境与实现约定。

服务名说明:以下为 deep-pipeline-server3-0(与 SDK 内置测试及应用元数据中 DPL 服务名示例一致)。线上请以实际注册的服务名为准:https://{host}/deepfos-server/{服务名}


提交一次运行;成功时信封中 data执行 ID 字符串(可与 SDK 口中的「任务 ID」对应),用于查询日志与结果。

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/run

Content-Type

application/json(请求体必填)

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body(RunWithInfoRunWithId 二选一)

规范为 anyOf:按编码/文件夹定位时使用 RunWithInfo;已知平台 元素 ID 时使用 RunWithId

RunWithInfo

字段

类型

必填

说明

elementName

String

元素编码/名称

elementType

String

元素类型,默认 DPL

folderId

String

文件夹 ID;与 path 须二选一提供其一(不可同时省略两者)

path

String

文件路径

parameter

Any

运行参数(无固定 schema)

inProcess

Boolean

是否同进程执行,默认 false

parentRunId

String

父流程的 runId

RunWithId

字段

类型

必填

说明

elementId

String | Integer

元素 ID

parameter

Any

运行参数

inProcess

Boolean

是否同进程执行,默认 false

parentRunId

String

父流程的 runId

Request Body 示例(RunWithInfo + 参数)

Copy
{
  "elementName": "PIPE_DEMO",
  "elementType": "DPL",
  "folderId": "10086",
  "inProcess": false,
  "parameter": {
    "year": "2024",
    "period": "3"
  }
}

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[str],业务语意为执行 ID)

字段

类型

说明

status

Boolean

接口是否成功(规范必填)

code

Integer

错误码

message

String

提示信息

data

String

执行 ID;若要拉取结果可请求路径 GET https://{host}/deepfos-server/deep-pipeline-server3-0/run/{runId}/result(将 {runId} 替换为返回值)


按执行 ID 轮询直至完成或超时(SDK 中对部分错误码可能重试)。

请求

项目

HTTP Method

GET

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/run/{runId}/result

路径中的 {runId}POST /run 返回的执行 ID(与文档/口语中的 taskId 同指)。

Path Parameters

参数名

类型

必填

说明

runId

String

执行 ID

Query Parameters

参数名

类型

必填

说明

timeout

Integer

执行等待/阻塞超时(秒),语义以实现为准

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[Any]data 为执行结果)

字段

类型

说明

status

Boolean

接口是否成功(规范必填)

code

Integer

错误码

message

String

提示信息

data

Any

执行结果;具体结构随 Dagster Asset/Op 与数据流实现变化,OpenAPI 未约束子 schema

未完成或运行中时的行为(HTTP 状态、data 形态)以实现为准;SDK 侧通常会持续轮询直至终态或错误。


在服务侧完成一次阻塞式运行直至结束或超时,响应体在一次 HTTP 往返中给出;timeout(秒)作用于本次同步等待时长,具体阻塞语义以实现为准。

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/run-sync

Content-Type

application/json(请求体必填)

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body(RunWithInfoTimeoutRunWithIdTimeout 二选一)

RunWithInfoTimeout

字段

类型

必填

说明

elementName

String

元素编码/名称

elementType

String

元素类型,默认 DPL

folderId

String

文件夹 ID

path

String

文件路径

parameter

Any

运行参数

inProcess

Boolean

是否同进程执行,默认 false

parentRunId

String

父流程的执行 ID

timeout

Integer

本次同步等待超时(秒)

folderIdpath:若需要通过目录定位,须二选一提供其一(不可二者皆缺)。

RunWithIdTimeout

字段

类型

必填

说明

elementId

String | Integer

元素 ID

parameter

Any

运行参数

inProcess

Boolean

是否同进程执行,默认 false

parentRunId

String

父流程的执行 ID

timeout

Integer

本次同步等待超时(秒)

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[Any]

字段

类型

说明

status

Boolean

接口调用是否受理成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Any

本次运行的结果载荷:结构取决于数据流内含的 Dagster Asset/Op 与参数,规范未给出固定 JSON Schema


对若干执行 ID 发起中断/终止;响应 data 为映射:键为各执行 ID 字符串,值为布尔型(可理解为 runId → boolean)。

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/run/terminate

Content-Type

application/json(请求体必填)

本请求在 OpenAPI 中未强制声明 Header;若租户体系要求在 HTTP 层显式传递应用或用户信息,可自行附加下表中键值对,是否必填以运行环境为准。

参数名

位置

类型

必填

说明

app

Header

String

视环境

应用 ID

space

Header

String

视环境

空间 ID

user

Header

String

视环境

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body

字段

类型

必填

说明

runIds

String[]

执行 ID 列表

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[Dict[str, bool]]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Object

键为 UTF-8 字符串形式的 runId,值为布尔型,表示服务端是否已对对应执行请求了中断或终止


按执行 ID 列表删除运行记录或关联资源;每条 ID 在响应 data 中对应一个 boolean,表示是否删除成功。

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/run/delete

Content-Type

application/json(请求体必填)

OpenAPI 未列出 Header。若当前部署仍要求租户/用户上下文,可额外携带下表(是否必填以运行环境为准)。

参数名

位置

类型

必填

说明

app

Header

String

视环境

应用 ID

space

Header

String

视环境

空间 ID

user

Header

String

视环境

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body

字段

类型

必填

说明

runIds

String[]

执行 ID 列表

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[Dict[str, bool]]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Object

键为 UTF-8 字符串形式的 runId,值为布尔型,表示该执行记录是否在服务端被认为已删除


返回当前应用/空间可见范围内多条数据流元素的汇总信息(元素定位、草稿/审批状态、版本、最近一次发布与最近一次运行概要、定时计划摘要、实例总数等)。

请求

项目

HTTP Method

GET

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/monitor/flows

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[List[FlowOverviewRecord]]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Array

FlowOverviewRecord 数组

FlowOverviewRecord

字段

类型

必填

说明

element

Object

元素编码、路径类定位与单行描述构成的对象

status

String

FlowStatusDRAFTAPPROVEDOBSOLETE

revision

Object

Revisionversionname

schedule

Object

定时计划摘要:ScheduleOverview

lastRelease

Object

上次发布:ReleaseOverview

lastRun

Object

上次运行:RunOverview

totalRuns

Integer

累积实例总数

summary

String

概览摘要文案

FlowOverviewRecord.element(亦称 FlowElementInfoWithDescription

字段

类型

必填

说明

elementName

String

元素编码/名称

elementType

String

元素类型,默认 DPL

folderId

String

文件夹 ID

elementId

String | Integer

元素 ID

path

String

仓库内路径文字

description

String

单行描述文案

目录类定位时 folderIdpath 须二选一提供其一。

ScheduleOverview(字段 schedule

字段

类型

必填

说明

name

String

定时计划展示名

crontab

String

Cron 表达式

scheduleId

String

计划在平台侧的稳定 ID

lastRunAt

Integer

上次应运行时间戳(毫秒)

nextRunAt

Integer

下次应运行时间戳(毫秒)

ReleaseOverview(字段 lastRelease

字段

类型

必填

说明

user

Object

JSON 可选对象:userIduserNameemailmobilePhonenickNameavatarssoUserstatus 等键均可单独缺失

time

String

ISO 8601 形式的发布时间

RunOverview(字段 lastRun

字段

类型

必填

说明

runId

String

实例 ID

triggerType

String

APISCHEDULEDSYNCASYNC

runStatus

String

STARTEDSUCCESSFAILURECANCELEDQUEUED

duration

Integer

耗时(毫秒)

createTime / startTime / endTime

Integer

时间戳(毫秒)

version

String

本次运行所用数据流版本号

runName

String

实例名称

summary

String

单行摘要


按条件分页查询某一数据流下(或与之关联)的运行实例列表,并返回各状态计数。

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/monitor/runs

Content-Type

application/json(请求体必填)

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body(PagedRunsFilter

字段

类型

必填

说明

pageNo

Integer

页码,minimum 0

pageSize

Integer

每页条数:> 0maximum 100

elementName

String

数据流元素编码/名称

elementType

String

默认 DPL

folderId / path / elementId

视类型

过滤用元素定位:elementId 可选;若以目录维度过滤则 folderIdpath 二选一必填其一

runStatusFilter

String[]

STARTEDSUCCESSFAILURECANCELEDQUEUED

triggerFilter

String[]

APISCHEDULEDSYNCASYNC

orderBy

Array

RunOrderInfo[]name 为字段名,asc 是否升序(默认 true)。可排序字段包括 runIdstartTimeendTimedurationrunStatus

versionFilter

String[]

按版本过滤

quickSearch

String

快捷搜索

colFilters

Array

列过滤:StringColFilter / IntervalColFiltername 区分 runIdinitiatorstartTime 等)

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[MonitorRunResp]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Object

MonitorRunResptotalCounttotalPagerunsqueuedCountstartedCountsuccessCountfailureCountcanceledCount

MonitorRunResp

字段

类型

说明

totalCount

Integer

条目总数

totalPage

Integer

总页数

runs

Array

RunInfo[],实例列表

queuedCount / startedCount / successCount / failureCount / canceledCount

Integer

各状态下实例数量(默认值 0)

RunInfo

字段

类型

必填

说明

runId

String

实例 ID

triggerType

String

APISCHEDULEDSYNCASYNC

runStatus

String

STARTEDSUCCESSFAILURECANCELEDQUEUED

duration

Integer

耗时(毫秒)

createTime / startTime / endTime

Integer

时间戳(毫秒)

version

String

对应数据流元素版本

runName

String

实例名称

runParam

String

运行参数(序列化形式,以实现为准)

summary

String

摘要

initiator

Object

JSON 可选对象:userIduserNameemailmobilePhonenickNameavatarssoUserstatus 等均可单独缺失


实例 ID 查询单个运行的展示用摘要信息。

请求

项目

HTTP Method

GET

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/monitor/run

Query Parameters

参数名

类型

必填

说明

runId

String

实例 ID

规范未列出 Header;若部署通过 Cookie 或服务网格注入用户信息,则由运行环境决定是否仍需手工传入 app / space / user

可选用 Header

参数名

位置

类型

必填

说明

app

Header

String

视环境

应用 ID

space

Header

String

视环境

空间 ID

user

Header

String

视环境

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[RunDisplayResp]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Object

JSON 载荷;键包括必填项 runIdtriggerTyperunStatus,以及可选项 durationcreateTimestartTimeendTimeversionrunNamerunParamsummaryelementNameelementDescriptionfolderIdinitiator

RunDisplayResp

字段

类型

必填

说明

runId

String

实例 ID

triggerType

String

APISCHEDULEDSYNCASYNC

runStatus

String

STARTEDSUCCESSFAILURECANCELEDQUEUED

duration

Integer

耗时(毫秒)

createTime / startTime / endTime

Integer

时间戳(毫秒)

version

String

数据流元素版本

runName

String

实例名称

runParam

String

运行参数

summary

String

摘要

elementName

String

数据流元素名

elementDescription

String

数据流元素描述

folderId

String

与该实例关联展示的文件夹 ID

initiator

Object

JSON 可选对象:userIduserNameemailmobilePhonenickNameavatarssoUserstatus 等均可单独缺失


提交图与配置以发布到目标版本;请求体为 DeployFlowReqRenameDeployFlowReq 二选一。

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/flow/deploy

Content-Type

application/json(请求体必填)

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body

一、DeployFlowReq(新建/完整发布草稿)

字段

类型

必填

说明

elementName

String

元素编码

revisionName

String

版本名

flow

Object

Graphedgesvertexes

configure

Object

Configure:含 triggerserrorHandlerspreluderunNameTemplaterunTimeoutlogContentfileCleanup 等顶层键

elementType

String

默认 DPL

folderId / path / elementId

视类型

元素定位:folderIdpath 二选一

description

Object

ElementDescription(本地化显示名:zh-cnen 等)

二、RenameDeployFlowReq(对已发布版本的改名等轻量场景)

字段

类型

必填

说明

elementName

String

元素编码

revisionName

String

版本名

elementType

String

默认 DPL

folderId

String

文件夹 ID

path

String

仓库路径

elementId

String | Integer

元素 ID

folderIdpath 若以目录维度参与定位,须二选一提供其一。

Configure(字段摘要,对象 configure

字段

类型

说明

triggers

Array

触发器列表项为 manual / cron 等 discriminator 分支

errorHandlers

Object

ErrorHandler:邮件 / 机器人 / 自定义处理方式

prelude

String

前言脚本正文

runNameTemplate

String

实例名称模板

runTimeout

Object

超时策略:type 可取 systemcustomizedsentinel,细节由各自子 schema 给出

logContent

String

CONCISE / DETAILED,默认后者

fileCleanup

Object

运行产物清理:type 可取 keepdefault 等分支

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[List[ValidationResult]]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Array

画布校验结果:ValidationResult 条目列表(可能为空数组)

data 数组元素(ValidationResult

字段

类型

必填

说明

errorCode

Integer

错误码

nodeId

String

相关节点 ID

nodeType

String

节点类型

errorMessage

String

错误信息

severity

Integer

严重级别

operatorId / operatorType

String

算子相关信息

path

Array

路径信息(可为 string / integer 混合)


对多个元素依次执行发布类操作;请求体为 FlowElementInfo 数组

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/flow/batch-deploy

Content-Type

application/json(请求体必填;Body 为 JSON 数组)

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body

JSON 数组,元素类型均为 FlowElementInfo

字段

类型

必填

说明

elementName

String

元素编码/名称

elementType

String

元素类型,默认 DPL

folderId

String

文件夹 ID

path

String

仓库路径

elementId

String | Integer

元素 ID

若以目录维度定位,folderIdpath 须二选一提供其一。

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[List[BatchDeployResult]]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Array

BatchDeployResult 条目列表

BatchDeployResult

字段

类型

必填

说明

element

Object

FlowElementInfoelementName 必填,可选 elementType(默认 DPL)、folderIdpathelementId;若以目录维度定位则 folderIdpath 二选一

result

String

DeployResultTypeOKERRORSKIP

message

String

结果说明


将指定数据流元素置为启用(具体业务状态以实现为准);请求体为单个 FlowElementInfo

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/flow/activate

Content-Type

application/json(请求体必填)

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body:FlowElementInfo

字段

类型

必填

说明

elementName

String

元素编码/名称

elementType

String

元素类型,默认 DPL

folderId

String

文件夹 ID

path

String

仓库路径

elementId

String | Integer

元素 ID

若以目录维度定位,folderIdpath 须二选一提供其一。

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[FlowStatus]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

String

枚举字面量:DRAFTAPPROVEDOBSOLETE


将指定数据流元素停用;请求体为单个 FlowElementInfo

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/flow/deactivate

Content-Type

application/json(请求体必填)

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body:FlowElementInfo

字段

类型

必填

说明

elementName

String

元素编码/名称

elementType

String

元素类型,默认 DPL

folderId

String

文件夹 ID

path

String

仓库路径

elementId

String | Integer

元素 ID

若以目录维度定位,folderIdpath 须二选一提供其一。

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[FlowStatus]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

String

枚举字面量:DRAFTAPPROVEDOBSOLETE


对多个元素依次执行停用操作;请求体为 FlowElementInfo JSON 数组;响应中为每个元素的执行结果条目。

请求

项目

HTTP Method

POST

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/flow/batch-deactivate

Content-Type

application/json(请求体必填;Body 为 JSON 数组)

Header Parameters

参数名

位置

类型

必填

说明

app

Header

String

应用 ID

space

Header

String

空间 ID

user

Header

String

用户 ID

language

Header

String

语言

token

Header

String

签名

cookie

Header

String

Cookie

Request Body

JSON 数组;每一项字段表与 FlowElementInfo 一致:

字段

类型

必填

说明

elementName

String

元素编码/名称

elementType

String

元素类型,默认 DPL

folderId

String

文件夹 ID

path

String

仓库路径

elementId

String | Integer

元素 ID

若以目录维度定位,folderIdpath 须二选一提供其一。

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[List[BatchDeployResult]]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Array

BatchDeployResult 列表

BatchDeployResult

字段

类型

必填

说明

element

Object

FlowElementInfoelementName 必填,可选 elementType(默认 DPL)、folderIdpathelementId;若以目录维度定位则 folderIdpath 二选一

result

String

DeployResultTypeOKERRORSKIP

message

String

单行结果说明


返回当前空间中数据流元素的列表项,每条包含进行中执行计数与可选的多语言标题。

请求

项目

HTTP Method

GET

Path

https://xxxx.com/deepfos-server/deep-pipeline-server3-0/flow/list

本路径在 OpenAPI 中既不声明 Query 字符串也不列出 Header:集成方若通过网关 Cookie、反向代理重写或 sidecar 注入会话,应保持与控制台相同的访问链路。

响应(HTTP 200,Content-Type:application/json

信封(RespGenericModel[List[FlowElementInfoWithInProcess]]

字段

类型

说明

status

Boolean

接口调用是否成功(规范必填)

code

Integer

业务/错误码

message

String

提示文案

data

Array

FlowElementInfoWithInProcess 列表

FlowElementInfoWithInProcess

字段

类型

必填

说明

elementName

String

元素编码

inProcessCount

Integer

平台统计的未完成执行个数(通常为同进程或直接托管的运行)

elementType

String

默认 DPL

folderId

String

文件夹 ID

path

String

仓库路径

elementId

String

Integer

description

Object

ElementDescriptionzh-cnen 等键为人类可读标题

若以目录维度定位,folderIdpath 在同一对象中须二选一且不可同时省略。

回到顶部

咨询热线

400-821-9199

我们使用 ChatGPT,基于文档中心的内容以及对话上下文回答您的问题。

ctrl+Enter to send