«上一篇 下一篇»
  计算机工程  2019, Vol. 45 Issue (7): 20-25  DOI: 10.19678/j.issn.1000-3428.0051077
0

引用本文  

赵宝琦, 李卫东, 邹佳恒, 等. 基于MPI的分布式数据处理系统[J]. 计算机工程, 2019, 45(7), 20-25. DOI: 10.19678/j.issn.1000-3428.0051077.
ZHAO Baoqi, LI Weidong, ZOU Jiaheng, et al. Distributed Data Processing System Based on MPI[J]. Computer Engineering, 2019, 45(7), 20-25. DOI: 10.19678/j.issn.1000-3428.0051077.

基金项目

中国科学院战略性先导科技专项(A类)(XDA10010900);大科学装置联合基金(U1532258)

作者简介

赵宝琦(1993-), 男, 硕士研究生, 主研方向为分布式计算, E-mail:zhaobq@ihep.ac.cn;
李卫东, 研究员、博士、博士生导师; 邹佳恒, 副研究员、博士; 林韬、颜田, 博士

文章历史

收稿日期:2018-04-03
修回日期:2018-05-25
基于MPI的分布式数据处理系统
赵宝琦1,2 , 李卫东1,2 , 邹佳恒2 , 林韬2 , 颜田2     
1. 中国科学院大学 计算机与控制学院, 北京 100049;
2. 中国科学院高能物理研究所, 北京 100049
摘要:为简化江门中微子实验的离线数据处理流程,减少资源消耗,提出一种在分布式计算环境中进行数据处理的通用软件系统。基于信息传递接口实现节点间的通信与数据交换,使用Master/Worker架构对计算作业生命周期进行管理,包括计算作业拆分、计算资源分配以及计算任务执行与监控。测试结果表明,该系统具有良好的可扩展性,其产生的数据与人工逐步执行作业脚本运行模拟软件产生的数据一致。
关键词高能物理    江门中微子实验    离线数据处理    分布式计算    信息传递接口    
Distributed Data Processing System Based on MPI
ZHAO Baoqi1,2 , LI Weidong1,2 , ZOU Jiaheng2 , LIN Tao2 , YAN Tian2     
1. School of Computer and Control Engineering, University of Chinese Academy of Sciences, Beijing 100049, China;
2. Institute of High Energy Physics, Chinese Academy of Sciences, Beijing 100049, China
Abstract: In order to simplify the off-line data processing flow of Jiangmen Underground Neutrino Observation(JUNO) and reduce resource consumption, a general software system is proposed to process data in distributed computing environment.Based on Message Passing Interface(MPI), communication and data exchange between nodes are realized.Master/Worker architecture is used to manage the life cycle of computing jobs, including computing job splitting, computing resource allocation, computing task execution and monitoring.Test results show that the proposed system has good scalability, and the data generated by the system is consistent with the data generated by manual step-by-step execution of job scripts to run simulation software.
Key words: high energy physics    Jiangmen Underground Neutrino Observation(JUNO)    off-line data processing    distributed computing    Message Passing Interface(MPI)    
0 概述

江门中微子实验(Jiangmen Underground Neutrino Observation, JUNO)[1-2]探测来自反应堆的中微子能谱及其振荡信号, 从而确定中微子的质量顺序, 并精确测量PMNS混合矩阵[2]中的θ12参数和2个质量平方差。此外, JUNO还研究超新星中微子、地球中微子、大气中微子、太阳中微子、惰性中微子等。江门中微子探测器置于水池中心, 四周均被2 m以上的水包围以屏蔽来自外太空的干扰信号。在中心探测器安装约18 000支20英寸、约25 000支3英寸的光电倍增管(Photomultiplier Tube, PMT), 在水池内壁安装约2 400支20英寸的PMT, 以探测宇宙射线[3]

在JUNO观测站建成之后, 每年将会产生PB量级的实验数据, 这些数据经过复杂的离线处理后才能供物理分析使用。目前, 实验装置处于探测器建设阶段, 尚未产生真实的实验数据。为开展事例重建、数据刻度修正以及分析等工作, 有研究者使用蒙特卡洛(Monte Carlo, MC)方法产生与真实数据格式相同的模拟数据, 利用这些模拟数据进行预研究, 从而实现探测器与算法性能的优化。

目前, 学者们利用高速网络构建分布式系统, 将运行在不同计算机上的离线数据处理程序集成到单个系统中, 以实时、高效地完成JUNO中的数据处理。在高能物理领域, 主要以作业的形式描述并执行各实验步骤。可用于作业流的软件有DIANE[4]和ILCDirac[5]。其中, DIANE实现了实验流程的自动化管理与执行, 但其使用复杂且低效的公共对象请求代理体系结构(Common Object Request Broker Architecture, CORBA)技术进行分布式节点间的通信, 导致存在一定的线程安全隐患。ILCDirac设计一种多作业的处理流程, 但其基于DIRAC进行开发并与DIRAC紧紧耦合, 缺乏通用性, 无法应用于JUNO等高能物理实验。

本文对系统需求进行分析, 基于信息传递接口(Message Passing Interface, MPI)实现分布式节点间的通信, 使用Master/Worker架构设计一种离线数据处理框架, 以实现JUNO离线数据处理流程的自动化管理与执行, 减少人力资源消耗, 为其他高能物理实验数据提供通用处理系统。

1 需求分析

离线数据处理是JUNO实验的主要任务之一, 其流程如图 1所示。离线数据处理的主要步骤为:MC模拟, 数据刻度修正, 事例重建与分析[6]。离线数据需要通过部署在计算节点上的JUNO离线软件[7]进行处理, 研究者们以提交作业的形式完成该操作。JUNO离线软件基于SNiPER框架[8]开发, 其实现了MC模拟数据产生、刻度修正、数据分析等相关算法与功能。SNiPER框架是为非对撞高能物理实验设计的通用型框架, 其为离线软件提供了事例执行管理与内存管理等服务。目前, 基于SNiPER框架已经实现了JUNO数据的模拟与应用等[9]。作为高能物理实验, JUNO具有数据规模庞大、计算密集的特点, 实验数据处理与分析主要以事例为单位[10], 存储方式以文件为主。每个实验作业可能包含上万个事例, 某些实验步骤需要反复迭代, 因此, 将产生大量的输入、输出以及计算时间消耗。此外, 离线数据处理、物理分析等操作具有流程化特性, 每个步骤均需用户以作业的形式进行描述并执行, 从而消耗大量的人力。因此, 用户需要一种在分布式环境中高效、易用的系统来管理具有不同计算需求的作业流, 以完成作业生命周期中的各项操作。

Download:
图 1 JUNO离线数据处理流程

在本文的JUNO实验中, 数据处理等工作主要由中科院高能物理研究所负责, 并由欧洲的CNAF计算站点进行辅助。高能物理研究所使用HTCondor作为作业调度系统, CNAF使用LSF(Load Sharing Facility)作为资源调度系统。其中, HTCondor是针对高吞吐量计算作业的调度系统, 可以管理和整合集群计算资源, 实现作业队列机制、调度策略、资源监控等功能。HTCondor与LSF的作业提交、监控方式均不同, 系统需要支持这2种后端调度系统, 同时也需进一步提供易于扩展的设计, 以便将来支持新的后端系统。

2 系统框架整体设计

分布式离线数据处理系统是针对分布式环境下的JUNO作业流而设计的一种管理与运行系统。其主要功能是依据作业流中的依赖关系来管理作业流, 同时根据分布式环境将作业细粒度化并调度至分布式节点, 以完成生命周期中的各项操作, 最终收集并整合各分布式节点的执行结果。该系统通过自动化管理流程并执行作业, 在减少人工干预的同时可以高效地完成数据处理工作。

2.1 流程设计

分布式离线数据处理的工作流程如图 2所示, 主要包括4个步骤:申请节点资源, 加载并拆分作业流, 分发并调度任务, 收集任务结果。

Download:
图 2 分布式离线数据处理流程

用户可以通过接口加载作业流、配置系统以及启动系统。系统启动后向分布式集群资源管理器申请计算资源, 计算资源分为管理节点与工作节点2个部分。管理节点启动管理器并载入作业流, 以事例为单位对作业进行拆分以获得细粒度的子任务。当工作节点与管理节点完成连接时, 管理节点通过调度器将作业调度分发至工作节点运行。当所有任务执行完成, 工作节点将结果存放至分布式文件系统中, 收集器负责收集结果存放路径并对结果进行整合。

2.2 架构设计

根据功能与流程的设计需求, 进行系统架构设计, 如图 3所示。该系统架构主要由通信模块、控制器模块、工作代理模块以及配置模块组成。

Download:
图 3 系统架构

通信模块使用服务器/客户端(S/C)架构, 向管理节点控制器提供服务器功能, 向工作节点代理提供客户端功能。服务器与客户端间的信息交换和数据传输基于MPI[11-12]标准完成。MPI是一种应用于集群和异构网络的消息传递标准, 能够以高效、高移植性的方式实现并行程序, 并提供多进程及多节点间的通信管理方式, 包括点对点通信、广播通信、单点传输、进程拓扑等。图 3系统使用MPICH[13]来开发通信层。MPICH是MPI标准的一个重要实现, 由美国阿贡实验室与密西西比州立大学共同完成, 它具有较高的性能和良好的可移植性。在通信模块中, 不同节点进程间的通信通过通信子建立, 相同通信子中的不同进程使用进程号加以区分。消息数据结构如图 4所示, 其中, 标签字段用于识别不同类型的信息, 数据字典中包含具体指令和数据。

Download:
图 4 通信数据包结构

系统中的逻辑控制与主要功能由控制器模块(Master)和工作代理模块(Worker)负责实现, 其设计采用主从(Master/Worker)模型。Master作为控制器运行于管理节点, 内含应用管理器、调度器、工作节点注册表等组件。应用管理器用于加载用户的应用, 然后将其拆分为任务, 最后合并任务结果; 调度器决定任务与工作节点的映射关系以及当任务执行异常时执行特定操作, 其维护一个就绪队列, 管理可被调度的任务以及任务的分发; 工作节点注册表主要管理工作节点的信息。Worker运行于工作节点, 内含节点信息管理、执行进程以及心跳通信等组件, 主要负责任务的执行以及向管理节点进行信息反馈。当新任务到来时, 执行模块启动执行进程, 同时工作节点通过心跳通信不间断地向管理节点反馈当前节点的任务执行情况以及节点资源使用状况, 使管理节点实时监控工作节点的运行情况, 以便在工作节点发生异常时可以及时响应并处理, 从而提高系统的可靠性。

2.3 接口设计

为提高系统的通用性、可扩展性与复用性, 本文在系统架构中加入了配置模块并提供应用接口与任务接口, 以帮助开发者定义特定的作业流。

应用接口为应用的具体定制提供了基本骨架、运行时的上下文以及初始环境。在开发者定制应用时, 需要自定义以实现其中的拆分与合并操作, 从而使系统可以根据定制应用的需求自主地对应用进行拆分与汇总。

任务接口为应用提供了其拆分后的任务骨架, 同时提供流程化的应用控制, 每个任务是执行命令或程序的集合。任务接口使用有向无环图(Directed Acyclic Graphs, DAG)描述任务的前置与后置关系, 并将其保存在管理节点中。

3 系统实现 3.1 系统与应用配置

为使系统更加灵活, 用户可以针对特定需求对配置文件进行修改, 在加载系统中的配置模块后实现系统定制与应用配置。配置文件主要分为系统配置文件和应用配置文件。系统配置文件提供系统运行时所需的配置条目, 可以针对不同的硬件环境与系统执行策略进行相关配置。在应用配置文件中, 开发者或用户可以根据自己的需求设置相关的配置条目, 以供应用运行使用。

3.2 应用控制与调度

在管理节点中, 系统负责实现作业流的拆分与合并、任务调度、工作节点管理等功能。图 5所示为系统框架运行在管理节点上的主要组件及其逻辑关系, 其中包括应用管理器(ApplicationMgr)、调度器(SimpleScheduler)以及控制器(JobMaster)。

Download:
图 5 管理节点的主要组件及其逻辑关系

控制器是管理节点提供的核心组件, 其包含2个线程:

1) 消息侦听处理线程(MessageHandlerThread), 其通过不断侦听通信层以接收工作节点发来的消息, 并针对不同的消息类型进行反馈。例如, 当接收到任务执行成功或失败的信息时, 消息侦听处理线程会调用task_complete()或task_failed()方法并交由调度器处理。

2) 看门狗线程(WatchDogThread), 每隔一段时间对工作节点信息表进行一次扫描, 记录监控节点的健康情况, 以诊断工作节点是否失去连接或处于空闲超时状态, 然后清除处于异常状态的工作节点, 从而提高系统运行的稳定性。

调度器运行在控制器的消息侦听处理线程中, 负责决策任务与工作节点间的映射关系, 同时对执行失败的任务进行处理。调度器对象派生自IScheduler接口, 在内部维护了一个任务队列, 用于存储和分配任务, 并向控制器提供调度服务。当控制器调用调度器对象的schedule(tasks)方法时, 调度器通过一定的调度算法来指定任务分配的工作节点。系统框架中默认使用SimpleScheduler对象作为调度器, 内含Fifo调度算法, 开发者也可以通过IScheduler接口使用其他调度器, 并在系统运行前通过执行脚本来指定调度器。

应用管理器用于加载和管理应用。在图 5中, 应用对象派生自IApplication接口, 执行不同操作的应用都是该接口的具体实现。开发者通过定义任务拆分方法(split())与任务合并方法(merge()), 以提供拆分与合并规则。当应用管理器加载应用后, 会自动调用应用拆分方法, 获得多个需要执行的任务。在所有任务执行结束后, 则调用任务合并方法将结果进行合并与反馈。

3.3 任务执行控制

在工作节点中, 系统负责实现任务的执行与信息反馈功能。其中, 主要包含代理类(WorkerAgent)、执行体类(Worker)和执行进程(Process), 逻辑关系如图 6所示。

Download:
图 6 工作节点的主要组件及其逻辑关系

代理对象以进程的形式运行, 通过通信模块与管理节点实现相互间的通信, 以接收管理节点分发的任务, 并将其保存在任务队列(图 6中的Task_queue)中。当任务队列不为空时, 提取一个任务传递给执行体对象, 执行体首先启动执行进程(图 6中的Process)并对其进行初始化, 然后提取任务中的指令与参数并交由执行进程执行。在执行进程运行时, 实时输出任务执行情况并保存在日志中, 执行体组件可以依据日志实时监控执行进程。当分配的所有任务执行结束时, 执行体对象调用finalize()方法对执行进程进行清理并退出, 执行体组件将任务结果传递至代理对象, 最后返回至管理节点控制器。执行进程与代理对象之间以消息队列的形式进行通信, 当执行进程开始执行任务后, 代理对象中的心跳通信线程会持续监听来自管理节点的信息并监控执行进程的状态, 其他线程则进入休眠状态以减少资源消耗。

3.4 流程控制

系统的流程由管理节点的管理器、调度器、应用管理器、工作节点的工作代理和执行进程共同控制。

当管理节点启动后, 首先加载配置文件进行环境配置与初始化工作。随后应用管理器加载应用, 依据应用拆分规则将应用拆分为任务并保存在管理器中。当应用拆分结束后, 任务调度器加载调度算法, 依据调度算法将任务加载至调度队列中。后续流程如图 7所示, 当工作节点启动后, 工作代理对象向管理节点发送注册信息。管理节点收到消息后, 将节点注册信息保存并返回应用初始化所需的命令与数据。工作节点接收并执行应用初始化操作, 向管理节点发送初始化结果。管理节点收到并核实初始化结果后, 发送一个调度消息给任务调度器告知其可以调度任务至相应的工作节点, 然后等待工作节点的代理模块请求派发任务。当管理器收到工作代理的请求后, 通过调度器将就绪队列中的任务派发到相应的工作节点上。工作节点接收到任务后将其解析为命令与数据, 并交由执行进程执行。

Download:
图 7 系统流程控制框架

当任务执行完成后, 计算结果被保存至分布式文件系统中, 结果路径以消息的形式被回传至管理器, 同时进行下一轮的计算任务请求。当调度器中所有任务执行结束后, 应用管理器依据应用的合并规则将所有结果进行整合并得到最终结果。

3.5 任务监控与异常处理

常见的系统错误有执行错误与系统异常。为提高系统运行时的准确性与稳定性, 本文增加了任务监控与异常处理流程。其中, 任务监控用于解决任务执行时所出现的问题, 异常处理用于降低在分布式环境下由于物理原因而引起的系统崩溃现象。

在工作节点中, 工作代理进程会实时监视运行任务的执行进程, 解析执行进程的输出日志, 当出现以下2种情况时, 判定任务执行失败:

1) 输出日志中出现“Error”或其他错误关键字。

2) 任务进程在一定时间内没有输出反馈。

在任务执行失败后, 工作节点将该信息保存在任务中并返回至管理节点控制器。管理节点控制器将失败的任务交给调度器, 调度器根据配置的策略选择忽略或重新分配该任务。

在分布式环境中, 造成系统异常的主要原因包括网络阻塞与节点崩溃。为避免由系统异常造成的系统崩溃现象, 管理节点需要对工作节点的健康状况进行监控。在系统中, 工作节点需要与管理节点进行心跳通信, 管理节点控制器的看门狗线程负责侦听心跳通信, 通信内容包括工作节点所处的状态、正在运行的任务、工作节点的CPU利用率以及内存利用率等, 将上述内容记录在工作节点登记表中。其中, 心跳模型的设计可参考文献[14]。每隔固定的一段时间, 看门狗线程会扫描工作节点登记表, 判断工作节点是否处于异常状态。在系统中定义2种异常状态:

1) 连接丢失。当某工作节点上一次心跳通信时间戳过于久远, 即上一次心跳通信到当前时刻间的间隔超过设定的时间阈值, 管理节点则会认为该节点失效, 将其标记为连接丢失, 并将该工作节点还未完成的任务重新载入调度器, 然后删除该节点。

2) 异常超时。当工作节点始终处于空闲、初始化等非运行状态并超过设定的时间, 则认为该节点没有能力执行任务, 将其标记为异常超时状态, 同时反馈至控制器, 然后删除或重置该节点。

4 性能测试与分析

为验证本文系统的正确性与可扩展性, 将其应用于JUNO实验的离线数据处理与分析。在分布式计算集群中进行测试, 计算节点为多核CPU服务器, 每个节点搭载24个型号为Intel(R) Xeon(R)E5-2650 v4、主频为2.20 GHz的CPU, 计算节点的内存为64 GB, 其资源通过HTCondor资源管理器[15]申请。

4.1 应用执行测试

在执行离线数据处理应用时, 使用相同的随机数种子, 测试生成1 000个能量为1 MeV的正电子事例。当执行进程执行任务时, 进程所属的CPU始终高负载运转, 以充分利用节点的计算资源。为验证作业流执行的正确性, 分别通过人工逐步执行作业(简称为人工执行)和使用本文系统运行应用(简称为系统执行)2种方式产生模拟数据, 比较粒子击中探测器的时间(hitTime)与总光子数(totalPE), 结果如图 8所示。由图 8可以看出, 2种方式下的hitTimetotalPE分布完全相同。

Download:
图 8 2种方式下事例的hitTimetotalPE分布比较
4.2 压力测试

依据本文系统所采用的Master/Worker架构可推测出, 工作节点数量越多, 管理节点所需处理的消息越多、压力越大, 其响应时间可能会增长, 即导致性能瓶颈。因此, 本文主要测试系统在使用不同数量工作节点时管理节点的响应时延。其中, 响应时延记录方法如下:工作节点每次向管理节点发送信息时, 将时间戳保存在发送信息中, 管理节点接收到信息并进行响应处理, 然后向该工作节点发送确认信息。工作节点接收到确认信息后, 将当前时间减去发送信息中的时间戳, 所得部分即为信息响应时延。

在此次测试中, 工作节点与管理节点每隔0.2 s进行一次心跳通信。测试工作节点数量依次为10、20、30、40、50、60、70、80、90、100, 每次持续运行时间为30 min。表 1所示为不同工作节点数量下的平均响应时延情况, 基于此数据绘制出的线性趋势图如图 9所示。其中, 实线为实际测量数据, 虚线是线性趋势线, 其显示响应时延随节点规模的增长趋势。

下载CSV 表 1 不同工作节点数量下系统的平均响应时延情况
Download:
图 9 系统平均响应时延趋势图

图 9可以看出, 管理节点的响应时延会随着工作节点数量的增多而延长, 但总体上升缓慢, 且增幅不超过0.1 s, 该数量级不会对系统性能造成影响。综上, 在压力测试中本文系统表现良好且稳定, 即其具有较好的可扩展性。

5 结束语

JUNO离线数据处理过程耗时较长, 且需要进行复杂的流程控制。为此, 本文提出一种基于MPI的分布式离线数据处理系统。对作业流程进行分析, 采用任务调度器、应用管理器和执行进程等组件实现自动化的流程控制。测试结果验证了该系统的正确性与高效性。下一步考虑将该系统与SNiPER框架相结合, 以提供更友好的用户界面并实现交互式的计算功能。

参考文献
[1]
DJURCIC Z, GUARINO V, CABRERA A, et al.JUNO conceptual design report[EB/OL].[2018-03-20]. https://arxiv.org/vc/arxiv/papers/1508/1508.07166v1.pdf. (0)
[2]
AN Fengpeng, AN Guangpeng, AN Qi, et al. Neutrino physics with JUNO[J]. Journal of Physics G:Nuclear and Particle Physics, 2016, 43(3): 030401. DOI:10.1088/0954-3899/43/3/030401 (0)
[3]
WANG Zhimin. JUNO central detector and its prototyping[J]. Journal of Physics:Conference Series, 2016, 718(6): 062075. (0)
[4]
MOSCICKI J T.Diane-distributed analysis environment for grid-enabled simulation and analysis of physics data[C]//Proceedings of 2003 IEEE Nuclear Science Symposium.Washington D.C., USA: IEEE Press, 2003: 1617-1620. (0)
[5]
ILCDirac for ILC users[EB/OL].[2018-03-20]. http://flcwiki.desy.de/ILCDirac. (0)
[6]
HUANG Xingtao, LI Tao, ZOU Jiaheng, et al.Offline data processing software for the JUNO experiment[EB/OL].[2018-03-15]. https://indico.cern.ch/event/432527/contributions/1072223/contribution.pdf. (0)
[7]
李腾.江门中微子实验离线软件系统的设计与开发[D].济南: 山东大学, 2017. http://cdmd.cnki.com.cn/Article/CDMD-10422-1017062885.htm (0)
[8]
ZOU J H, HUANG Xiangjie, LI W D, et al. SNiPER:an offline software framework for non-collider physics experiments[J]. Journal of Physics:Conference Series, 2015, 664(7): 072053. DOI:10.1088/1742-6596/664/7/072053 (0)
[9]
LIN Tao, ZOU Jiaheng, LI Weidong, et al. The application of SNiPER to the JUNO simulation[J]. Journal of Physics:Conference Series, 2017, 898(4): 042029. (0)
[10]
LI Teng, XIA Xin, HUANG Xingtao, et al. Design and development of JUNO event data model[J]. Chinese Physics C, 2017, 41(6): 066201. DOI:10.1088/1674-1137/41/6/066201 (0)
[11]
CLARKE L, GLENDINNING I, HEMPEL R.The MPI message passing interface standard[M]//DECKER K M, REHMANN R M.Programming environments for massively parallel distributed systems.Berlin, Germany: Springer, 1994: 213-218. (0)
[12]
LUSK E, GROPP W. The MPI message-passing interface standard:overview and status[J]. Advances in Parallel Computing, 1995, 10(6): 265-269. (0)
[13]
GROPP W, LUSK E, DOSS N, et al. MPICH:a high-performance, portable implementation for the MPI message-passing interface[J]. Parallel Computing, 1998, 22(6): 789-828. (0)
[14]
鄢锋, 桂卫华, 胡志坤, 等. 一种网络节点通信控制的心跳模型[J]. 信息与控制, 2008, 37(5): 524-528. DOI:10.3969/j.issn.1002-0411.2008.05.003 (0)
[15]
THAIN D, TANNENBAUM T, LIVNY M. Distributed computing in practice:the condor experience[J]. Concurrency and Computation Practice and Experience, 2005, 17(2-4): 323-356. DOI:10.1002/(ISSN)1532-0634 (0)