文档中心开发者中心Python SDK数据表与数据库客户端

数据表与数据库客户端

本文档说明 DataTable 系列与数据库客户端 Python SDK 的常用用法,适用于通过代码方式操作 DeepModel、MySQL 和 ClickHouse 数据表。

说明:deepfos.element.datatabledeepfos.db 是 Python SDK 中的数据访问封装。其中数据库客户端(deepfos.db)是基于 deepfos.api.datatable 中的 API 能力进行封装,不是独立的对外 HTTP 组件服务。


用于操作存储在 DeepModel 中的数据表。

用于操作存储在 MySQL 中的数据表。

用于操作存储在 ClickHouse 中的数据表。

用于操作 DeepModel 数据库中的表数据。

用于操作 MySQL 数据库中的表数据。

用于操作 ClickHouse 数据库中的表数据。


Copy
from deepfos.element.datatable import DataTableDeepModel

table = DataTableDeepModel(
    element_name='MDM_Interface_Run_Log_T',
    path='/Application/98_Interfaces_manage/02_Interface_Tables/MDM_Interface_Data_Table/'
)

适用场景:

  • 操作 DeepModel 表

  • 写入接口日志表

  • 记录差异数据

Copy
from deepfos.element.datatable import DataTableMySQL

ACTUAL_01 = DataTableMySQL("ACTUAL_01")
ExchangeRate = DataTableMySQL("ExchangeRate_01")

适用场景:

  • 读取贴源表

  • 查询 MySQL 映射表

  • 维护展示表数据

Copy
from deepfos.element.datatable import DataTableClickHouse

table = DataTableClickHouse('table_name')

适用场景:

  • 操作 ClickHouse 明细表

  • 查询分析型结果表

Copy
from deepfos.element.datatable import DataTableDeepModel

B0305_table = DataTableDeepModel('B0305', path='/Application/00_JJHotel/05_DATA_DeepModel/DM01/')
df = B0305_table.select(columns=['name', 'description_zh_cn', 'is_base'])

适用场景:

  • 读取映射表

  • 加载配置表

Copy
from deepfos.element.datatable import DataTableDeepModel

tbl_conf = DataTableDeepModel('A0101').table_name
sql_query = f"""
    SELECT \"TY_Account\", \"ZX_Account\"
    FROM \"{tbl_conf}\"
"""

适用场景:

  • 与数据库客户端配合查询

  • 动态拼接 SQL

Copy
from deepfos.element.datatable import DataTableDeepModel
import pandas as pd
import datetime

log_table = DataTableDeepModel(
    element_name='MDM_Interface_Run_Log_T',
    path='/Application/98_Interfaces_manage/02_Interface_Tables/MDM_Interface_Data_Table/'
)

log_df = pd.DataFrame({
    'user': ['admin'],
    'run_mode': ['手工执行'],
    'start_time': [datetime.datetime.now()],
    'end_time': [datetime.datetime.now()],
    'execution_time': [12.5],
    'status': ['成功'],
    'error_massage': ['']
})

log_table.insert_df(dataframe=log_df)

适用场景:

  • 记录接口执行结果

  • 记录批处理运行日志

Copy
from deepfos.element.datatable import DataTableDeepModel

B0305_table = DataTableDeepModel('B0305', path='/Application/00_JJHotel/05_DATA_DeepModel/DM01/')
B0305_table.insert_df(
    dataframe=df_entity,
    updatecol=['description_zh_cn', 'is_base'],
    conflict_target=['name']
)

适用场景:

  • 同步维度属性

  • 更新主数据映射关系

Copy
from deepfos.element.datatable import DataTableMySQL

MAP_ENTITY = DataTableMySQL("JD_ENTITY")
MAP_ENTITY.insert_df(add_entity_map)

适用场景:

  • 新增组织映射

  • 补录部门、品牌、渠道映射


Copy
from deepfos.db import DeepModelClient

client = DeepModelClient()

适用场景:

  • 查询 DeepModel 配置表

  • 读取中间表数据

Copy
from deepfos.db import MySQLClient

client = MySQLClient()

适用场景:

  • 查询贴源表和映射表

  • 清理 MySQL 目标表

Copy
from deepfos.db import ClickHouseClient

client = ClickHouseClient()

适用场景:

  • 查询 ClickHouse 明细数据

  • 读取分析型宽表

Copy
from deepfos.db import DeepModelClient
from deepfos.element.datatable import DataTableDeepModel

client = DeepModelClient()
tbl_conf = DataTableDeepModel('A0101').table_name

sql_query = f"""
    SELECT \"TY_Account\", \"ZX_Account\"
    FROM \"{tbl_conf}\"
"""

df_conf = client.query_dfs(sql_query)

适用场景:

  • 读取通月专项映射关系

  • 加载业务配置

Copy
from deepfos.db import MySQLClient
from deepfos.element.datatable import DataTableMySQL

client = MySQLClient()
MAP_ACCOUNT = DataTableMySQL("JD_ACCOUNT")

sql = f"""
    SELECT FACCOUNTID, MGMT AS ACCOUNT, MGMT_FLAG AS MAPPING_FLAG
    FROM {MAP_ACCOUNT.table_name}
"""

map_data = client.query_dfs(sql)

适用场景:

  • 读取金蝶科目映射

  • 加载组织、部门、品牌映射

Copy
from deepfos.db import MySQLClient
from deepfos.element.datatable import DataTableMySQL

client = MySQLClient()
ExchangeRate = DataTableMySQL("ExchangeRate_01")

sql = f"""
    SELECT SourceCurrency AS CURRENCY, `{period}` AS RATE
    FROM {ExchangeRate.table_name}
    WHERE FYEAR = {year} AND TargetCurrency = 'PRE001'
"""

df_rate = client.query_dfs(sql)

适用场景:

  • 读取期间汇率用于本位币转换

Copy
from deepfos.db import MySQLClient
from deepfos.element.datatable import DataTableMySQL

client = MySQLClient()
ACTUAL_02 = DataTableMySQL("ACTUAL_02")

delete_sql = f"""
    DELETE FROM {ACTUAL_02.table_name}
    WHERE FYEAR = {year} AND FPERIOD = {period}
"""

client.exec_sqls(sqls=delete_sql)

适用场景:

  • ETL 前清理展示表

  • 重跑时覆盖历史期间

Copy
from deepfos.db import MySQLClient

client = MySQLClient()
tbl_request_records = "${interface_run_logs}"

sql = f"""
    SELECT id FROM {tbl_request_records}
    WHERE year = '{year}' AND period = '{period}'
      AND status = 'Running' AND code = 'JD_ACTUAL01'
"""

df_request_records = client.query_dfs(sql)

if not df_request_records.empty:
    print("接口正在运行中,跳过本次执行")

适用场景:

  • 防止重复执行接口

  • 调度状态校验

Copy
from deepfos.db import DeepModelClient
from deepfos.element.datatable import DataTableDeepModel

sql_client = DeepModelClient()
table_interface_log = DataTableDeepModel('interface_log')

sql_list = []
for _, row in interface_log_data_all.iterrows():
    sql = f"""
        UPDATE {table_interface_log.table_name}
        SET execute_time='{row['execute_time']}',
            execute_status='{row['execute_status']}',
            execute_user='{row['execute_user']}'
        WHERE year='{row['year']}'
          AND period='{row['period']}'
          AND scenario='{row['scenario']}'
          AND version='{row['version']}'
          AND entity='{row['entity']}'
          AND py_name='{row['py_name']}'
    """
    sql_list.append(sql)

sql_client.exec_sqls(sql_list)

适用场景:

  • 批量更新接口执行状态

  • 记录执行时间和用户


DataTable 与数据库客户端均可操作底层数据表,但设计目标和适用场景有所不同。

特性

DataTable

数据库客户端

查询方式

通过 select() 方法,不写 SQL

通过 query_dfs() 执行任意 SQL

写入方式

通过 insert_df() 等方法

通过 exec_sqls() 执行 UPDATE/DELETE/INSERT SQL

类型保持

保持原始数据类型,不会因 JSON 转换丢失精度

数据经过 JSON 序列化/反序列化,数值精度可能受影响

适用场景

单表 CRUD、映射表维护、日志记录

复杂联表查询、批量清理、动态条件过滤

  • 只需操作单张表,无需编写 SQL

  • 需要保持原始数据类型(如 Decimal 精度)

  • 维护映射表、日志表、配置表等结构稳定的表

  • 需要 insert_df() 的冲突更新语义(conflict_target + updatecol)

  • 需要执行复杂联表查询或多表 JOIN

  • 需要动态拼接 WHERE 条件或 ORDER BY 逻辑

  • 需要批量清理历史数据(DELETE with WHERE 条件)

  • 需要使用 ${} 占位符引用平台变量表(如 ${interface_run_logs}

  • 查询结果需要参与 Cube 维表映射等业务逻辑

DataTable 可与数据库客户端配合使用,发挥各自优势:

  • DataTable 获取表名,数据库客户端执行 SQL:用 DataTable 的 table_name 解析物理表名,再交给数据库客户端拼接和执行

  • 数据库客户端负责查询,DataTable 负责写入:用数据库客户端执行复杂查询获取数据,再用 DataTable 的 insert_df() 写入目标表

  • 查询用数据库客户端,冲突更新用 DataTable:复杂查询后,需要带冲突更新语义(updatecol + conflict_target)写入时,切换到 DataTable

Copy
from deepfos.element.datatable import DataTableMySQL
from deepfos.db import MySQLClient

client = MySQLClient()

# 数据库客户端执行复杂查询
sql = """
    SELECT a.YEAR, a.PERIOD, a.ACCOUNT, SUM(a.AMOUNT) AS TOTAL_AMOUNT
    FROM ACTUAL_01 a
    INNER JOIN JD_ACCOUNT b ON a.ACCOUNT = b.FACCOUNTID
    WHERE a.YEAR = '2025' AND b.MAPPING_FLAG = 'Y'
    GROUP BY a.YEAR, a.PERIOD, a.ACCOUNT
"""

df_actual = client.query_dfs(sql)

# DataTable 写入目标表(带冲突更新语义)
ACTUAL_02 = DataTableMySQL("ACTUAL_02")
ACTUAL_02.insert_df(
    dataframe=df_actual,
    updatecol=['TOTAL_AMOUNT'],
    conflict_target=['YEAR', 'PERIOD', 'ACCOUNT']
)

复杂联表查询或需要动态 SQL 拼接时,优先选择数据库客户端;简单单表读写或需要保持类型安全时,优先选择 DataTable。两者可配合使用,数据库客户端负责查询,DataTable 负责写入和冲突更新。

回到顶部

咨询热线

400-821-9199

我们使用 ChatGPT,基于文档中心的内容以及对话上下文回答您的问题。

ctrl+Enter to send