背景:每日定时从业务系统同步开票数据,并在同步后自动生成会计凭证
核心组件:数据流3.0
、连接器
、数据表
、会计引擎
模拟业务系统中的开票数据文件:ap_invoice.csv
使用连接器,连接旧系统数据库。
查看模拟数据:
在本系统中,创建数据表元素,用于承接数据,以 MySQL 数据表为例。
配置会计引擎,用于根据 AP_INVOICE 数据表的数据,生成会计凭证。
会计引擎相关配置详见对应组件的用户手册用户手册-20230306,此处粗略讲解。
新建业务模型(业务模型 V1.1 组件),作为会计引擎处理来源
新建凭证模型(业务模型 V1.1 组件),作为会计引擎输出对象
新建会计事件类型
新建会计引擎(从单据到凭证)
新建清单表 V1.2 和业务明细表 V1.1,用于展示生成的凭证
由于调试需要,可以在开始节点配置启动参数,传入固定日期,正式版本需要每天定时运行,就需要清空启动参数,在后续指定日期时用 Python 表达式取即时的系统日期
配置启动触发,开启定时,定时规则是每天凌晨1点钟。以实现每天凌晨 1 点同步前一天的发票数据并生成凭证。
连接器查询
节点用于获取旧系统中前一天的发票数据。
修改节点名称:
节点配置可采用两种模式:
UI模式:过滤条件的值(发票日期=前一天)可以用 Python 代码
SQL 模式:在查询SQL
框中输入需要查询的 SQL 语句即可
配置完成后,可以点击右上角的带前序节点一同调试
,查看本节点的运行结果。
进阶:使用参数和变量
如果节点之间有执行的前后顺序,那么后续节点可以引用前序节点的运行结果,引用方式用Operator.nodeX.data,还支持更多变量的引用,详见DPL_变量体系整理。
举个例子,本场景中,仅需查询当前日期前一天的数据,当前日期为启动参数,也是开始节点的输出结果,因此在查询 SQL 中需要引用Operator.start.data
。
用于在新系统中新增前一天的发票数据。
若您使用的系统还未发布数据表操作
节点,可以使用PY代码
节点替代这部分逻辑。
若您使用的系统还未发布数据表操作
节点,可以使用PY代码
节点用于在新系统中新增前一天的发票数据,相关 SDK 文档:操作数据库 — DeepFOS
本节点引用了前序连接器查询
节点的输出结果,因此使用了变量Operator.nodeX.data
。
示例代码:
from deepfos.element.datatable import DataTableMySQL
import pandas as pd
ap_invoice = DataTableMySQL(element_name="AP_INVOICE",path='/0101/')
t = ap_invoice.table
insert_data = pd.DataFrame(Operator.ds_conn_query1.data.order("invoice_id").df().to_dict(orient='list'))
# invoice_id字段需要int转varchar
insert_data['invoice_id'] = insert_data['invoice_id'].astype(str)
# 插数
ap_invoice.insert_df(insert_data)
ap_invoice.select()
再添加PY代码
节点用于在新系统中,根据同步过来的发票数据,调用会计引擎,生成凭证,相关 SDK 文档:会计/单据引擎 — DeepFOS
示例代码:
# 实例化
invoice = DataTableMySQL(element_name="AP_INVOICE", path="/0101/")
invoice_table = invoice.table
eventType = AccountingEventType('AP_INVOICE')
engines = AccountingEngines('AP_INVOICE')
# 指定开票日期=当前日期的前一天
cur_date = Operator.start.data['date'] if Operator.start.data is not None else datetime.datetime.now().date()
invoice_date = (datetime.strptime(cur_date, "%Y-%m-%d") - timedelta(days=1))
invoice_datetime_string = invoice_date.strftime("%Y-%m-%d %H:%M:%S")
# 查询指定开票日期的journal_id
columns = ["invoice_id"]
where = (invoice_table.invoice_date == invoice_date)
data = invoice.select(columns, where=where)
# 插入会计事件表
eventType.insert_to_event_table(list(data['invoice_id']))
# 执行会计引擎
engines.execute(filter_scope={"date": invoice_datetime_string})
回到顶部
咨询热线