数据流

数据流 3.0 元素用于触发平台侧 Dagster 编排的数据流作业,并同步或异步获取运行结果。同步类为 DeepPipeline,异步类为 AsyncDeepPipeline,对应 API deepfos.api.deep_pipeline.DeepPipelineAPImodule_type = DPL)。

更多说明可参考在线文档:


Copy
from deepfos.element.deep_pipeline import DeepPipeline

pipe = DeepPipeline(
    element_name='PIPE_DEMO',
    folder_id='10086'
)

一次性提交并阻塞直到拿到结果(内部先 run_asyncresult)。

Copy
from deepfos.lib.constant import UNSET

# 无参数运行
out = pipe.run()

# 带参数(可为 dict 或与后端约定一致的结构)
out = pipe.run(parameter={"year": "2024", "period": "3"}, timeout=600)

# in_process:兼容参数;同步路径固定为同进程不带启停,详见源码说明
out = pipe.run(in_process=True)

适合长耗时作业:先取运行 ID,再轮询结果。

Copy
run_id = pipe.run_async(
    parameter={"batch": "A01"},
    in_process=False   # False:服务端带启停启动;True:同进程
)
result = pipe.result(run_id, timeout=1200)

result 会在部分错误码(如超时等待)下按 SDK 策略重试;最终失败可能映射为 RunFailedErrorRunTerminatedReleaseFlowTimeout 等(见 deepfos.exceptions)。


对多组参数并发发起多次 run_async,返回 运行 ID 列表

Copy
run_ids = pipe.run_batch(
    parameters=[
        {"year": "2024", "period": "1"},
        {"year": "2024", "period": "2"},
    ],
    in_process=False
)

for rid in run_ids:
    print(pipe.result(rid, timeout=600))

  • 参数类型parameter 可为任意可 JSON 序列化的结构,需与数据流发布版中定义的入参一致。

  • in_processrun_async / run_batchFalse 表示默认的带启停启动;同步 run 固定走同进程逻辑(详见源码 warnings)。

  • 超时timeout 为整段等待上限(秒);底层单次 HTTP 仍受全局 OPTION.api.timeout 约束。


  1. 运行前务必确认 has_approved_release,否则会直接报错。

  2. 大批量并行 run_batch 时注意服务端资源与并发限制。

  3. 与 REST 文档对照时,运行 ID 即接口 POST .../run 返回的任务标识,GET .../run/{id}/result 与之对应。

回到顶部

咨询热线

400-821-9199

我们使用 ChatGPT,基于文档中心的内容以及对话上下文回答您的问题。

ctrl+Enter to send