MapReduce框架下一种负载均衡的Top-k连接查询算法

2018-08-24 07:51,,
计算机测量与控制 2018年8期
关键词:直方图排序分数

,,

(1.首都师范大学 信息工程学院,北京 100048; 2.北京交通大学 交通运输学院,北京 100044)

0 引言

排序查询处理对于大规模数据分析至关重要,通常使用的排序查询方法称为Top-k连接查询算法[1]。Top-k查询中,根据每个对象的属性计算一个权重,再通过给定的评分函数为对象进行评分,返回k个最重要的结果[2]。在-+大数据时代,用户检查大量未排序的查询结果集是不现实的。并行化执行不仅可以实现高效地运行,并且可以返回精准的结果。目前MapReduce是一种广泛应用的并行编程环境[3]。

目前,学者也提出了一些并行的Top-k连接查询算法。例如,文献[4]在MapReduce的背景下,提出了两种关于Top-k连接的方法。一种称为RanKloud的算法,其在扫描记录期间计算统计数据,并使用这些统计数据计算提前终止的阈值(Top-k结果的最低分数)。此外,还提出了一种新的分区方法,称为uSplit,旨在以使用敏感方式对数据进行重新分区。然而,RanKloud不能保证正确的返回k个检索结果。另外,常用的一种基于 MapReduce 框架的用来计算Top-k连接结果的通用二路连接算法为Reduce-side join,简称为RSJ[5],其连接是在Reduce函数中实现。

本文在MapReduce编程模型中实现并行Top-k连接查询算法(Parallel Top-k Join,P-TKJ),同时融入提前终止机制和负载均衡机制来增强Top-k连接处理的性能。主要创新点为:在MapReduce中提出了一个新的Top-k连接处理框架,尽可能地利用并行性,并避免链接MapReduce作业的初始化开销;使用直方图形式的数据表示,并融入了提前终止策略、数据过滤和负载平衡策略,以便设计出高效的并行Top-k连接算法。

1 MapReduce 编程模型

MapReduce是Hadoop中的一个编程框架,为并行算法提供了一个容错和可靠的编程环境。为了处理大量的数据,该框架支持一个可扩展的文件系统,称为Hadoop分布式文件系统(HDFS),用于在硬件群集中的机器上存储大量文件。

MapReduce计算过程分成Map和Reduce两个阶段[6],其中数据的格式以键值对呈现,其处理过程如图1所示。

图1 MapReduce编程模型的处理过程

2 问题描述

2.1 Top-k连接查询

给定具有n个得分属性的输入表或关系T,使用τ代表T的记录(或元组),τ[i]是指第i个得分属性(i∈[1,n])。Top-k查询q(k,f)基于单调评分函数f返回k个最佳查询结果。当应用于关系T时,Top-k查询q(k,f)的结果是T中一组k个记录τi,…,τk中得分最小的Δk,即f(θ)的值。在不失一般性的情况下,分数最低的记录被认为是最好的[7]。

通常在排名感知处理中,需要两个(或更多)输入关系连接的Top-k结果,视为一个运算符,称之为Top-k连接查询[8]。可以通过先执行连接,然后通过评分函数对连接记录进行排名,并输出前k个排名结果。然而,这会导致处理过程的资源浪费,所以需要提出高效的算法来解决交织排序和连接的问题[9]。

在本文中,认为输入关系Ti包含了一个连接属性ai,一个得分属性si,以及其他一些属性。因此,Ti由唯一标识符(τ,id)、连接属性值或连接值(τ,ai)和得分属性值(τ,si)所描述的记录组成。本文关注二元多对多的Top-k等值连接,其中输入表T0和T1连接在连接属性a0=a1上,得分属性(s0和s1)的组合是为了生成Top-k连接记录,作为得分函数f的输入。

2.2 问题描述

考虑两个输入表T0和T1,它们分别在一组机器上被水平分割,并具有连接属性a0,a1和得分属性s0,s1。给定由整数k定义的Top-k连接查询q(k,f,T0,T1),和用于组合得分属性s0和s1产生连接记录的单调得分函数f。并行Top-k连接问题要求产生具有最低分数的Top-k连接记录。

在MapReduce环境中,输入表T0和T1被拆分为HDFS块,并按照水平分区的概念存储在HDFS中。一个记录τ在每个文件中都是(τ.id,τ.ai,τ.si)形式,其中τ.id是唯一标识符,τ.ai是连接属性,τ.si是得分属性。除了这个三元组之外,每一行可能都包含其他任意长度的记录元素的属性τ。因此,在一般情况下,每个节点只存储每个关系记录的一个子集。问题在于设计一个由Map和Reduce阶段组成的算法,通过并行方式有效计算Top-k连接方式。

最后,本文注意到Top-k连接并行处理中最昂贵的部分是计算每个连接值的Top-k连接记录。因此,在本文中,我们着重于提供一个完全并行的解决方案来解决这个问题。获得Top-k连接结果的最后一步需要处理k·m个连接记录(其中m表示不同连接值的个数),这通常比初始表Ti的值小几个数量级,即k·m<<|Ti|。因此,可利用一个集中程序来处理这些单独的Top-k结果,而没有显著的开销。

3 提出的并行Top-k连接查询算法

3.1 方法概述

上传两个输入表T0和T1,并作为单独的文件存储在HDFS中,根据得分属性以升序排序。此外,对于每个输入表,计算并存储在HDFS直方图H(T0)和H(T1)中,它们维护一系列连接属性值的记录数。需要注意的是,这些信息可以在输入表上传到HDFS的过程中构建,而开销可以忽略不计。

给定一个Top-k连接查询,计算每个输入表(基于直方图)的分数范围,这些范围决定了作业执行前足以产生正确结果的记录子集。因此,可以选择性地在Map阶段加载和处理存储数据的一小部分,一旦遇到分数值大于边界的记录,就终止Mappers的处理。此外,通过引入数据过滤和负载均衡机制来优化Reduce端连接的性能,该机制均衡地将连接值分配给Reduce任务。

3.2 直方图构建

在Hadoop中处理数据需要上传数据,整个数据集从外部源按顺序读取并存储在HDFS中[10]。这个阶段主要是I/O密集型任务,CPU没有充分利用,可以利用这个阶段在后台建立直方图。通常情况下,直方图的大小比初始数据集要小几个数量级,但是在准确性和磁盘大小之间权衡,即在构建过程中更大直方图可以实现更高的准确性,同时会消耗更多磁盘空间的。

为达到预期的目的,本文选择了构建等宽直方图,其构造简单且符合一次通过的要求。更详细地说,当一个记录τ(τ.ai,τ.si)在上传阶段被读取,可以通过增加对应分数值τ.si的bin的内容来更新连接值为τ.ai的直方图。

图2描绘了相同连接属性值下,T0和T1的等宽直方图。对于每个输入表Ti,创建与连接属性中单独值数量一样多的直方图。每个直方图被表示为H(Ti)。例如,所描述的T1的直方图H(T1)表示它总共包含11个具有连接值a1=x的记录。此外,第一个直方图框表示存在2条记录,得分在0-10之间(表示为[0-10]:2),剩下的bin是:[10-20]:3,[20-30]:2以及[30-40]:4。

图2 相同的连接属性值(a0=a1=x)下,T0和T1的等宽直方图的例子

3.3 提前终止机制

为了减少连接的处理成本,本文只处理两个表的输入记录子集,来保证提供正确的Top-k连接结果。直观地说,只有表Ti中分数低于bi的记录才会参与连接,用来产生Top-k连接结果。 因此,为了实现提前终止操作,需要有一种方法来确定分数范围b0和b1,以便尽可能早地放弃高于bi分数的记录。

1)分数界限估计:将两个表的直方图作为输入,问题在于要计算每个表Ti中输入记录得分的正确分数界限bi。为此,本文使用文献[11]中提出的算法来进行分数界限估计。在实践中,这个算法对两个表格的直方图执行连接,并估计连接结果的数量和分数范围。这个算法的用处为:第一,识别直方图bin和相应分数范围用来产生k个连接记录;第二,确保没有其他直方图bin组合可以产生具有比这第k个连接记录更小分数值的连接记录。为此直方图bin不断被访问和加入,直到加入记录的数量超过k,或者任何直方图bin产生的连接记录得分都不小于当前第k个记录的得分。用一个例子来解释算法的操作,描述如下。

示例1:考虑图2中描述的直方图,并假设Top-k连接结果(k=1)被要求使用作为评分函数的总和。通过检查每个直方图的第一个bin,可以知道在[0-15]范围内存在2(= 1×2)个连接记录,即[0-15]:2。通过每个直方图,还可以知道存在[10-25]:3,[5-20]:4和[15-30]:6。只有在T0的第三个bin被检查后(产生的连接记录没有显示在这里),才可以安全地停止处理,并且报告得分范围b0=15和b1=20。这是因为得分[0-15]内已经有至少2条记录(即多于k=1),并且T0或T1bin组合产生的任何连接记录的分数都将大于15。

2)在Hadoop中实现提前终止操作:假设输入表以HDFS格式存储,并且直方图也可用,创建一个提前终止机制,在Map阶段有选择地只处理分数比各自界限低的输入记录。需要注意的是,提前终止机制是通过扩展Hadoop来实现的,也就是说,不会更改Hadoop核心。

3.4 数据过滤

Map任务会处理一组输入记录(以键值对的形式)并生成一组输出记录。限制输出记录的数量非常重要,这会影响整体性能,因为这些记录需要通过Reduce任务进行混洗(消耗通信成本)和处理(消耗处理成本)。数据过滤技术通常是通过消除不影响结果的输入记录来限制Map输出记录的数量。应该注意的是,数据过滤是依赖于作业的,这意味着每个作业都需要基于查询类型的不同过滤机制。

Top-k查询的过滤过程中,考虑在n维空间Rn中定义的多维数据集S(例如,p∈S且p=[p1,…pn]),以及一个Map任务,即访问完整数据集S的子集S′。另外,让一个偏好函数f(p)=ω1·p1+…+ωn·pn为数据对象赋值。目标是检索出得分最高的top-k对象。对于由Map任务读取的每个对象p∈S′,分配一个分数f(p)。通过在优先队列中保存k个最高得分对象来执行Map任务中的过滤。只有这些k个对象需要发送到Reduce阶段,而不是由Map任务访问的|S′|个对象。

图3所示为一个2维数据集中的Top-k查询过滤例子。白点和黑点对应于由两个不同Map任务访问的对象。 黑点对象的局部Skyline集合用虚线连接。这些是一个Map任务中唯一需要发送到Reduce阶段的象,而剩余的黑点则被过滤。

图3 具有两个Map任务的Top-k示例,空心点对应第1个Mapper,实心点对应第2个Mapper

3.5 负载均衡机制

Reduce任务的工作量由其需要处理和连接的记录数决定[12]。为了执行负载均衡,本文目标是将一些连接值分配给Reduce任务,以最小化每个Reduce任务的最大记录数,这个问题相当于多处理器调度问题。然而,多处理器调度问题是一种NP-hard问题,因此本文使用了一种名为LPT(最长处理时间)的启发式算法来进行调度。该算法根据连接记录的数量对连接值进行排序,然后将它们分配给迄今为止连接总数最低的处理器(Reducer)。

3.6 基于 MapReduce的并行实现

算法1展示了如何在Map阶段实现提前终止、数据过滤和负载均衡机制。该算法将每个输入表的分数界限作为输入,并访问排序的输入表。另外,如上所述,HashMapH用来捕获一些连接值分配给Reduce任务。只要表T1中的输入记录τ的得分低于得分边界b1,即∑si≤bi,则将该记录传递给Reduce任务。以此确保没有得分高于边界的记录可以产生属于Top-k连接的连接结果,从而可以弃用高得分记录的连接结果,显著减少需要传递和处理的记录数量。

算法1:P-TKJ Map阶段输入:T0,T1,b0,b1,H

输出:T0,T1中分数低于b0,b1的记录

Function Map(τ(τ.ai,τ.si))//表Ti中的记录

1:r←H.get(τ.ai)

2:if (τ∈T0) then

3: if(τ.s0≤b0) then

4:τ.tag←0

5: output[(τ.ai,τ.si,τ.tag,r),τ]

6; else

7: if(τ.s0≤b1) then

8:τ.tag←1

9: output[(τ.ai,τ.si,τ.tag,r),τ]

10: 执行数据过滤

11:end

算法2展示了Reduce阶段的流程。将Map阶段的输出键值对根据连接值(τ.ai)分组,并使用自定义分区程序分配给Reduce任务。在每个Reducer中,需要按照得分(τ.si)的升序对每个组中的记录进行排序,这是通过使用组合键排序来实现的。Reduce阶段的输出形式为a,τ.id,τ'.id,f(τ.τ')。

每个Reduce任务将与特定连接属性值相关的所有记录作为输入,并独立于其他Reduce任务,对每个这样的连接值执行Top-k连接。而且,由于按升序对记录进行排序访问,因此只要在存储器(M0和M1)中,从每个输入表(第6行)中只读取与k相同数量的记录即可,因为任何其他记录都不能产生Top-k连接结果。

算法2 :P-TKJ Reduce 阶段

输出:连接值key的Top-k记录。

Function Reduce(key,V)

1:for (τ∈V) do

2:if(τ.tag=0) then

3:载入τinM0

4:else

5:载入τinM1

6:if(M0.size()≥k)and(M1.size()≥k)则

随着互联网和云计算技术的急速发展和普及,云计算在提高使用效率的同时,为数字内容安全和用户个人敏感信息保护带来了很大的挑战。

7:执行提前终止机制

8:output[RankJoin(k,f,M0,M1)]

9:end

4 实验评估

4.1 实验设置

将算法部署在由8个服务器节点组成的内部Hadoop集群[13]中。对于Map和Reduce任务,JVM堆大小设置为2GB。HDFS大小配置为128MB,默认复制因子为3。

使用了两种Hadoop平台上计算Top-k连接的算法进行比较,分别为传统RSJ算法和本文提出的P-TKJ算法。这两种算法的区别在于,本文P-TKJ算法具有提前终止、数据过滤和负载均衡机制。

对于记录数据集,使用了一个合成数据生成器来生成大量的输入数据集。输入表Ti的大小从1 GB到50 GB。根据偏态分布(ZIPF分布)来生成评分属性,其中偏度为0.5,表示为ZI0.5。改变每个表中不同连接值的数量(从100到2000),从而影响连接选择性,以研究它对本文算法的影响。为了验证算法的可扩展性,本文创建了4个不同大小的数据集,记为DS1-DS4。这些数据集的各个参数显示在表1中。另外,各种算法中都设置Top-k中的k=10。

对于性能指标,本文使用的主要度量是每个作业的总执行时间。另外,还测量了在Map和Reduce阶段消耗的CPU时间。

表1 用于可扩展性研究的数据集

4.2 实验结果

图4给出不同数据集大小下,两种算法的总执行时间。图5给出了分别在Map和Reduce阶段所消耗的CPU处理时间。

图4 算法的总执行时间

图5 Map和Reduce阶段所消耗的CPU处理时间

可以看出,P-TKJ算法的执行时间优于RSJ 算法将近1倍。而且,当数据集的大小增加时,优势更加明显。以上实验这有力证明了本文算法支持大量输入的可扩展性。

这是因为RSJ虽然为并行Top-k连接问题提供了一个正确的解决方案,但是它在性能方面有严重的局限性。首先,尽管直观上一小部分列表记录就足以产生正确的结果,但是它需要完整地访问两个输入表。换句话说,就磁盘访问、处理成本以及通信而言,这明显导致资源的浪费。理想情况下,如果确定已经访问过的记录能够产生正确的结果,只需要有选择地只访问几个HDFS块,并终止Map阶段的处理。其次,由于RSJ不使用与每个连接值关联的记录数量知识,为此其将Map输出键(连接值)分配给Reduce任务是随机执行的,这可能会导致不均衡的工作分配,从而延迟了工作的完成。

相比而言,本文使用了提前终止策略,使Map阶段输入记录的数量减少,所以算法比RSJ执行更快。另外,由于本文方法很好地对Reducer任务进行了负载平衡。在没有负载均衡机制时,使用Hadoop默认的基于散列的分区,将Map输出键分配给Reducers,这本质上是一种随机分区。而由于本文的负载均衡机制,以更统一的方式将连接结果分配给Reducers,从而以更公平的方式分配工作。另外,本文融入了数据过滤操作,减少了Reducer任务数量,这也一定程度上提高了算法执行速度。

为了验证不同k对算法性能的影响,这里设定k=5、10、15、20和25。在DS1上分别进行实验,并统计相应的执行时间,结果如图6所示。可以看出,不同k值下两种算法的执行时间几乎不受影响。这是因为连接查询是消耗时间最高的操作。但Top-k通过在连接阶段实行部分合并,不同k值下所维护的元组数量基本相同,所以执行时间也基本不变。

图6 不同k值下的执行时间

5 结论

本文介绍了一种在MapReduce框架上处理Top-k连接的并行化计算框架。使用数据汇总,以直方图的形式表示,并将这些操作在数据上传过程中通过后台CPU处理,以此提高CPU利用率。同时利用提前终止策略、数据过滤和负载均衡策略提高了算法对数据分析访问和处理的效率。实验结果证明了提出算法的可扩展性和有效性。

猜你喜欢
直方图排序分数
符合差分隐私的流数据统计直方图发布
作者简介
分数的由来
Bp-MRI灰度直方图在鉴别移行带前列腺癌与良性前列腺增生中的应用价值
基于差分隐私的高精度直方图发布方法
恐怖排序
节日排序
把握物理难点,分数更上一步
中考频数分布直方图题型展示
……的近似分数的若干美妙性质