«上一篇 下一篇»
  计算机工程  2021, Vol. 47 Issue (10): 180-185  DOI: 10.19678/j.issn.1000-3428.0059668
0

引用本文  

刘美佳, 张箐. 基于分布式集群架构的遥感数据传输机制[J]. 计算机工程, 2021, 47(10), 180-185. DOI: 10.19678/j.issn.1000-3428.0059668.
LIU Meijia, ZHANG Qing. Remote Sensing Data Transmission Mechanism Based on Distributed Cluster Architecture[J]. Computer Engineering, 2021, 47(10), 180-185. DOI: 10.19678/j.issn.1000-3428.0059668.

基金项目

国家民用空间基础设施“十二五”陆地观测卫星地面系统建设项目(HK-KJSEW-CS-XCD-01)

通信作者

张箐(通信作者), 正高级工程师

作者简介

刘美佳(1993-), 女, 硕士研究生, 主研方向为传输系统优化、IPv6网络评估

文章历史

收稿日期:2020-10-09
修回日期:2020-11-10
基于分布式集群架构的遥感数据传输机制
刘美佳1,2 , 张箐1     
1. 中国科学院空天信息创新研究院, 北京 100094;
2. 中国科学院大学 电子电气与通信工程学院, 北京 100049
摘要:对地观测遥感卫星数据传输系统采用Linux虚拟服务器集群架构传输遥感卫星数据,而该集群中承载传输任务的新旧服务器在实现断点续传的过程中由于控制信息不同步会造成数据片重复。设计一种基于分布式系统架构的遥感卫星数据处理机制DPM,采用Kafka消息队列快速存储数据包,利用代码模块使Spark Streaming精准提交消息偏移量,并通过数据传输速率与进度统计实时记录DPM运行状态信息。测试结果表明,DPM机制可精准记录与提交消息偏移量并实现数据续传功能,从而确保遥感卫星数据的准确稳定传输。
关键词数据传输系统    遥感卫星数据    分布式系统架构    数据处理机制    Kafka消息队列    
Remote Sensing Data Transmission Mechanism Based on Distributed Cluster Architecture
LIU Meijia1,2 , ZHANG Qing1     
1. Aerospace Information Research Institute, Chinese Academy of Sciences, Beijing 100094, China;
2. School of Electronic, Electrical and Communication Engineering, University of Chinese Academy of Sciences, Beijing 100049, China
Abstract: The data transmission systems of remote sensing satellites for earth observation generally adopt Linux Virtual Server Cluster (LVS)architecture to transmit remote sensing data.However, when the LVSs for data transmission in the cluster try to resume transmission at breakpoints, the control information of the old servers is not synchronized with that of new servers, leading to duplicated data slices.To address the problem, a data processing mechanism based on distributed system architecture for remote sensing satellites, called DPM, is designed.The mechanism employs the Kafka message queue to quickly store packets, and uses a new code module to enable Spark Streaming to submit message offset value accurately.Then efficient data transmission rate and progress statistics method are used to record the running state of the DPM mechanism in real time.Experimental results show that the proposed mechanism can accurately record and submit the offset value of messages, and resume data transmission.It can ensure the accurate and stable transmission of remote sensing satellite data.
Key words: data transmission system    remote sensing satellite data    distributed system architecture    data processing mechanism    Kafka message queue    

开放科学(资源服务)标志码(OSID):

0 概述

随着传感器技术、航空航天技术和数据通信技术的不断发展,遥感卫星在轨服役数量逐年递增[1-3],遥感数据文件数量及其承载的信息量日益提升,使得遥感卫星的数据获取能力高于遥感卫星数据传输系统[4]的负载能力。为解决遥感卫星数据传输系统与传输需求的矛盾,通常有两种提高传输系统性能的手段。一种是增加数据传输基础设施数量,通过扩展物理承载容量提升传输系统的性能。另一种是优化传输系统架构,通过合理分配现有硬件资源,使得传输系统资源利用率最大化。早期的遥感数据传输系统采用基础的单机模式,在该模式下服务器将遥感数据存储在本地,传输系统通过增加服务器数量扩展系统的吞吐量和存储容量。然而在基础单机模式下服务器之间的传输服务相互独立、数据存储彼此隔离,当一台服务器出现故障时,会导致该服务器上的全部数据无法进行访问和转发。因此,在该模式下的传输系统抗风险能力低,故障恢复速度慢。

随着集群技术的发展和完善,遥感卫星数据传输系统采用Linux虚拟服务器(Linux Virtual Server,LVS)集群模式[5-7]代替基础单机模式。LVS集群将多台服务器虚拟成一台服务器为客户端提供文件传输服务,该集群中的所有服务器共享同一个网络地址和存储系统。在LVS集群架构下,只要发生故障的服务器数量小于提供服务的服务器数量,就能保证传输遥感卫星数据传输任务正常进行,从而增强了传输系统的鲁棒性[8]。但是在该集群架构模式下,在客户端与集群服务器之间的文件传输协议(File Transfer Protocol,FTP)[9]连接中断后,负载服务器将新建连接负载至其他服务器时,如果新旧服务器同时读写共享存储的同一路径文件则会导致该文件中部分数据重复。当收发两端的数据文件大小不一致时,需要重新传输数据文件,重传不仅需要额外地消耗传输系统的资源,而且降低了遥感数据文件的实时性。本文介绍对地观测遥感卫星数据传输系统,分析当前遥感传输技术的研究现状,提出基于分布式系统架构的遥感数据处理机制DPM。

1 相关工作 1.1 对地观测遥感卫星数据传输系统

中国科学院遥感与数字地球研究所负责接收与传输对地观测遥感卫星数据,其地面传输系统由多个地面接收站[10]和一个数据接收中心组成。地面接收站负责对接遥感卫星,并将数据上传到数据接收中心进行汇总和备份。目前,该传输系统拥有5个卫星地面接收站,每天接收并传输约200个遥感卫星数据文件,总数据量高达1 TB,平均每个文件的数据量约60 GB。

当前遥感卫星数据传输系统两端均采用LVS集群技术实现高吞吐、可扩展、高冗余的数据传输,集群内部架构如图 1所示。LVS集群架构分为3层:第1层是负载调度服务器,位于LVS集群前端,采用轮询(Round-Robin,RR)调度算法将客户端的连接请求分发给真实服务器;第2层是服务器池,位于集群服务系统的中间层,由多个性能相同的服务器组成,是用于处理客户端请求的真实服务器;第3层是共享存储,由多个存储设备组成,为真实服务器提供同一的存储接口,实现主存储共享。该集群架构具有高容错能力,可以确保传输任务不受一个甚至多个服务器宕机的影响,从而为客户端提供持续稳定的数据传输服务。

Download:
图 1 LVS集群内部架构 Fig. 1 Internal architecture of LVS cluster

LVS集群基于TCP/IP协议栈第4层协议(TCP、UDP)[11-12]实现负载调度,当遥感卫星数据传输系统重新创建FTP数据连接时套接字字段改变,负载均衡服务器为客户端重新分配真实服务器。此时如果旧服务器未能将本地接收的数据及时写入共享存储,则会导致部分数据片重复。由于无法定位重复数据片在整轨数据中的位置,因此只能丢弃整轨数据并进行重传,这不仅会浪费网络资源,而且会降低遥感数据的实时性。

1.2 遥感传输技术

文献[13]针对C/S模式下存在的遥感数据传输速度慢、负载任务重等问题,提出一种快速的遥感数据传输策略RSDFT。该策略可以根据数据下载速率的变化,动态地选择合适的资源服务器,从而提高客户端下载速率。然而RSDFT并未考虑数据断点续传的情况,因此该策略不适用于当前的对地观测遥感卫星数据传输系统。

文献[14]提出遥感数据传输的多源模式,通过采用多个数据源向同一个用户提供数据传输服务的方式实现遥感数据的快速传输。实现该模式的条件是需要多个数据源,然而由于对地观测遥感数据传输系统的数据源具有唯一性,因此该传输模式也不适用于解决本文的数据传输问题。

上述文献从增加资源服务器数量或数据源数量的角度出发提升系统传输速率,但是上述方法均不能解决当前对地观测遥感卫星数据传输系统中存在的数据片重复问题,因此本文提出DPM机制。

2 基于分布式系统架构的遥感数据处理 2.1 功能设计

DPM机制是针对当前遥感卫星数据传输系统架构提出的工作在FTP服务器和共享存储之间的中间件,其整体架构设计如图 2中的虚线框所示。DPM机制主要包含消息队列、Spark Streaming集群和数据记录模块,其中:消息队列由Kafka发布订阅消息系统[15]实现,用于快速有效地存储真实服务器中的数据;Spark Streaming集群[16]负责按序拉取消息队列中的数据并将其持久化到HDFS[17],偏移量提交模块被用于提高Spark Streaming的准确性;数据记录模块负责记录DPM机制的实时状态,用于系统故障恢复查询和实时监控查询。

Download:
图 2 DPM整体架构设计 Fig. 2 Design of DPM overall architecture
2.2 技术实现 2.2.1 基本原理

Kafka[18]是由Linkedin[19]开发的发布订阅消息系统,架构如图 3虚线框中上半部分所示。Kafka分布式集群系统由多个服务器组成,集群中的服务器被称作代理服务器(Broker)[20-21]。代理服务器中用主题(Topic)代表逻辑上的消息集合,用分区(Partition)表示物理设备上的实际存储队列。消息队列在存储数据时采用备份机制,通过在不同的代理服务器上创建主备消息队列确保数据准确性。Spark[22-23]是大数据处理引擎,可以实现内存的统一管理。Spark Streaming[24-25]是Spark的扩展模块,用于处理实时大规模流式数据,架构如图 3虚线框中下半部分所示。Spark Streaming运行在Spark的核心架构上,首先使用Streaming Context作为数据流的入口,从消息队列中拉取数据。然后在内存中快速处理消息,并将处理后的数据写入硬盘进行永久存储。

Download:
图 3 Kafka和Spark Streaming架构 Fig. 3 Kafka and Spark Streaming architecture
2.2.2 应用设计

DPM代码执行流程如图 4所示。

Download:
图 4 DPM代码执行流程 Fig. 4 Procedure of DPM code execution

DPM代码执行步骤具体如下:

1)创建消息队列,从服务器中快速拉取数据。

2)Spark Streaming不断获取消息队列中的数据,并将处理后的数据存储到HDFS。

3)偏移量提交模块将处理完成的消息偏移量值写入数据库。

4)使用处理一个批次数据所用的时间值除以数据条数得出处理速率,并将该值写入数据库。

5)使用消息队列中的最大偏移量值除以提交偏移量值得出任务进度,并将该值写入数据库。

6)在收到传输结束符且任务进度为100%时,代表数据传输任务完成。

2.3 代码实现

Kafka消息队列核心代码及注释具体如下:

foreachPartition(partitoin = >

{//服务器将数据推入Kafka消息队列

val props = new Properties()

//获取JDK提供的Properties工具类

val producer = new KafkaProducer[String,String](props)

//创建Kafka生产者对象

partitoin.foreach(item = >

{//将数据逐条存入Kafka

val msg = new ProducerRecord[String,String]("qz_log",item)

producer.send(msg)

//调用数据发送函数})}

Spark Streaming核心代码及注释具体如下:

val ssc = new StreamingContext(conf,Seconds(3))

//创建StreamingContext对象

val topics = Array("qz_log")//保存topic

val kafkaMap:Map[String,Object] = Map[String,Object]

(//配置处理数据的参数

"enable.auto.commit" - > (false:lang.Boolean)

//取消自动提交,设计偏移量提块代码)

val inputDStream:InputDStream[ConsumerRecord[String,String]] = if(offsetMap.isEmpty)

{//判断是否有偏移量,若无则重新消费

KafkaUtils.createDirectStream

(ssc,LocationStrategies.PreferConsistent,Consumer Strategies.Subscribe[String,String](topics,kafkaMap))}

else { //若有则根据偏移量继续消费

KafkaUtils.createDirectStream

(ssc,LocationStrategies.PreferConsistent,Consumer Strategies.Subscribe [String,String](topics,kafkaMap,offsetMap))}

dataValueStream.foreachRDD(rdd = >

{ //将数据写入HDFS

val destPath = ConfigurationManager.getProperty("hdfs.dest.path")

//从配置文件处获取写出路径})

数据记录模块核心代码及注释具体如下:

val sqlProxy = new SqlProxy()

//将数据结果写入数据库

val offsetMap = new mutable.HashMap[TopicPartition,Long]()

val client = DataSourceUtil.getConnection

//获取jdbc连接client

valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRanges

for(or < - offsetRanges)

{sqlProxy.executeUpdate(client,s"replace into 'offset_manager'(groupid,topic,'partition',untilOffset)values(?,?,?,?)",

Array(groupid,or.topic,or.partition.toString,or.untilOffset))}

//偏移量提交模块

val or:OffsetRange = offsetRanges.head

val logSize = or.untilOffset-or.fromOffset

val batchProcessTime = getBatchProcess-Time(accumulator)

val velocity =(logSize × 1.0 / batchProcess-Time).formatted("%.3f")

//传输速率计算方法

val progress =(or.untilOffset × 1.0 / maxOffset).formatted("%.3f")

//传输进度计算方法

sqlProxy.executeUpdate(

client,"replace into 'offset_metrics'(groupid,topic,'partitionID',logsize,velocity,progress)values(?,?,?,?,?,?)",

Array(groupid,or.topic,or.partition.toString,logSize.toString,velocity,progress.toString))

//将处理进度信息写入数据库

3 测试与结果分析 3.1 测试环境搭建

使用3台性能相同的虚拟机搭建成Hadoop分布式集群。硬件参数为8 GB内存、40 GB磁盘,2.6 GHz CPU,Centos6.10操作系统。软件安装顺序与版本号如表 1所示。

下载CSV 表 1 软件安装顺序与版本号 Table 1 Software installation sequence and version number

网络配置信息如表 2所示。Hadoop集群由Hadoop101、Hadoop102、Hadoop103这3个节点组成。Kafka部署在Hadoop集群上,每台设备用Broker-ID唯一标识,标识号1、2、3分别对应主机Hadoop101、Hadoop102、Hadoop103。Apache Spark计算平台兼容Hadoop,可直接部署在Hadoop集群上。

下载CSV 表 2 网络配置信息 Table 2 Network configuration information
3.2 结果分析

Spark Streaming偏移量提交模块的测试结果如图 5所示,其中,offset-DPM表示DPM机制记录的偏移量值,offset-auto表示Spark Streaming自动提交的偏移量值。测试结果表明,Spark Streaming自动提交的偏移量值相比DPM机制记录的偏移量值更精确。

Download:
图 5 偏移量提交模块的测试结果 Fig. 5 Test results of offset submission module

图 6给出了DPM机制在传输数据时的状态信息。测试结果表明,在传输过程中DPM机制的吞吐量约稳定于3 000条/s,证明了DMP机制具有稳定性和高效性。

Download:
图 6 DPM状态信息的测试结果 Fig. 6 Test results of DPM status information

图 7给出了DPM机制续传数据的性能测试结果,其中,maxoffset代表分区中的最大偏移量值,consumeroffset代表偏移量提交模块的记录值。测试结果表明:当服务器停止向消息队列中写入数据时,并不影响DPM机制向HDFS中写入数据;当服务器继续向消息队列写入数据时,DPM机制也能及时处理消息队列中的数据,验证了DPM机制的续传性能良好,且不受服务器状态影响。

Download:
图 7 DPM续传数据的性能测试结果 Fig. 7 Performance test results of DMP resume data
4 结束语

当前对地观测遥感卫星数据传输系统采用集群加分布式存储架构模式存储遥感卫星数据,在实现FTP断点续传数据时因真实服务器变更造成部分数据重复,导致整轨卫星数据重传,从而降低传输效率和数据实时性。本文提出DPM机制对遥感卫星数据传输系统架构进行优化,将消息队列和实时计算框架相结合实现数据实时接收和处理,利用偏移量提交模块精准记录偏移量值,采用数据记录模块记录DPM机制自身的状态信息。测试结果表明,DPM机制可保证传输数据的准确性和实时性。下一步将设计DPM机制的应用程序接口,实现segment文件周期等参数的灵活配置,并将DPM机制在实际的遥感卫星数据传输系统中进行验证,提高遥感数据传输系统的吞吐量。

参考文献
[1]
TOTH C, JOZKOW G. Remote sensing platforms and sensors: a survey[J]. ISPRS Journal of Photogrammetry and Remote Sensing, 2016, 115: 22-36. DOI:10.1016/j.isprsjprs.2015.10.004
[2]
XUE J R, SU B F. Significant remote sensing vegetation indices: a review of developments and applications[EB/OL]. [2020-09-11]. https://downloads.hindawi.com/journals/js/2017/1353691.pdf.
[3]
SUN W W, YANG G, CHEN C, et al. Development status and literature analysis of China's earth observation remote sensing satellites[J]. Journal of Remote Sensing, 2020, 24(5): 479-510. (in Chinese)
孙伟伟, 杨刚, 陈超, 等. 中国地球观测遥感卫星发展现状及文献分析[J]. 遥感学报, 2020, 24(5): 479-510.
[4]
Center for Earth Observation and Digital Earth Chinese Academy of Sciences, College of High Technology Research and Development, Chinese Academy of Sciences. Operation and development of remote sensing satellite ground station[J]. Bulletin of Chinese Academy of Sciences, 2010, 25(3): 353-357. (in Chinese)
中国科学院对地观测与数字地球科学中心, 中国科学院高技术研究与发展局. 遥感卫星地面站的运行与发展[J]. 中国科学院院刊, 2010, 25(3): 353-357. DOI:10.3969/j.issn.1000-3045.2010.03.018
[5]
WEI Q, XU G L, LI Y L. Research on cluster and load balance based on Linux virtual server[C]//Proceedings of International Conference on Information Computing and Applications. Berlin, Germany: Springer, 2010: 169-176.
[6]
ZHANG W S. Linux virtual server clusters[EB/OL]. [2020-09-11]. http://ximesk.free.fr/%5BCours%5D/%5BLinux%5D/Linux_Virtual_Server_Clusters.2003.LinuxMag.pdf.
[7]
YANG L, GUO Q P. On load balancing technique and application based on Linux virtual server[J]. Journal of Wuhan University of Technology(Transportation Science & Engineering), 2004, 28(1): 77-79. (in Chinese)
杨磊, 郭庆平. 负载均衡技术分析及LVS实现[J]. 武汉理工大学学报(交通科学与工程版), 2004, 28(1): 77-79. DOI:10.3963/j.issn.2095-3844.2004.01.022
[8]
XU F, ZHU J F, MIAO J J. The robustness of high-speed railway and civil aviation compound network based on the complex network theory[J]. Complex Systems and Complexity Science, 2015, 12(1): 40-45. (in Chinese)
徐凤, 朱金福, 苗建军. 基于复杂网络的空铁复合网络的鲁棒性研究[J]. 复杂系统与复杂性科学, 2015, 12(1): 40-45.
[9]
POSTEL J, REYNOLDS J. File Transfer Protocol(FTP)[EB/OL]. [2020-09-11]. https://www.researchgate.net/publication/239548910_File_transfer_protocol_FTP.
[10]
WANG W Y, FENG X X. Security design of servo system loop for remote sensing satellite ground receiving station[J]. Telecommunication Engineering, 2014, 54(8): 1156-1159. (in Chinese)
王万玉, 冯旭祥. 遥感卫星地面接收站伺服系统环路安全性设计[J]. 电讯技术, 2014, 54(8): 1156-1159.
[11]
FLOYD S. Transmission control protocol[EB/OL]. [2020-09-11]. https://www.researchgate.net/publication/238670432_Transmission_control_protocol.
[12]
POSTEL J. User datagram protocol[EB/OL]. [2020-09-11]. https://www.cnblogs.com/guopengzhen2020/articles/12942967.html.
[13]
GE Q, CHEN Q C, ZHOU K, et al. Research on a fast remote sensing data transmission policy[J]. Computer Engineering, 2016, 42(6): 27-30. (in Chinese)
葛强, 陈前程, 周珂, 等. 一种遥感数据快速传输策略研究[J]. 计算机工程, 2016, 42(6): 27-30. DOI:10.3969/j.issn.1000-3428.2016.06.005
[14]
ZHANG S M. Research on remote sensing data distribution and sharing strategy under multiple data source mode[D]. Kaifeng: Henan University, 2018. (in Chinese)
张帅民. 多数据源模式下遥感数据分发与共享策略研究[D]. 开封: 河南大学, 2018.
[15]
WANG Z H, DAI W, WANG F, et al. Kafka and its using in high-throughput and reliable message distribution[C]//Proceedings of the 8th International Conference on Intelligent Networks and Intelligent Systems. Washington D.C., USA: IEEE Press, 2015: 117-120.
[16]
CHENG D Z, CHEN Y, ZHOU X B, et al. Adaptive scheduling of parallel jobs in spark streaming[C]//Proceedings of IEEE Conference on Computer Communications. Washington D.C., USA: IEEE Press, 2017: 1-9.
[17]
KOU W L, YANG X J, LIANG C X, et al. HDFS enabled storage and management of remote sensing data[C]//Proceedings of the 2nd IEEE International Conference on Computer and Communications. Washington D.C., USA: IEEE Press, 2016: 80-84.
[18]
THEIN K M M. Apache Kafka: next generation distributed messaging system[J]. International Journal of Scientific Engineering and Technology Research, 2014, 3(47): 9478-9483.
[19]
AURADKAR A, BOTEV C, DAS S, et al. Data infrastructure at LinkedIn[C]//Proceedings of the 28th International Conference on Data Engineering. Washington D.C., USA: IEEE Press, 2012: 1370-1381.
[20]
ESTRADA R, RUIZ I. The broker: Apache Kafka[M]//ESTRADA R, RUIZ I. Big data SMACK. Berlin, Germany: Springer, 2016: 165-203.
[21]
JOHN V, LIU X. A survey of distributed message broker queues[EB/OL]. [2020-09-11]. https://arxiv.org/abs/1704.00411.
[22]
Apache Spark. Lightning-fast unified analytics engine[EB/OL]. [2020-09-11]. http://spark.apache.org/.
[23]
ZAHARIA M, XIN R S, WENDELL P, et al. Apache Spark[J]. Communications of the ACM, 2016, 59(11): 56-65. DOI:10.1145/2934664
[24]
KROB J, KRCMAR H. Modeling and simulating Apache Spark streaming applications[J]. Softwaretechnik-Trends, 2016, 36(4): 1-3.
[25]
LIN J C, LEE M C, YU I C, et al. Modeling and simulation of spark streaming[C]//Proceedings of the 32nd International Conference on Advanced Information Networking and Applications. Washington D.C., USA: IEEE Press, 2018: 407-413.