背景:系统上线迁移旧系统的未完结合同、未完结发票等业务数据
核心组件:数据流3.0
、连接器
、数据表
模拟发票和合同数据:ap_invoice.csvcontract.csv
使用连接器,连接旧系统数据库。
查看模拟数据:
在本系统中,创建数据表元素,用于承接数据,以 MySQL 数据表为例。
在开始节点后,依次添加数据集-连接器查询
、数据表操作
。
连接器查询
节点用于获取旧系统的数据。
修改节点名称:
节点配置,在查询SQL
框中输入需要查询的 SQL 语句即可,可以点击右上角的调试,查看本节点的运行结果。
进阶:使用参数和变量
如果节点之间有执行的前后顺序,那么后续节点可以引用前序节点的运行结果,引用方式用Operator.nodeX.data,还支持更多变量的引用,详见DPL_变量体系整理。
举个例子,本场景中,假设仅迁移 2024年及之后的合同,迁移的发票为关联了这些合同的,那么在查询发票数据时,需要引用到查询合同数据的结果:
用于在新系统中新增数据。
若您使用的系统还未发布数据表操作
节点,可以使用PY代码
节点替代这部分逻辑。
若您使用的系统还未发布数据表操作
节点,可以使用PY代码
节点用于在新系统中新增数据,相关 SDK 文档:操作数据库 — DeepFOS
节点配置,直接在节点中输入 Python 代码即可,如需引用变量,详见DPL_变量体系整理。
本节点引用了前序连接器查询
节点的输出结果,因此使用了变量Operator.nodeX.data
。
示例代码:
from deepfos.element.datatable import DataTableMySQL
import pandas as pd
contract = DataTableMySQL(element_name="CONTRACT",path='/0101/')
t = contract.table
insert_data = pd.DataFrame(Operator.ds_conn_query1_copy.data.order("contract_id").df().to_dict(orient='list'))
# 先删后插
contract.delete(where = t.contract_id > 0 )
contract.insert_df(insert_data)
contract.select()
from deepfos.element.datatable import DataTableMySQL
import pandas as pd
ap_invoice = DataTableMySQL(element_name="AP_INVOICE",path='/0101/')
t = ap_invoice.table
insert_data = pd.DataFrame(Operator.ds_conn_query1.data.order("contract_id").df().to_dict(orient='list'))
# 先删后插
ap_invoice.delete(where = t.contract_id > 0 )
ap_invoice.insert_df(insert_data)
ap_invoice.select()
回到顶部
咨询热线