当前版本2.6.2.22
2.6.0 适配了新的流程控制 以及 deepcube
2.6.1 适配了deepcube 2.0、配合合并流程控制迭代
2.6.2 增加钩子函数、增加con函数、结转EC时候可以按审计线索结转
2.6.2.1 异常处理修改bug
2.6.2.2 增加journal_data方法 增加自定义数据源
2.6.2.3 增加calc_journal_data con_journal_data方法 增加最小共父的LRU缓存
2.6.2.4 修改最小共父的算法 考虑父.子
2.6.2.5 deep_consol.EntityMD.list增加参数 增加缓存
2.6.2.6 调度程序调整(即使有投资关系的影响 也不允许跑出当前合并范围) config文件增加配置项(调度顺序是否依赖投资关系)
2.6.2.7 增加投资关系模型 组织架构模型支持批量保存 修改 ActionEnum 增加新的枚举值(consolidation_all)
2.6.2.8 合并开始时批量修改流程状态为未计算 未折算 未贡献
2.6.2.9 con函数支持polars dataframe、con增加round位数配置
2.6.2.10 结转ec增加 round配置
2.6.2.11 修复bug
2.6.2.12 新增分层调度模式 修复bug 结构优化
2.6.2.13 新增记录下级日志功能
2.6.2.14 新增合并下级功能
2.6.2.15 增加自定义结转EC钩子函数
2.6.2.16 增加celery并发调度模式,根据account维度的ud判断具体维度成员是否需要结转EC
2.6.2.17 适配维度新接口、对账逻辑修改、dc.EntityMD 新增参数 考虑是否激活
2.6.2.18 增加状态向上传播配置项、凭证生成性能优化
2.6.2.19 高精度round实现 增加定制化校验
2.6.2.20 修复bug
2.6.2.21 修复bug 新增参数 优化效率(此次更新更改了deepcube_config.py的部分内容 顾问需注意)
2.6.2.22 新增调度模式
合并SDK基本概念见此篇,建议阅读后再查看本文档。
合并算法整体设计
描述
该对象由组织模型(Entity_Hierarchy)实例化得到。
根据组织模型的规范设计,该对象能提供标准的属性/方法如下:
|
属性/方法 |
来源 |
入参结构 |
说明 |
|---|---|---|---|
|
member |
组织对象 |
子结构 |
也可使用Deepcube维度对象获取维度成员属性,例:entity[‘A’].attr(‘ud1’) |
|
currency | |||
|
principle | |||
|
isBase | |||
|
isICP | |||
|
name_zh_cn | |||
|
name_en | |||
|
entity_type |
组织层级 |
父子结构 |
注意,入参仅支持父子结构,若在value=entity currency等使用时,cur_entity为纯子,需自己拼上父。 |
|
goodwill | |||
|
consol_method | |||
|
share_percentage | |||
|
share_move_percentage | |||
|
common_ancestor | |||
|
is_base、is_child、is_descendant、is_ancestor、is_parent |
组织层级 |
子结构 |
纯子结构下可能返回多个parent |
|
list(base、child、parent、descendant、sibling、ancestor) |
描述
获取组织架构模型属性信息
语法
_
deep_consol.EntityMD.<property>(member : union[str, EntityStr], pov : dict,active_filter:boolean) -> str
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
member |
union[str, EntityStr] |
当为非分层调度模式,传参为空时默认为运行时当前entity |
|
pov |
union[str, dict] |
传参为空时默认为当前运行的pov |
|
active_filter |
boolean |
是否考虑激活,默认为True,即只考虑已激活的实体 |
示例
if dc.EntityMD.entity_type() == 'holding' and dc.EntityMD.consol_method() == 's_non_ctrl_equ':
# 当前节点的兄弟节点
sublings = dc.EntityMD.list(v_entity, 'Sibling')
for each in sublings:
# 判断子公司的合并方法是否是非同控权益法
if dc.EntityMD.consol_method(each,{'period': prior_period, 'year': dc.cur_year, 'scenario': dc.cur_scenario, 'version': dc.cur_version}) == 's_non_ctrl_equ':
# 取持股比例
vShare = dc.EntityMD.share_percentage(each)
实现
|
版本 |
实现路径 |
备注 |
|---|---|---|
|
|
通过读取组织架构模型相关数据重写__getattr__动态创建相关方法 | |
对应HFM实现
由于HFM中没有类似关系库模型,取类似信息通过其他方式来实现
' 取控股公司
Owner=Hs.Entity.Owner
' 取被控股公司
Owned=Hs.Entity.Owned
' 取控股公司的币种
OwnerDefaultCurrency=Hs.Entity.DefCurrency(Owner)
If(OwnerDefaultCurrency<>"CNY") Then
OwnerDefaultCurrency="CNY"
End If
' 取控股公司币种,用于下述EPU计算时的判断
OwnerCurrency=Hs.Entity.DefCurrency(Owner)
' 取持股比例
vPown=Hs.GetCell("A#[Shares%Owned].E#"&Owned&".I#"&Owner&".V#[None].MM#[None].CP#[None].PJ#[None].ET#[None].BK1#[None].BK2#[None]")/100
vPmin=1-vPown
描述
获取组织架构模型中对应关系的成员列表
语法
deep_consol.EntityMD.list(member: union[str, EntityStr], relation: str, pov: dict = None, list_filter: dict,with_parent:boolean,active_filter:boolean) -> list[str, EntityStr]
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
member |
union[str, EntityStr] |
member为具体的实体成员 |
|
relation |
str |
对应的层级关系 descendant``sibling``ancestor
|
|
pov |
dict |
传参为空时默认为当前运行的pov |
|
list_filter |
dict |
过滤条件 根据属性对获取到的成员列表进行过滤,传参为空默认不过滤 |
|
with_parent |
boolean |
是否需父.子对象 默认为None |
|
active_filter |
boolean |
是否考虑激活,默认为True,即只考虑已激活的实体 |
当member传参为父.子结构时,默认返回成员为父.子结构的成员列表(对象类型为EntityStr),除非指定with_parent参数
当member传参非父.子结构时,默认返回成员为单体结构的成员列表,除非指定with_parent参数
list_filter 作为关键词参数使用时需明确指定_**list_filter = {'entity_type':'subsidiary'}**_
示例
if dc.EntityMD.entity_type() == 'holding' and dc.EntityMD.consol_method() == 's_non_ctrl_equ':
# 获取当前节点的企业类型为子公司的兄弟节点
sublings = dc.EntityMD.list(v_entity, 'Sibling',list_filter={
'entity_type':' subsidiary'
})
for each in sublings:
# 判断子公司的合并方法是否是非同控权益法
if dc.EntityMD.consol_method(each) == 's_non_ctrl_equ':
# 取持股比例
vShare = dc.EntityMD.share_percentage(each)
dc.EntityMD.list('BC', 'base')
# 返回值 ['B', 'C']
dc.EntityMD.list('BC', 'base',with_parent=True)
# 返回值 ['[BC].[B]', '[BC].[C]']
dc.EntityMD.list('BC', 'base',with_parent=False)
# 返回值 ['B', 'C']
dc.EntityMD.list('[BC].[B]', 'sibling')
# 返回值 ['[BC].[C]']
dc.EntityMD.list('[BC].[B]', 'sibling',with_parent=True)
# 返回值 ['[BC].[C]']
dc.EntityMD.list('[BC].[B]', 'sibling',with_parent=False)
# 返回值 ['C']
实现
|
版本 |
实现路径 |
备注 |
|---|---|---|
|
|
通过读取组织架构模型相关数据构建图数据结构 |
对应HFM实现
Function getParent(rootEntity,CurEntity)
If HS.Entity.IsChild(rootEntity,CurEntity) Then
getParent = rootEntity
Exit Function
End If
aParent = HS.Entity.List("001_Cons","[Descendants]")
For m = LBound(aParent) to UBound(aParent)
If HS.Entity.IsChild(aParent(m),CurEntity) then
getParent = aParent(m)
Exit For
End If
Next
描述
判断两个成员组织层级模型中的关系
语法
deep_consol.EntityMD.is_base/is_child/is_descendant/is_ancestor/is_parent(member1: union[str,EntityStr], member2: union[str,EntityStr], pov: dict = None,active_filter:boolean)-> boolean
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
member1 |
union[str, EntityStr] |
member1为具体的实体成员 |
|
member2 |
union[str, EntityStr] |
member2为具体的实体成员 |
|
pov |
dict |
传参为空时默认为当前运行的pov |
|
active_filter |
bool |
是否考虑激活,默认为True,即只考虑已激活的实体 |
当member1、member2为父.子结构时,考虑父.子所处的边的方向(只沿着特别的父级判断关系)。
示例
pl_df = main_cube.loc[account['PL'].Base(), partner['Internal'].Base(), value['ParentTotal'], audittrail[
'REPORT'], movement['BAL']].to_dataframe()
for index, row in pl_df.iterrows():
# 按行迭代判断
if dc.EntityMD.is_descendant(dc.EntityMD.parent(), row['partner']):
dc.con(row=row, dest=(
audittrail['REVCOS_ELIM'],
value["Elimination"]
), factor=-1, nature='内部交易抵销')
实现
|
版本 |
实现路径 |
备注 |
|---|---|---|
|
|
通过读取组织架构模型相关数据构建图数据结构 |
对应HFM实现
If vAcct= "140301" and HS.Entity.Isbase("SNGFHB","")=False Then
HS.Exp "A#"&vAcct&".I#"&vICP&".C1#"&vC1&".C2#RECLASS.C3#"&vC3&".C4#"&vC4&"=-1*"&vData
If vC4="C4GEN" Then
HS.Exp "A#64010102.I#"&vICP&".C2#RECLASS.C4#C4GEN=A#64010102.I#"&vICP&".C2#RECLASS.C4#C4GEN-A#140301.I#"&vICP&".C2#RECLASS.C4#NETMOV"
End If
描述
返回两个成员在组织架构模型中的最小共父
语法
deep_consol.EntityMD.common_ancestor(member1: union[str, EntityStr], member2: union[str, EntityStr], pov: dict ,ignore_disposal:bool,active_filter:bool) -> list[str]
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
member1 |
union[str,EntityStr] |
member1为具体的实体成员 |
|
member2 |
union[str,EntityStr] |
member2为具体的实体成员 |
|
pov |
dict |
传参为空时默认为当前运行的pov |
|
ignore_disposal |
bool |
是否忽略处置 |
|
active_filter |
bool |
是否考虑激活,默认为True,即只考虑已激活的实体 |
当member1、member2为父.子结构时,考虑父.子所处的边的方向(只沿着特别的父级寻找最小共父)。
示例
pl_df = main_cube.loc[account['PL'].Base(), partner['Internal'].Base(), value['ParentTotal'], audittrail[
'REPORT'], movement['BAL']].to_dataframe()
for index, row in pl_df.iterrows():
if dc.EntityMD.parent() in dc.EntityMD.common_ancestor(row['partner'], row['entity']):
dc.con(row=row, dest=(
audittrail['REVCOS_ELIM'],
value["Elimination"]
), factor=-1, nature='内部交易抵销')
实现
|
版本 |
实现路径 |
备注 |
|---|---|---|
|
|
通过读取组织架构模型相关数据构建图数据结构 |
对应HFM实现
HFM无对应实现
描述
该对象由投资关系模型(Investment)实例化得到
描述
获取投资关系模型属性信息
语法
_
deep_consol.EntityMD.<property>(investor : union[str, EntityStr], investee: union[str, EntityStr],pov =None: dict) -> str
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
investor |
union[str, EntityStr] |
投资方 |
|
investee |
union[str, EntityStr] |
被投资方 |
|
pov |
dict |
传参为空时默认为当前运行的pov |
当investor、investee为父.子结构时 截断父代 只按子代判断
示例
dc.investment_model.consideration('HC','AC',{'year': '2023', 'period': '2', 'scenario': 'Actual', 'version': 'Working'})
描述
获取投资方为相应公司对应所有的被投资方
语法
deep_consol.investment_model.all_investee(investor: union[str, EntityStr],pov =None: dict) -> list[str]
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
investor |
union[str, EntityStr] |
当为非分层调度模式,传参为空时默认为运行时当前entity |
|
pov |
dict |
传参为空时默认为当前运行的pov |
当investor为父.子结构时 截断父代 只按子代判断
示例
dc.investment_model.all_investee('HC',{'year': '2023', 'period': '2', 'scenario': 'Actual', 'version': 'Working'})
描述
获取被投资方为相应公司对应所有的投资方
语法
_**deep_consol.investment_model**_**.all_investor**_**(investee: union[str, EntityStr],pov =None: dict) -> list[str]**_
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
investee |
union[str, EntityStr] |
当为非分层调度模式,传参为空时默认为运行时当前entity |
|
pov |
dict |
传参为空时默认为当前运行的pov |
当investee为父.子结构时 截断父代 只按子代判断
示例
dc.investment_model.all_investor('HC',{'year': '2023', 'period': '2', 'scenario': 'Actual', 'version': 'Working'})
描述
当为非分层调度模式,返回当前合并算法运行的实体维度成员
描述
当为分层调度模式,返回当前合并算法运行的实体维度成员集合
描述
当为非分层调度模式,返回当前合并算法运行的实体维度成员(无论是父.子还是纯子代的实体都返回子代)
描述
当为非分层调度模式,返回当前合并算法运行的父.子实体的父代 如果当前运行的不是父.子实体返回None
描述
当前合并算法运行的情景维度成员
描述
当前合并算法运行的版本维度成员
描述
当前合并算法运行的年份维度成员
描述
当前合并算法运行的期间维度成员
描述
当前合并算法运行的值维度成员
描述
当前计算是否第二次执行的 flag
当前合并算法运行框架中EC、ECA、PC、PCA计算规则 默认运行两次,该属性描述当前计算规则是否是第二次运行
描述
当为非分层调度模式,返回当前实体的本位币、本位币调整、本位币合计
描述
当为非分层调度模式,返回当前实体的父级币种、父级币种调整、父级币种合计(当value为ParentCurrency以上成员时该属性有效)
描述
确定当前Value是否是折币或者折币调整,返回布尔值True/false当时
描述
包含当前情景、版本、年度、期间、实体的信息,如果是执行的是合并逻辑还会包括目标节点(target_vertex)、所有与目标节点有计算依赖关系的实体集合(all_entity)、以及执行完毕的实体集合(completed_entity)。
示例:
{
'parent': None,
'jobId': 'consol_process_9e364a47-5f83-44b3-a587-510e0f2af3df',
'period': '10',
'year': '2021',
'scenario': 'Actual',
'consolProcessElement': {
'elementName': 'Consol_Process',
'elementType': 'CONSOLPROC',
'folderId': 'DIRcca5ba18db1d',
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\',
'serverName': 'consolidation-process-server1-0'
},
'consolUnitContribution': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit_contribution',
'tableName': 'eijuqd015_t_contribution_fk09y'
},
'consolUnit': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit',
'tableName': 'eijuqd015_consol_unit_8ws4a'
},
'unitId': '2023_1_Actual_Working_TotalEntity',
'version': 'Working',
'entity': 'MC',
# 执行逻辑的枚举:包括calculation、consolidation、consolidation_all、contribution、translation
'action': 'consolidation',
'target_vertex': 'MC',
# 目标实体
'all_entity': {
# 与当前合并实体有依赖关系的所有实体 包括当前实体
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'MC',
'[MC].[HC]'
},
'consol_task_id': '25ec16de-0ab8-4caa-a7c8-401d8ffb3051',
'completed_entity'
# 已经执行完合并逻辑的所有实体 不包括当前实体
: {
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'[MC].[HC]'
}
}
描述
con函数的作用是将数据从ParentToal 搬运/抵销至Elimination、Proportion上。
con函数内部会对数据进行聚合,但是数据不会立即回写到deepcube中,当rule中consolidate函数运行完毕后才会回写到deepcube中。
该方法只能用于consolidate函数中。
注意
con函数不对数据合法性进行校验,比如可能存在取到某个父级科目的数据然后未修改该行数据的维值导致脏数据写入数据库。可以考虑在取数df时限制维度组合为base节点。
语法
deep_consol.con(row:Union[pd.DataFrame, pd.Series, Dict, pl.DataFrame], factor: Union[int, float], dest: Union[DimMember, Tuple[DimMember, …]], nature: str,gen_journal:bool)
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
row |
Union[pd.DataFrame, pd.Series, Dict, pl.DataFrame] |
series 、 dataframe(可以是polars dataframe)、dict |
|
factor |
Union[int, float] |
乘数 |
|
dest |
Union[DimMember, Tuple[DimMember, …]] |
目标pov |
|
nature |
str |
备注 |
|
gen_journal |
bool |
是否需要con函数生成凭证 默认为True,如果传参为False,con函数提交的数据将不会在con_journal_data属性中拿到。 |
示例
df = main_cube.loc[account['BS'].Base(), partner['Internal'].Base(), value['ParentTotal'], audittrail[
'REPORT'], movement['BAL']].to_dataframe()
# pandas dataframe 行迭代判断
for index, row in df.iterrows():
# 按行迭代 做最小共父判断
if dc.EntityMD.parent() in dc.EntityMD.common_ancestor(row['partner'], row['entity']):
dc.con(row=row, dest=(
audittrail['INTERCO_ELIM'],
value["Elimination"]
), factor=-1, nature='关联往来抵销')
ar_ap_df = main_cube.loc[
ac['BS'].Base().loc[ac['BS'].Base().attr('ud1') == 'PLUG01'], par['Internal'].Base(), mov['TotalMov'].Base(),
val['ParentTotal'], ad['REPORT']].to_dataframe(engine='polars')
# polars dataframe 行迭代判断
for row in ar_ap_df.iter_rows(named=True):
if dc.EntityMD.parent() in dc.EntityMD.common_ancestor(row['partner'], row['entity']):
row['decimal_val']=0.001
dc.con(row=row, dest=(
ad['REVCOS_ELIM'],
val["Elimination"]
), factor=-1, nature='关联往来抵销')
partner_set_1 = set(dc.EntityMD.list(cur_entity_parent, 'base'))
partner_set_2 = set(dc.EntityMD.list(cur_entity_child, 'base'))
partner_set = partner_set_1-partner_set_2
partner_set.discard(cur_entity_child)
partner_list = list(partner_set)
df = main_cube.loc[account['BS'].Base(), partner[partner_list], value['ParentTotal'], audittrail[
'REPORT'], movement['BAL']].to_dataframe()
# pandas dataframe
dc.con(row=df, dest=(
audittrail['INTERCO_ELIM'],
value["Elimination"]
), factor=-1, nature='关联往来抵销')
partner_set_1 = set(dc.EntityMD.list(cur_entity_parent, 'base'))
partner_set_2 = set(dc.EntityMD.list(cur_entity_child, 'base'))
partner_set = partner_set_1-partner_set_2
partner_set.discard(cur_entity_child)
partner_list = list(partner_set)
df = main_cube.loc[account['BS'].Base(), partner[partner_list], value['ParentTotal'], audittrail[
'REPORT'], movement['BAL']].to_dataframe(engine='polars')
# polars dataframe
dc.con(row=df, dest=(
audittrail['INTERCO_ELIM'],
value["Elimination"]
), factor=-1, nature='关联往来抵销')
对应HFM实现
'4.1、长期股权调整值表外调表内
Set DataUnit=Hs.OpenDataUnit("A#EPU003.ET#[None]")
NumItems=DataUnit.GetNumItems
For i=0 To NumItems-1
vAcc = DataUnit.Item(i).Account
vICP = DataUnit.Item(i).ICP
vMM = DataUnit.Item(i).Custom("MM")
vCP = DataUnit.Item(i).Custom("CP")
vPJ = DataUnit.Item(i).Custom("PJ")
vET = DataUnit.Item(i).Custom("ET")
vBK1 = DataUnit.Item(i).Custom("BK1")
vBK2 = DataUnit.Item(i).Custom("BK2")
vData = DataUnit.Item(i).Data
If vData<>0 And (StrComp(vICP, "[ICP None]", vbTextCompare) <> 0) Then
'如果数据不为0且ICP不为[ICP None]
If (HS.Node.IsDescendant(HS.Parent.Member,vICP,"S#"&vScenario&".Y#"&vYear&".P#"&vPeriod)=True) Then
'如果实体与ICP是同一个父级的后代
'1401010101长期股权投资_成本,权益法调整
Call Hs.Con("A#1401010101.MM#[None].CP#[None].PJ#[None].ET#EPU_ADJ.BK1#[None].BK2#[None].V#[246]",1,"权益抵消-调整-长期股权投资_成本")
'3111010101资本公积_资本溢价,权益法调整
Call Hs.Con("A#3111010101.MM#[None].CP#[None].PJ#[None].ET#EPU_ADJ.BK1#[None].BK2#[None].V#[Elimination]",-1,"权益抵消-调整-资本公积股本溢价")
End If
End If
Next
描述
设置con函数round位数
语法
**deep_consol.set_con_round_num(round_num: int)**
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
round_num |
int |
round位数 |
如果不设置round位数 默认为两位
描述
该方法作用是从con_data(即con函数所提交的数据)中拆分凭证头、凭证行,按照科目类型拆分借贷方,并补充一些字段信息如comment_head、post_status、journal_id、journal_name。其中journal_id及journal_name 按照情景、版本、年度、期间、备注生成、post_status默认为True。
语法
deep_consol.con_journal_data(entity: str = None) -> Tuple[pd.DataFrame, pd.DataFrame]
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
entity |
Optional[str] |
entity维度成员字符串,可不传默认为None |
若entity传参为None,则构造全部实体的凭证,否则构造对应entity的凭证。
返回值为两个Dataframe组成的元组,第一个Dataframe是凭证头,第二个是凭证行。
借贷方拆分规则:暂时仅按照科目类型拆分
def split_debit_credit(row):
row["accounttype"] = account_dim[row[account_field]].accounttype
if row["accounttype"] in ["ASSET", "EXPENSE"]:
if row[DECIMAL_COL] > 0:
return pd.Series({"debit": row["decimal_val"], "credit": 0})
else:
return pd.Series({"debit": 0, "credit": -row["decimal_val"]})
else:
if row["decimal_val"] > 0:
return pd.Series({"debit": 0, "credit": row["decimal_val"]})
else:
return pd.Series({"debit": -row["decimal_val"], "credit": 0})
示例
import re
from Python.common.common_tools import PARENT_CHILD_PARSE
from Python.deep_consol.deep_consol import deep_consol as dc
from Python.deep_consol.journal_model import journal_model
def custom_process_control_post_python(p2):
if p2.get("action") not in ["consolidation", "contribution"]:
return
if p2.get("completed_entity"):
total_entity_list = [entity for entity in p2["completed_entity"] if re.match(PARENT_CHILD_PARSE, entity)]
# 删除指定凭证
if total_entity_list:
t = journal_model.table
where = ((t.year == p2['year']) &
(t.period == p2['period']) &
(t.scenario == p2['scenario']) &
(t.version == p2['version']) &
(t.value == "Elimination")) & \
(t.entity.isin(total_entity_list))
r = journal_model.delete(where)
print(r)
# 构造凭证
rlt = dc.con_journal_data()
head_df = rlt[0]
line_df = rlt[1]
# 存入凭证模型
if not head_df.empty and not line_df.empty:
head_df['_type'] = 'sys_journal'
r = journal_model.save(head_df=head_df,
line_df=line_df,
)
print(r)
对应HFM实现
无
描述
该方法作用是向calc_data提交数据,多为重分类等计算调整的数据(仅作储存用于后文生成凭证或者别的什么用途,不会回写Deepcube)。
语法
deep_consol.calc_journal_data(row: Union[pd.DataFrame, pd.Series], nature: str):
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
row |
Union[pd.DataFrame, pd.Series] | |
|
nature |
str |
备注 |
示例
df=main_cube.loc[audittrail['Reclassification']].to_dataframe()
dc.append_calc_journal(df,'重分类')
描述
该方法作用是从calc_data(即append_calc_journal函数所提交的数据)中拆分凭证头、凭证行,按照科目类型拆分借贷方,并补充一些字段信息如comment_head、post_status、journal_id、journal_name。其中journal_id及journal_name 按照情景、版本、年度、期间、备注生成、post_status默认为True。
语法
deep_consol.calc_journal_data(entity: str = None) -> Tuple[pd.DataFrame, pd.DataFrame]
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
entity |
Optional[str] |
entity维度成员字符串,可不传默认为None |
若entity传参为None,则构造全部实体的凭证,否则构造对应entity的凭证。
返回值为两个Dataframe组成的元组,第一个Dataframe是凭证头,第二个是凭证行。
借贷方拆分规则:暂时仅按照科目类型拆分
def split_debit_credit(row):
row["accounttype"] = account_dim[row[account_field]].accounttype
if row["accounttype"] in ["ASSET", "EXPENSE"]:
if row[DECIMAL_COL] > 0:
return pd.Series({"debit": row["decimal_val"], "credit": 0})
else:
return pd.Series({"debit": 0, "credit": -row["decimal_val"]})
else:
if row["decimal_val"] > 0:
return pd.Series({"debit": 0, "credit": row["decimal_val"]})
else:
return pd.Series({"debit": -row["decimal_val"], "credit": 0})
示例
import re
from Python.common.common_tools import PARENT_CHILD_PARSE
from Python.deep_consol.deep_consol import deep_consol as dc
from Python.deep_consol.journal_model import journal_model
def custom_process_control_post_python(p2):
if p2.get("action") not in ["consolidation", "contribution"]:
return
if p2.get("completed_entity"):
total_entity_list = [entity for entity in p2["completed_entity"] if re.match(PARENT_CHILD_PARSE, entity)]
# 删除指定凭证
if total_entity_list:
t = journal_model.table
where = ((t.year == p2['year']) &
(t.period == p2['period']) &
(t.scenario == p2['scenario']) &
(t.version == p2['version']) &
(t.value == "Elimination")) & \
(t.entity.isin(total_entity_list))
r = journal_model.delete(where)
print(r)
# 构造凭证
rlt = dc.calc_journal_data()
head_df = rlt[0]
line_df = rlt[1]
# 存入凭证模型
if not head_df.empty and not line_df.empty:
head_df['_type'] = 'sys_journal'
r = journal_model.save(head_df=head_df,
line_df=line_df,
)
print(r)
描述
calc_journal_data、con_journal_data返回的凭证头、凭证行 DataFrame 拼接而成的DataFrame
语法
deep_consol.journal_data(entity: str = None) -> Tuple[pd.DataFrame, pd.DataFrame]
|
参数 |
参数类型 |
参数含义 |
|---|---|---|
|
entity |
Optional[str] |
entity维度成员字符串,可不传默认为None |
示例
import re
from Python.common.common_tools import PARENT_CHILD_PARSE
from Python.deep_consol.deep_consol import deep_consol as dc
from Python.deep_consol.journal_model import journal_model
def custom_process_control_post_python(p2):
if p2.get("action") not in ["consolidation", "contribution"]:
return
if p2.get("completed_entity"):
total_entity_list = [entity for entity in p2["completed_entity"] if re.match(PARENT_CHILD_PARSE, entity)]
# 删除指定凭证
if total_entity_list:
t = journal_model.table
where = ((t.year == p2['year']) &
(t.period == p2['period']) &
(t.scenario == p2['scenario']) &
(t.version == p2['version']) &
(t.value == "Elimination")) & \
(t.entity.isin(total_entity_list))
r = journal_model.delete(where)
print(r)
# 构造凭证
rlt = dc.journal_data()
head_df = rlt[0]
line_df = rlt[1]
# 存入凭证模型
if not head_df.empty and not line_df.empty:
head_df['_type'] = 'sys_journal'
r = journal_model.save(head_df=head_df,
line_df=line_df,
)
print(r)
描述
con函数提交的数据,本质上是个字典(以当前实体名称为key,value为一个key为con函数参数中的nature,value是一个以Series或者Dataframe组成的列表 构成的字典),可用来生成凭证。
con_data: Dict[str:Dict[str:List[Union[pd.Series, pd.DataFrame]]]]
描述
append_calc_journal函数提交的数据,本质上是个字典(以当前实体名称为key,value为一个key为con函数参数中的nature,value是一个以Series或者Dataframe组成的列表 构成的字典),可用来生成凭证。
calc_data: Dict[str:Dict[str:List[Union[pd.Series, pd.DataFrame]]]]
描述
自定义数据源,通常为存储在关系库的某些数据,比如规则配置表,浮动行报表等。
该属性的返回值由钩子函数中custom_get_consol_custom_data_async确定,如未定义这个钩子函数则默认为None。
描述
con函数round位数,默认为两位。
描述
EntityStr 存在的目的在于抹平 Entity维度成员结构(父.子)与Partner维度成员结构的差异(只有子)。
当_**dc.EntityMD.list**_传入的是父.子结构时,返回的成员列表即以EntityStr组成。
特性
pc:返回父.子结构的字符串(系统标准的字符串)
test_str=EntityStr('[A].[B]')
print(test_str.pc())
# 返回[A].[B]
- ch:返回只有子代的字符串(系统标准的字符串)
test_str=EntityStr('[A].[B]')
print(test_str.ch())
# 返回B
描述
config.py记载了合并sdk所需要的模型、数据表、维度路径、日志级别、使用的数据库类型,EC结转、审计线索结转等相关配置。
示例
from Python.deep_consol.config import config
path_setting = {
'DIM': {
'Entity': '/Consolidation_App/Common/Dimension/',
'Partner': '/Consolidation_App/Common/Dimension/',
'EntityFilter': '/Consolidation_App/Common/Dimension/',
'Period': '/Consolidation_App/Common/Dimension/',
},
'Table': {
'EntityHierarchy': '/Consolidation_App/Entity/Entity_Hierarchy/',
'Entity': '/Consolidation_App/Entity/Entity/',
'Investment': '/Consolidation_App/Entity/Investment/',
'consol_unit': '/Consolidation_App/Consol_Process_New/consol_process_new_tables_ectl/',
'consol_unit_contribution': '/Consolidation_App/Consol_Process_New/consol_process_new_tables_ectl/'
},
'Cube': {'main_cube': ['Main_Cube', '/Consolidation_App/Consolidation/Main_Model/'],
'fvadj_cube': ['FVadj_Cube', '/Consolidation_App/PPA/PPA_Model/'],
'exrate_cube': ['ExRate_Cube', '/Consolidation_App/Rate/Rate_Model/'],
},
'Journal_model': {
'name': 'Journal_Model',
'path': '/Consolidation_App/Journal/Journal_Model/'
}
}
# 调度顺序是否依赖投资关系
config.is_rely_investment = True
# 是否分层调度
config.is_stratified_schedule = False
# ec 结转 时 'ASSET', 'LIABILITY', 'EQUITY', 'REVENUE', 'EXPENSE' 科目的 round位数
config.ec_round_num = 2
# 是否需要按照审计线索结转
config.need_audit_carry = True
# 子代实体对应的审计线索维度成员
config.source_member = 'REPORT'
# 父级实体对应的审计线索维度成员
config.dest_member = 'CARRY'
# 审计线索对应的列名
config.audit_column = 'audittrail'
# 合并下级是否执行贡献
config.consolidate_subordinate_with_contribute = True
# 合并下级是否执行折算
config.consolidate_subordinate_with_translate = True
# SchedulerData.all_entity 是否限定在当前节点的后代范围内(可能影响deep_cube 初始化取数范围),如果是False可能会扩大取数范围取别的架构下的数据
config.all_entity_restricted_within_descendant_member = False
# 是否使用自定义的结转ec方法
config.use_custom_data_to_ec = False
# 是否使用celery
config.use_celery = True
# 最大celery并发数
config.max_celery_concurrency = 4
# 状态是否需要向上传播
config.upward_status_transmission = True
# 使用celery进行并发调度情况下 一次合并最多跑多少个合并实体
config.max_consolidated_entity = 32
from deepfos.element.datatable import DataTableMySQL as DataTable # noqa
from deepfos.element.datatable import AsyncDataTableMySQL as AsyncDataTable # noqa
try:
from deepfos.element.pyscript import redis_cli # noqa
except Exception: # noqa
redis_cli = None
cache_client = redis_cli
# from deepfos import OPTION
# OPTION.general.log_level = 'DEBUG'
# OPTION.api.dump_always = True
描述
deepcube_config.py记载了deepcube初始化所需要的一些信息(包括数据库链接信息,目标集范围、来源集范围)。
由于deepcube的一些特性需要指定当前计算的来源集范围,deepcube_config.py主要功能在于自动指定这些信息无需顾问手工指定。
默认配置
目标集及来源集
- 在执行计算时以当前情景、当前版本、当前实体、当年以及上一年度的当月、上月、12月的数据为来源集。
- 在执行折算时以当前情景、当前版本、当前实体、当年以及上一年度的当月、上月、12月的数据为来源集。
- 在执行贡献时以当前情景、当前版本、当前实体以及与当前实体有计算依赖关系的实体、当年以及上一年度的当月、上月、12月的数据为来源集。
- 在执行合并时以当前情景、当前版本、当前实体以及与当前实体有计算依赖关系的实体、当年以及上一年度的当月、上月、12月的数据为来源集。
在rule.py文件中使用deepcube时,等号左侧 右侧需在指定来源集范围中。
修改这些配置信息可能需要对deepcube整体功能有一定了解,建议修改这些配置信息时与@陈思聪和@张健坤联系
示例
from concurrent.futures import as_completed
from typing import Dict
from deepcube.cube.cube import DeepCube
from deepfos.lib.concurrency import ThreadCtxExecutor
from Python.common.common_tools import time_log
from Python.config import path_setting
from Python.deep_consol.action_enum import ActionEnum
from Python.deep_consol.cube_info import CubeInfo
from .common.log import logger
from deepcube import Config
Config.dim_init_max_concurrency = 16
Config.data_submit_chunksize = 1000000
Config.db_direct_access = False
cube_dict: Dict[str, DeepCube] = {}
main_cube = DeepCube(element_name=path_setting['Cube']['main_cube'][0], path=path_setting['Cube']['main_cube'][1])
ex_rate_cube = DeepCube(element_name=path_setting['Cube']['exrate_cube'][0], path=path_setting['Cube']['exrate_cube'][1])
# 数据库链接信息
ck_connection_prop = None
class CubeConfig(CubeInfo):
@time_log
def init_cube(self, action_code: ActionEnum, con_p2: Dict, all_entity=None):
def init_ex_cube():
try:
# 初始化汇率模型
ex_rate_cube.init_data()
cube_dict['exrate_cube'] = ex_rate_cube
except Exception as e:
logger.exception('初始化汇率模型失败')
raise e
def init_main_cube():
try:
# 初始化主模型
if str(con_p2['period']) == '1':
last_period = '1'
else:
last_period = str(int(con_p2['period']) - 1)
last_year = str(int(con_p2['year']) - 1)
if (action_code == ActionEnum.consolidation) or (action_code == ActionEnum.consolidation_all) or (
action_code == ActionEnum.consolidate_subordinate):
entity_member_set = self.entity_dim[all_entity]
# 从cube加载数据
source_list = [
(
self.year_dim[con_p2['year']],
self.period_dim[[con_p2['period'], last_period]],
self.scenario_dim[con_p2['scenario']],
self.version_dim[con_p2['version']],
entity_member_set,
),
(
self.year_dim[last_year],
self.period_dim[["12"]],
self.scenario_dim[con_p2['scenario']],
self.version_dim[con_p2['version']],
entity_member_set,
)
]
self.main_cube.init_data(*source_list, partition_by=self.entity_field)
cube_dict['main_cube'] = self.main_cube
elif action_code == ActionEnum.calculation:
# 默认为当前实体
entity_member_set = self.entity_dim[con_p2['entity']]
# 从cube加载数据
source_list = [
(
self.year_dim[con_p2['year']],
self.period_dim[[con_p2['period'], last_period]],
self.scenario_dim[con_p2['scenario']],
self.version_dim[con_p2['version']],
entity_member_set,
),
(
self.year_dim[last_year],
self.period_dim[["12"]],
self.scenario_dim[con_p2['scenario']],
self.version_dim[con_p2['version']],
entity_member_set,
)
]
self.main_cube.init_data(*source_list, partition_by=self.entity_field)
cube_dict['main_cube'] = self.main_cube
elif action_code == ActionEnum.translation:
# 默认为当前实体
entity_member_set = self.entity_dim[con_p2['entity']]
# 从cube加载数据
source_list = [
(
self.year_dim[con_p2['year']],
self.period_dim[[con_p2['period'], last_period]],
self.scenario_dim[con_p2['scenario']],
self.version_dim[con_p2['version']],
entity_member_set,
),
(
self.year_dim[last_year],
self.period_dim[["12"]],
self.scenario_dim[con_p2['scenario']],
self.version_dim[con_p2['version']],
entity_member_set,
)
]
self.main_cube.init_data(*source_list, partition_by=self.entity_field)
cube_dict['main_cube'] = self.main_cube
elif action_code == ActionEnum.contribution:
entity_member_set = self.entity_dim[all_entity]
# 如果要做在合并流程中做抵销的轧差,可能需要取兄弟节点的数据(及其后代)
# entity_member_set = self.entity_dim[all_entity] + self.entity_dim[con_p2['parent']].Descendant(
# with_parent=True) + self.entity_dim[con_p2['parent']].Descendant()
# 从cube加载数据
source_list = [
(
self.year_dim[con_p2['year']],
self.period_dim[[con_p2['period'], last_period]],
self.scenario_dim[con_p2['scenario']],
self.version_dim[con_p2['version']],
entity_member_set,
),
(
self.year_dim[last_year],
self.period_dim[["12"]],
self.scenario_dim[con_p2['scenario']],
self.version_dim[con_p2['version']],
entity_member_set,
)
]
self.main_cube.init_data(*source_list, partition_by=self.entity_field)
cube_dict['main_cube'] = self.main_cube
else:
raise ValueError('ACTION_CODE 不合法')
except Exception as e:
logger.exception('初始化主模型失败')
raise e
with ThreadCtxExecutor(max_workers=2) as executor:
futures = [
executor.submit(init_ex_cube),
executor.submit(init_main_cube),
]
for f in as_completed(futures):
print(f.result())
cube_config = CubeConfig(main_cube)
def get_main_cube():
if 'main_cube' in cube_dict:
cube = cube_dict['main_cube']
return cube
else:
raise RuntimeError('cube 尚未被初始化')
def get_ex_rate_cube():
if 'exrate_cube' in cube_dict:
cube = cube_dict['exrate_cube']
return cube
else:
raise RuntimeError('cube 尚未被初始化')
描述
计算、折算、贡献、合并执行完毕后(deepcube数据写入数据库后)自动调起来的函数
传参
当前业务逻辑执行的p2,包含当前情景、版本、年度、期间、实体的信息,如果是执行的是合并逻辑还会包括目标节点(target_vertex)、所有与目标节点有计算依赖关系的实体集合(all_entity)、以及执行完毕的实体集合(completed_entity)。
传参示例:
{
'parent': None,
'jobId': 'consol_process_9e364a47-5f83-44b3-a587-510e0f2af3df',
'period': '10',
'year': '2021',
'scenario': 'Actual',
'consolProcessElement': {
'elementName': 'Consol_Process',
'elementType': 'CONSOLPROC',
'folderId': 'DIRcca5ba18db1d',
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\',
'serverName': 'consolidation-process-server1-0'
},
'consolUnitContribution': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit_contribution',
'tableName': 'eijuqd015_t_contribution_fk09y'
},
'consolUnit': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit',
'tableName': 'eijuqd015_consol_unit_8ws4a'
},
'unitId': '2023_1_Actual_Working_TotalEntity',
'version': 'Working',
'entity': 'MC',
# 执行逻辑的枚举:包括calculation、consolidation、consolidation_all、contribution、translation
'action': 'consolidation',
'target_vertex': 'MC',
# 目标实体
'all_entity': {
# 与当前合并实体有依赖关系的所有实体 包括当前实体
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'MC',
'[MC].[HC]'
},
'consol_task_id': '25ec16de-0ab8-4caa-a7c8-401d8ffb3051',
'completed_entity'
# 已经执行完合并逻辑的所有实体 不包括当前实体
: {
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'[MC].[HC]'
},
'submit_success_flag':True #数据是否已经提交 应该为True
}
触发时点
单体计算、折算、贡献完成后、所有合并逻辑执行完毕后。(此时deepcube中的数据已经写入数据库)
使用方法
在custom文件夹下建custom_process_control_post_python.py文件,文件中定义一个custom_process_control_post_python函数,入参为p2。
函数示例:
import re
from Python.common.common_tools import PARENT_CHILD_PARSE
from Python.deep_consol.deep_consol import deep_consol as dc
from Python.deep_consol.journal_model import journal_model
def custom_process_control_post_python(p2):
if p2.get("action") not in ["consolidation", "contribution"]:
return
if p2.get("completed_entity"):
total_entity_list = [entity for entity in p2["completed_entity"] if re.match(PARENT_CHILD_PARSE, entity)]
# 删除指定凭证
if total_entity_list:
t = journal_model.table
where = ((t.year == p2['year']) &
(t.period == p2['period']) &
(t.scenario == p2['scenario']) &
(t.version == p2['version']) &
(t.value == "Elimination")) & \
(t.entity.isin(total_entity_list))
r = journal_model.delete(where)
print(r)
# 构造凭证
rlt = dc.journal_data()
head_df = rlt[0]
line_df = rlt[1]
# 存入凭证模型
if not head_df.empty and not line_df.empty:
head_df['_type'] = 'sys_journal'
r = journal_model.save(head_df=head_df,
line_df=line_df,
)
print(r)
描述
计算、折算、贡献、合并执行失败后自动调起来的函数
传参
当前业务逻辑执行的p2,包含当前情景、版本、年度、期间、实体的信息,如果是执行的是合并逻辑还会包括目标节点(target_vertex)、所有与目标节点有计算依赖关系的实体集合(all_entity)、以及执行完毕的实体集合(completed_entity)。
传参示例:
{
'parent': None,
'jobId': 'consol_process_9e364a47-5f83-44b3-a587-510e0f2af3df',
'period': '10',
'year': '2021',
'scenario': 'Actual',
'consolProcessElement': {
'elementName': 'Consol_Process',
'elementType': 'CONSOLPROC',
'folderId': 'DIRcca5ba18db1d',
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\',
'serverName': 'consolidation-process-server1-0'
},
'consolUnitContribution': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit_contribution',
'tableName': 'eijuqd015_t_contribution_fk09y'
},
'consolUnit': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit',
'tableName': 'eijuqd015_consol_unit_8ws4a'
},
'unitId': '2023_1_Actual_Working_TotalEntity',
'version': 'Working',
'entity': 'MC',
# 执行逻辑的枚举:包括calculation、consolidation、consolidation_all、contribution、translation
'action': 'consolidation',
'target_vertex': 'MC',
# 目标实体
'all_entity': {
# 与当前合并实体有依赖关系的所有实体 包括当前实体
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'MC',
'[MC].[HC]'
},
'consol_task_id': '25ec16de-0ab8-4caa-a7c8-401d8ffb3051',
'completed_entity'
# 已经执行完合并逻辑的所有实体 不包括当前实体
: {
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'[MC].[HC]'
},
'submit_success_flag':True #数据是否已经提交 可能为False
}
触发时点
单体计算、折算、贡献执行失败后、合并节点合并执行失败后。
使用方法
在custom文件夹下建custom_process_control_post_python_failed.py文件,文件中定义一个custom_process_control_post_python_failed函数,入参为p2。
函数示例:
import re
from Python.common.common_tools import PARENT_CHILD_PARSE
from Python.deep_consol.deep_consol import deep_consol as dc
from Python.deep_consol.journal_model import journal_model
def custom_process_control_post_python_failed(p2):
if p2.get("action") not in ["consolidation", "contribution"] and p2.get('submit_success_flag'):
return
if p2.get("completed_entity"):
total_entity_list = [entity for entity in p2["completed_entity"] if re.match(PARENT_CHILD_PARSE, entity)]
# 删除指定凭证
if total_entity_list:
t = journal_model.table
where = ((t.year == p2['year']) &
(t.period == p2['period']) &
(t.scenario == p2['scenario']) &
(t.version == p2['version']) &
(t.value == "Elimination")) & \
(t.entity.isin(total_entity_list))
r = journal_model.delete(where)
print(r)
# 构造凭证
rlt = dc.journal_data()
head_df = rlt[0]
line_df = rlt[1]
# 存入凭证模型
if not head_df.empty and not line_df.empty:
head_df['_type'] = 'sys_journal'
r = journal_model.save(head_df=head_df,
line_df=line_df,
)
print(r)
描述
业务逻辑执行前调度起来的函数,目标是给Deepconsol提供定制化数据源,函数的返回可以通过deepconsol.consol_custom_data 拿到。
这个函数因为要和deepconsol的数据初始化一起异步并发运行,所以必须是个异步函数 。
传参
当前业务逻辑执行的p2,包含当前情景、版本、年度、期间、实体的信息。
传参示例:
{
'parent': None,
'jobId': 'consol_process_9e364a47-5f83-44b3-a587-510e0f2af3df',
'period': '10',
'year': '2021',
'scenario': 'Actual',
'consolProcessElement': {
'elementName': 'Consol_Process',
'elementType': 'CONSOLPROC',
'folderId': 'DIRcca5ba18db1d',
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\',
'serverName': 'consolidation-process-server1-0'
},
'consolUnitContribution': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit_contribution',
'tableName': 'eijuqd015_t_contribution_fk09y'
},
'consolUnit': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit',
'tableName': 'eijuqd015_consol_unit_8ws4a'
},
'unitId': '2023_1_Actual_Working_TotalEntity',
'version': 'Working',
'entity': 'MC',
# 执行逻辑的枚举:包括calculation、consolidation、contribution、translation
'action': 'consolidation',
'target_vertex': 'MC',
# 目标实体
'all_entity': {
# 与当前合并实体有依赖关系的所有实体 包括当前实体
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'MC',
'[MC].[HC]'
},
'consol_task_id': '25ec16de-0ab8-4caa-a7c8-401d8ffb3051',
'completed_entity'
# 已经执行完合并逻辑的所有实体 不包括当前实体
: {
},
'submit_success_flag':False #数据是否已经提交 应该为False
}
使用方法
在custom文件夹下建custom_get_consol_custom_data_async.py文件,文件中定义一个custom_get_consol_custom_data_async 异步函数,入参为p2。
函数示例:
from Python.config import AsyncDataTable
async def custom_get_consol_custom_data_async(p2):
table = AsyncDataTable(element_name='a')
rlt = await table.select()
return rlt
描述
业务逻辑执行前调度起来的函数,目标提供诸如定制化校验之类的功能。
传参
当前业务逻辑执行的p2,包含当前情景、版本、年度、期间、实体的信息。
传参示例:
{
'parent': None,
'jobId': 'consol_process_9e364a47-5f83-44b3-a587-510e0f2af3df',
'period': '10',
'year': '2021',
'scenario': 'Actual',
'consolProcessElement': {
'elementName': 'Consol_Process',
'elementType': 'CONSOLPROC',
'folderId': 'DIRcca5ba18db1d',
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\',
'serverName': 'consolidation-process-server1-0'
},
'consolUnitContribution': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit_contribution',
'tableName': 'eijuqd015_t_contribution_fk09y'
},
'consolUnit': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit',
'tableName': 'eijuqd015_consol_unit_8ws4a'
},
'unitId': '2023_1_Actual_Working_TotalEntity',
'version': 'Working',
'entity': 'MC',
# 执行逻辑的枚举:包括calculation、consolidation、contribution、translation
'action': 'consolidation',
'target_vertex': 'MC',
# 目标实体
'all_entity': {
# 与当前合并实体有依赖关系的所有实体 包括当前实体
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'MC',
'[MC].[HC]'
},
'consol_task_id': '25ec16de-0ab8-4caa-a7c8-401d8ffb3051',
'completed_entity'
# 已经执行完合并逻辑的所有实体 不包括当前实体
: {
},
'submit_success_flag':False #数据是否已经提交 应该为False
}
使用方法
在custom文件夹下建custom_process_control_pre_python.py文件,文件中定义一个custom_process_control_pre_python 同步函数,入参为p2。
函数示例:
def custom_process_control_pre_python(p2):
print(p2)
描述
合并中每个entity执行完毕后的自动调起来的函数
传参
当前业务逻辑执行的p2,包含当前情景、版本、年度、期间、实体的信息,如果是执行的是合并逻辑还会包括目标节点(target_vertex)、所有与目标节点有计算依赖关系的实体集合(all_entity)、以及执行完毕的实体集合(completed_entity)。
传参示例:
{
'parent': None,
'jobId': 'consol_process_9e364a47-5f83-44b3-a587-510e0f2af3df',
'period': '10',
'year': '2021',
'scenario': 'Actual',
'consolProcessElement': {
'elementName': 'Consol_Process',
'elementType': 'CONSOLPROC',
'folderId': 'DIRcca5ba18db1d',
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\',
'serverName': 'consolidation-process-server1-0'
},
'consolUnitContribution': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit_contribution',
'tableName': 'eijuqd015_t_contribution_fk09y'
},
'consolUnit': {
'path': '\\Consolidation_App\\Consol_Process\\UX_Process\\\\Consol_Process_tables_ywdy',
'serverName': 'data-table-mysql-server1-0',
'elementType': 'DAT_MYSQL',
'folderId': 'DIRdf5a85392bd9',
'elementName': 'consol_unit',
'tableName': 'eijuqd015_consol_unit_8ws4a'
},
'unitId': '2023_1_Actual_Working_TotalEntity',
'version': 'Working',
'entity': 'MC',
# 执行逻辑的枚举:包括calculation、consolidation、consolidation_all、contribution、translation
'action': 'consolidation',
'target_vertex': 'MC',
# 目标实体
'all_entity': {
# 与当前合并实体有依赖关系的所有实体 包括当前实体
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'MC',
'[MC].[HC]'
},
'consol_task_id': '25ec16de-0ab8-4caa-a7c8-401d8ffb3051',
'completed_entity'
# 已经执行完合并逻辑的所有实体 不包括当前实体
: {
'M',
'P',
'C',
'B',
'[AC].[A]',
'[HC].[E]',
'[MC].[P]',
'[AC].[BC]',
'[HC].[F]',
'[HC].[D]',
'[MC].[AC]',
'N',
'D',
'[MC].[M]',
'BC',
'[BC].[C]',
'AC',
'E',
'[MC].[N]',
'A',
'HC',
'F',
'[BC].[B]',
'[MC].[HC]'
},
'submit_success_flag':True #数据是否已经提交 可能为False
}
触发时点
合并中每个entity执行完毕后
使用方法
在custom文件夹下建custom_process_control_python_after_each_entity_finished.py文件,文件中定义一个custom_process_control_python_after_each_entity_finished函数,入参为p2。
函数示例:
import re
def custom_process_control_python_after_each_entity_finished(p2):
print(p2)
描述
定制化结转EC函数(适用于非分层调度逻辑)
传参
deep_consol自身
前提
需在config.py文件中设定config.use_custom_data_to_ec 为True
使用方法
在custom文件夹下建custom_data_to_ec.py文件,文件中定义一个custom_data_to_ec 函数,入参为deep_consol。
函数示例:
def custom_data_to_ec(dc):
print(dc.cur_entity)
from ..deepcube_config import get_main_cube
from deepcube.dimension.dimension import ValueDim, PeriodDim, YearDim, VersionDim, \
ScenarioDim, VerEntityDim, GenericDim, AccountDim, ViewDim
main_cube = get_main_cube()
from deepcube.cube.function import sum, round
ye: YearDim = main_cube.year
per: PeriodDim = main_cube.period
sc: ScenarioDim = main_cube.scenario
ver: VersionDim = main_cube.version
en: VerEntityDim = main_cube.entity
ac: AccountDim = main_cube.account
val: ValueDim = main_cube.value
d1: GenericDim = main_cube.d1
view: ViewDim = main_cube.View
main_cube.clear_scope()
all_account_set = ac.root.Descendant()
consolidated_account_set = all_account_set.loc[
all_account_set.attr('ud2').isin(['true'])]
all_d1_set = d1.root.Descendant()
consolidated_d1_set = all_d1_set.loc[
all_d1_set.attr('ud2').isin(['true'])]
scopes = (en[dc.cur_entity],
ye[dc.cur_year],
per[dc.cur_period],
sc[dc.cur_scenario],
ver[dc.cur_version],
val[dc.local_currency],
consolidated_account_set,
consolidated_d1_set,
view['YTD'],
)
main_cube.scope(*scopes)
main_cube.clear_data()
child_member_list = ['[' + dc.node.member() + ']' + '.' + '[' + i + ']'
for i in dc.node.list(dc.node.member(), 'child')]
main_cube.loc[val[dc.local_currency]] = round(sum(
main_cube.loc[
val['ContributionTotal'], en[child_member_list]]), 2)
描述
定制化结转EC函数(适用于分层调度逻辑)
传参
deep_consol自身
前提
需在config.py文件中设定config.use_custom_data_to_ec 为True
使用方法
在custom文件夹下建custom_data_to_ec_batch.py文件,文件中定义一个custom_data_to_ec_batch 函数,入参为deep_consol。
函数示例:
def custom_data_to_ec_batch(dc):
print(dc.cur_entity_set)
from ..deepcube_config import get_main_cube
from deepcube.dimension.dimension import ValueDim, PeriodDim, YearDim, VersionDim, \
ScenarioDim, VerEntityDim, GenericDim, AccountDim, ViewDim
main_cube = get_main_cube()
from deepcube.cube.function import sum, round
ye: YearDim = main_cube.year
per: PeriodDim = main_cube.period
sc: ScenarioDim = main_cube.scenario
ver: VersionDim = main_cube.version
en: VerEntityDim = main_cube.entity
ac: AccountDim = main_cube.account
val: ValueDim = main_cube.value
d1: GenericDim = main_cube.d1
view: ViewDim = main_cube.View
main_cube.clear_scope()
all_account_set = ac.root.Descendant()
consolidated_account_set = all_account_set.loc[
all_account_set.attr('ud2').isin(['true'])]
all_d1_set = d1.root.Descendant()
consolidated_d1_set = all_d1_set.loc[
all_d1_set.attr('ud2').isin(['true'])]
for cur_entity in dc.cur_entity_set:
scopes = (en[cur_entity],
ye[dc.cur_year],
per[dc.cur_period],
sc[dc.cur_scenario],
ver[dc.cur_version],
val[dc.local_currency],
consolidated_account_set,
consolidated_d1_set,
view['YTD'],
)
main_cube.scope(*scopes)
main_cube.clear_data()
child_member_list = ['[' + cur_entity + ']' + '.' + '[' + i + ']'
for i in dc.node.list(cur_entity, 'child')]
main_cube.loc[val[dc.local_currency]] = round(sum(
main_cube.loc[
val['ContributionTotal'], en[child_member_list]]), 2)

当account维度中存在别名为is_consolidated的ud时,只会对该ud为true的维度成员执行结转ec。
2.6.2.21版本修改了标准deepcube_config.py文件,修改如下,项目组同步代码需注意。

回到顶部
咨询热线
