本文档说明 DataTable 系列与数据库客户端 Python SDK 的常用用法,适用于通过代码方式操作 DeepModel、MySQL 和 ClickHouse 数据表。
说明:deepfos.element.datatable 和 deepfos.db 是 Python SDK 中的数据访问封装。其中数据库客户端(deepfos.db)是基于 deepfos.api.datatable 中的 API 能力进行封装,不是独立的对外 HTTP 组件服务。
用于操作存储在 DeepModel 中的数据表。
用于操作存储在 MySQL 中的数据表。
用于操作存储在 ClickHouse 中的数据表。
用于操作 DeepModel 数据库中的表数据。
用于操作 MySQL 数据库中的表数据。
用于操作 ClickHouse 数据库中的表数据。
from deepfos.element.datatable import DataTableDeepModel
table = DataTableDeepModel(
element_name='MDM_Interface_Run_Log_T',
path='/Application/98_Interfaces_manage/02_Interface_Tables/MDM_Interface_Data_Table/'
)
适用场景:
操作 DeepModel 表
写入接口日志表
记录差异数据
from deepfos.element.datatable import DataTableMySQL
ACTUAL_01 = DataTableMySQL("ACTUAL_01")
ExchangeRate = DataTableMySQL("ExchangeRate_01")
适用场景:
读取贴源表
查询 MySQL 映射表
维护展示表数据
from deepfos.element.datatable import DataTableClickHouse
table = DataTableClickHouse('table_name')
适用场景:
操作 ClickHouse 明细表
查询分析型结果表
from deepfos.element.datatable import DataTableDeepModel
B0305_table = DataTableDeepModel('B0305', path='/Application/00_JJHotel/05_DATA_DeepModel/DM01/')
df = B0305_table.select(columns=['name', 'description_zh_cn', 'is_base'])
适用场景:
读取映射表
加载配置表
from deepfos.element.datatable import DataTableDeepModel
tbl_conf = DataTableDeepModel('A0101').table_name
sql_query = f"""
SELECT \"TY_Account\", \"ZX_Account\"
FROM \"{tbl_conf}\"
"""
适用场景:
与数据库客户端配合查询
动态拼接 SQL
from deepfos.element.datatable import DataTableDeepModel
import pandas as pd
import datetime
log_table = DataTableDeepModel(
element_name='MDM_Interface_Run_Log_T',
path='/Application/98_Interfaces_manage/02_Interface_Tables/MDM_Interface_Data_Table/'
)
log_df = pd.DataFrame({
'user': ['admin'],
'run_mode': ['手工执行'],
'start_time': [datetime.datetime.now()],
'end_time': [datetime.datetime.now()],
'execution_time': [12.5],
'status': ['成功'],
'error_massage': ['']
})
log_table.insert_df(dataframe=log_df)
适用场景:
记录接口执行结果
记录批处理运行日志
from deepfos.element.datatable import DataTableDeepModel
B0305_table = DataTableDeepModel('B0305', path='/Application/00_JJHotel/05_DATA_DeepModel/DM01/')
B0305_table.insert_df(
dataframe=df_entity,
updatecol=['description_zh_cn', 'is_base'],
conflict_target=['name']
)
适用场景:
同步维度属性
更新主数据映射关系
from deepfos.element.datatable import DataTableMySQL
MAP_ENTITY = DataTableMySQL("JD_ENTITY")
MAP_ENTITY.insert_df(add_entity_map)
适用场景:
新增组织映射
补录部门、品牌、渠道映射
from deepfos.db import DeepModelClient
client = DeepModelClient()
适用场景:
查询 DeepModel 配置表
读取中间表数据
from deepfos.db import MySQLClient
client = MySQLClient()
适用场景:
查询贴源表和映射表
清理 MySQL 目标表
from deepfos.db import ClickHouseClient
client = ClickHouseClient()
适用场景:
查询 ClickHouse 明细数据
读取分析型宽表
from deepfos.db import DeepModelClient
from deepfos.element.datatable import DataTableDeepModel
client = DeepModelClient()
tbl_conf = DataTableDeepModel('A0101').table_name
sql_query = f"""
SELECT \"TY_Account\", \"ZX_Account\"
FROM \"{tbl_conf}\"
"""
df_conf = client.query_dfs(sql_query)
适用场景:
读取通月专项映射关系
加载业务配置
from deepfos.db import MySQLClient
from deepfos.element.datatable import DataTableMySQL
client = MySQLClient()
MAP_ACCOUNT = DataTableMySQL("JD_ACCOUNT")
sql = f"""
SELECT FACCOUNTID, MGMT AS ACCOUNT, MGMT_FLAG AS MAPPING_FLAG
FROM {MAP_ACCOUNT.table_name}
"""
map_data = client.query_dfs(sql)
适用场景:
读取金蝶科目映射
加载组织、部门、品牌映射
from deepfos.db import MySQLClient
from deepfos.element.datatable import DataTableMySQL
client = MySQLClient()
ExchangeRate = DataTableMySQL("ExchangeRate_01")
sql = f"""
SELECT SourceCurrency AS CURRENCY, `{period}` AS RATE
FROM {ExchangeRate.table_name}
WHERE FYEAR = {year} AND TargetCurrency = 'PRE001'
"""
df_rate = client.query_dfs(sql)
适用场景:
读取期间汇率用于本位币转换
from deepfos.db import MySQLClient
from deepfos.element.datatable import DataTableMySQL
client = MySQLClient()
ACTUAL_02 = DataTableMySQL("ACTUAL_02")
delete_sql = f"""
DELETE FROM {ACTUAL_02.table_name}
WHERE FYEAR = {year} AND FPERIOD = {period}
"""
client.exec_sqls(sqls=delete_sql)
适用场景:
ETL 前清理展示表
重跑时覆盖历史期间
from deepfos.db import MySQLClient
client = MySQLClient()
tbl_request_records = "${interface_run_logs}"
sql = f"""
SELECT id FROM {tbl_request_records}
WHERE year = '{year}' AND period = '{period}'
AND status = 'Running' AND code = 'JD_ACTUAL01'
"""
df_request_records = client.query_dfs(sql)
if not df_request_records.empty:
print("接口正在运行中,跳过本次执行")
适用场景:
防止重复执行接口
调度状态校验
from deepfos.db import DeepModelClient
from deepfos.element.datatable import DataTableDeepModel
sql_client = DeepModelClient()
table_interface_log = DataTableDeepModel('interface_log')
sql_list = []
for _, row in interface_log_data_all.iterrows():
sql = f"""
UPDATE {table_interface_log.table_name}
SET execute_time='{row['execute_time']}',
execute_status='{row['execute_status']}',
execute_user='{row['execute_user']}'
WHERE year='{row['year']}'
AND period='{row['period']}'
AND scenario='{row['scenario']}'
AND version='{row['version']}'
AND entity='{row['entity']}'
AND py_name='{row['py_name']}'
"""
sql_list.append(sql)
sql_client.exec_sqls(sql_list)
适用场景:
批量更新接口执行状态
记录执行时间和用户
DataTable 与数据库客户端均可操作底层数据表,但设计目标和适用场景有所不同。
|
特性 |
DataTable |
数据库客户端 |
|---|---|---|
|
查询方式 |
通过 |
通过 |
|
写入方式 |
通过 |
通过 |
|
类型保持 |
保持原始数据类型,不会因 JSON 转换丢失精度 |
数据经过 JSON 序列化/反序列化,数值精度可能受影响 |
|
适用场景 |
单表 CRUD、映射表维护、日志记录 |
复杂联表查询、批量清理、动态条件过滤 |
只需操作单张表,无需编写 SQL
需要保持原始数据类型(如 Decimal 精度)
维护映射表、日志表、配置表等结构稳定的表
需要 insert_df() 的冲突更新语义(conflict_target + updatecol)
需要执行复杂联表查询或多表 JOIN
需要动态拼接 WHERE 条件或 ORDER BY 逻辑
需要批量清理历史数据(DELETE with WHERE 条件)
需要使用 ${} 占位符引用平台变量表(如 ${interface_run_logs})
查询结果需要参与 Cube 维表映射等业务逻辑
DataTable 可与数据库客户端配合使用,发挥各自优势:
DataTable 获取表名,数据库客户端执行 SQL:用 DataTable 的 table_name 解析物理表名,再交给数据库客户端拼接和执行
数据库客户端负责查询,DataTable 负责写入:用数据库客户端执行复杂查询获取数据,再用 DataTable 的 insert_df() 写入目标表
查询用数据库客户端,冲突更新用 DataTable:复杂查询后,需要带冲突更新语义(updatecol + conflict_target)写入时,切换到 DataTable
from deepfos.element.datatable import DataTableMySQL
from deepfos.db import MySQLClient
client = MySQLClient()
# 数据库客户端执行复杂查询
sql = """
SELECT a.YEAR, a.PERIOD, a.ACCOUNT, SUM(a.AMOUNT) AS TOTAL_AMOUNT
FROM ACTUAL_01 a
INNER JOIN JD_ACCOUNT b ON a.ACCOUNT = b.FACCOUNTID
WHERE a.YEAR = '2025' AND b.MAPPING_FLAG = 'Y'
GROUP BY a.YEAR, a.PERIOD, a.ACCOUNT
"""
df_actual = client.query_dfs(sql)
# DataTable 写入目标表(带冲突更新语义)
ACTUAL_02 = DataTableMySQL("ACTUAL_02")
ACTUAL_02.insert_df(
dataframe=df_actual,
updatecol=['TOTAL_AMOUNT'],
conflict_target=['YEAR', 'PERIOD', 'ACCOUNT']
)
复杂联表查询或需要动态 SQL 拼接时,优先选择数据库客户端;简单单表读写或需要保持类型安全时,优先选择 DataTable。两者可配合使用,数据库客户端负责查询,DataTable 负责写入和冲突更新。
回到顶部
咨询热线
