手机版

ETL工作流活动优先级的确定及并行实现(3)

发布时间:2021-06-05   来源:未知    
字号:

工作流技术介绍

确定活动优先级的时候,把每一个抽取操作(即起始操作)的优先级定义为1(假定ETL工作流的优先级从数值1开始,依次递增,数值越小优先级越高),然后分析其他的非起始活动。对于每一个尚未确定其优先级的活动,如果它的所有前驱活动都已经确定了优先级,那么可以通过计算它的所有前驱活动优先级的最大值,再加1取得。通过遍历活动集中的所有活动,就可以确定ETL工作流中所有活动的优先级。

算法1 确定ETL工作流活动优先级

输入:图G(V,E)表示ETL工作流,V表示工作流中的活动集,E表示活动之间的拓扑关系;

输出:活动节点集V,包含了ETL工作流中各个活动的执行优先级信息。

LL1 L2 unvisitedforeachact=inV;L3    if(act∈V{

startActivity)L4     L5     actvisited.order{==1;

L6     L7 while(unvisited=visitedunvisited∪{act–}{;

act};}}L8  L9     foreachunvisitedL10     ifactisnotempty){(act橙(in.orderaunvisited,act{

=)max∈{Ea.andordera}∈+1;visited){L11     12     visitedunvisited==visitedunvisited∪{act–}{;

act};}}}

在上面的算法描述中,L1将活动节点集的所有活动都放在unvisited数组中,表示所有活动都是未访问的;L2~L6遍历整个节点集并判断活动act是否是起始操作,如果是则将其优先级设置为1,将其加入visited数组,并从unvisited数组中将其移除;L7~L12不断遍历unvisited数组直到其为空,L9表示如果一个活动act的所有前驱活动的优先级都已经确定,则act的优先级也可以确定,L10计算act所有前驱活动优先级的最大值,再加1作为act的优先级,同时将act加入visited数组,并从unvisited数组中将其移除。2畅2 并行执行ETL工作流中的活动

确定了ETL逻辑模型中每一个活动的优先级之后,ETL工作流引擎会按照优先级从高到低依次执行工作流中的每一个活动。

在使用串行方法执行的工作流引擎中,每次从活动集中获取优先级最高的一个活动并执行,执行完一个活动之后再选择另一个当前优先级最高的活动执行,不断重复这样的过程直到ETL工作流中的所有活动执行完毕如果存在多个活动的优先级相同。

,在上述的串行方法中需

要依次顺序执行这些活动,但是这些活动是可以并行执行的。把这些活动放进一个执行阶段,每一个执行阶段都是一个容器,存放多个优先级相同的活动。同一执行阶段的活动不存在依赖关系,根据前面的讨论,存在依赖关系的活动构成严格偏序关系,其优先级不可能相等。并行执行优先级相同的多个活动能够获得更高的时间效率。

在同一执行阶段中的各个活动,在执行顺序上不存在依赖关系,但ETL工作流是以数据为中心的,各个活动处理的对象是数据库中的数据表或者操作系统中的文件,有可能存在同一执行阶段的两个或者多个活动对同一对象(数据表或者文件)进行操作的情况。如果操作对象是数据库中的数据表,大多数

数据库管理系统都有比较完善的并发执行控制机制,所以ETL工作流引擎把并发读写一个数据表所需要处理的工作(如对数据表加锁等)交给相应的数据库管理系统进行处理;如果在同一执行阶段存在多个活动对同一个文件进行操作,则必须把这些活动分配到不同的执行阶段中执行,每次只允许相互争夺资源(即文件)的一个活动对文件进行读写,这样可以保证ETL工作流中每一个活动所处理的数据满足一致性的要求在并行计算环境中,可以创建多个线程分别执行同一个执

行阶段中可以并行执行的活动。下面的算法描述工作流引擎识别并执行这些活动的过程。

算法2 ETL工作流引擎识别与执行活动算法描述

输入:活动节点集V,包含了ETL工作流中各个活动的执行优先级信息。

输出L:ETL工作流的执行结果L1 L2 unfinishedActsL3  while=V;

runActs(unfinishedActsisnotempty){L4  L5  runActs=foreach=getHighestPriorityActshandleConflictAct(runActs(););L6   7   createThreadactinrunActsunfinishedActs(act{=)unfinishedAct;

–{act};}}

在上面的算法描述中,L1把所有的活动都放在unfinished数组中,表示这些活动是未执行的;L2不断遍历活动集直到其为空,L3函数getHighestPriorityActs遍历所有尚未执行的活动,识别出优先级最高的活动集合,将其放到一个容器中,等待执行;L4函数handleConflictAct处理一个执行阶段中存在多个活动对同一个文件进行操作的情况,实现的功能是只保留互相争夺资源的一个活动在容器中,将其他活动从runActs中移出,放在后续的执行阶段执行,这样可以确保对于任意一个文件,每个执行阶段只有一个活动对其进行操作。L5~L7工作流引擎为可以并行执行的每一个活动act各创建一个线程,并将活动act从通过并行执行多个活动unfinishedActs数组中移除,可以明显提高。

ETL工作流的执

行效率,在更短的时间之内完成目标数据仓库数据的加载与更新,使数据仓库的用户可以及时使用最新的数据进行分析,从而作出更加正确的决策。

3 实验结果分析

本文对上述的算法进行实现,采用C++语言,并使用boost标准模板库thread函数库为每个可以并行执行的活动创建线程(STL)中的vector容器存放优先级相同的活动,使用。

下面对以上提出的算法作一个性能分析,在主流的数据库环境下,对算法中所提出的为优先级相同且相互之间没有依赖关系的多个活动创建线程,并行执行这些活动,与每次只选取ETL间性能的比较工作流中优先级最高的一个活动串行执行的方法进行时。

实验环境为:IntelCoreT23001.6GHz×2,内存为DDR22GBSP2,,磁盘为开发环境为80GBMS,操作系统为VisualStudio.MSNETWindows2003。XPProfessionalOracle10.2g、MSSQLServer2005、MySQL5.1。

数据库环境包括

实验场景是模拟1.2节所举例子的数据,从两个源数据库

中分别抽取注册用户信息到一个中间工作数据库进行处理。

ETL工作流活动优先级的确定及并行实现(3).doc 将本文的Word文档下载到电脑,方便复制、编辑、收藏和打印
×
二维码
× 游客快捷下载通道(下载后可以自由复制和排版)
VIP包月下载
特价:29 元/月 原价:99元
低至 0.3 元/份 每月下载150
全站内容免费自由复制
VIP包月下载
特价:29 元/月 原价:99元
低至 0.3 元/份 每月下载150
全站内容免费自由复制
注:下载文档有可能出现无法下载或内容有问题,请联系客服协助您处理。
× 常见问题(客服时间:周一到周五 9:30-18:00)