海胜专访--MaxCompute 与大数据查询引擎的技术和故事

简介: 在2019大数据技术公开课第一季《技术人生专访》中,阿里巴巴云计算平台高级技术专家苑海胜为大家分享了《MaxCompute 与大数据查询引擎的技术和故事》,主要介绍了MaxCompute与MPP Database的异同点,分布式系统上Join的实现,且详细讲解了MaxCompute针对Join和聚合引入的Hash Clustering Table和Range Clustering Table的优化。

摘要:在2019大数据技术公开课第一季《技术人生专访》中,阿里巴巴云计算平台高级技术专家苑海胜为大家分享了《MaxCompute 与大数据查询引擎的技术和故事》,主要介绍了MaxCompute与MPP Database的异同点,分布式系统上Join的实现,且详细讲解了MaxCompute针对Join和聚合引入的Hash Clustering Table和Range Clustering Table的优化。


以下内容根据演讲视频以及PPT整理而成。


一、MaxCompute VS MPP Database


MaxCompute 与 MPP Database有非常大的不同,主要体现在性能(Performance)、成本(Cost)、可扩展性(Scalability)及灵活性(Flexibility)等度量纬度。

  • 性能(Performance):作为一个数据仓库,大家首先关心的指标是性能。MPP Database典型的产品有Greenplum,Vertica和Redshift等,它们主要针对的在线实时数据的分析,性能要求一般是毫秒级别。而MaxCompute多数场景应用在离线数据下,MaxCompute需要动态的拉起进程和数据封装,如果进行MapReduce还涉及数据落地,所以离线数据的分析会比较慢,这也导致MaxCompute无法适用于实时场景。但在大量数据场景下,MaxCompute会展示出优势,它可以动态调整Instance数量,保证有足够多的Instance处理数据。 而MPP Database一旦开启了固定的Cluster和Node之后,数据量较大时会受到集群计算资源的限制。
  • 成本(Cost):MaxCompute在cost层面占较大优势。首先,数据存储在阿里云上,计算部分也只需要为所付出的计算资源付费,不计算时只需为存储资源付费。而MPP Database一旦开启一定的资源,即使不使用也需要付费。
  • 可扩展性(Scalability):阿里云在起初也使用过MPP Database,MPP Database刚开始就设定了固定的cluster,但是由于阿里云内部的业务数据在不断的增加,导致计算资源严重不足。MaxCompute可以动态分配资源,根据计算的复杂度实时调整Instance数量,保证较高的可扩展性。
  • 灵活性(Flexibility):MaxCompute不仅可以处理SQL的查询,还可以处理MapReduce,以及能够查询Machine Learning节点。由于MaxCompute的高扩展性和灵活性,它可以支持阿里云内部95%的数据计算,承载的任务也非常多。

二、分布式系统上Join的实现


Query Plan Generation流程:首先用户会提交SQL给Parser,Parser将其编译成Relation节点,然后将Relation的节点交给优化器Optimizer,经过一系列的优化,其中包括根据物理转化和逻辑转化。Cost model从中选择代价最低的物理的执行计划。Transformer将最优的计划转成Physical Operator tree,并且将Physical Operator tree交给Manager。Manager启动实例,并交给RunTime执行此次query。这过程中,Cost model从Metadata中获取统计信息(如表的宽度和行数),来选择最优的计划,Apache Calcite被用来作用于Optimizer的框架。


image


Optimizer Core Components:逻辑运算符(Logical Operator)主要描述要做哪些事情,如LogicalInnerJoin做InnerJoin, LogicalScan做扫描,LogicalFilter做过滤。物理运算符(Physical Operator)主要描述怎么做,如HashJoin,MergeJoin及NLJoin代表不同的算法,IndexScan表示索引扫描,TableScan是全标扫描,PhysicalFilter是物理过滤器。逻辑运算符(Logical Operator)可以通过Logical Transformation Rules转化为新的逻辑运算符,还可以通过Logical Implementation Rules转化成物理运算符,如从InnerJoin转化成HashJoin。另外,物理运算符(Physical Operator)可以通过属性的强化(Physical Property Enforcement)产生新的物理运算符(Physical Operator),如通过Distribution满足分布的属性,通过Sort满足排序的属性。

image


下图展示了MaxCompute如何生成一个Join Plan。首先,Inner Join通过PartitionedJoinRule产生物理的plan,既Sort Merge Join,它存在盘古系统中,不满足分布的属性,所以MaxCompute需要进行Exchange。

image

image


也就是按照T1.a和T2.b进行Shuffle,Shuffle之后进行Sort Merge。有相同T1.a的值和T2.b的值会分在同一个bucket中。不同的bucket启动多个Instance,每个Instance处理每个bucket,从而进行分布式计算。其中在Shuffle时占用了较多的资源,它不仅有数据的读写,还包括排序。如何尽量减少排序从而加快数据处理速度是优化的关键。

image

假设T1或T2较小,那么可以将T2的全表广播到T1进行Hash Join,好处是T1不需要多次Shuffle,T2也不需要进行Hash计算和排序。这时Join Plan只包含两个stage,M2 stage对T2进行扫描,之后广播到T1。T1不需要进行Shuffle,使用T2全表的数据建Hash表,再通过T1部分数据进行Hash Build,最后得到Hash Join的结果。

三、MaxCompute针对Join和聚合引入的Hash Clustering Table和Range Clustering的优化


1.Hash Clustering Table

分布式系统上Join的实现会涉及非常多次的Shuffle,为此MaxCompute创建了Hash Clustering Table来实现优化。Hash Clustering Table对选择的column进行Hash,将不同的数据分配到不同的bucket里面,这也就说明在创建Hash Clustering Table时,已经进行了Shuffle和排序。基本语法如下图,clustered by 表明按照column进行Shuffle,sorted by 是按照column进行排序,number of buckets 推荐设置成2的n次方,方便与其它表进行Join。同时也推荐将clustered by和sorted by中的column设置为一样或者clustered by中的column包含sorted by中的column。因为Hash Clustering Table通常被用来做Join和Shuffle Remove,可以利用它已有的属性从而去除掉多余的Shuffle和排序,实现优化的目的。

image


详细步骤如下图,Merge Join对T1发送请求,拉取T1的属性。假设T1为Hash Clustering Table,T1反馈是按照T1.a进行Hash,Hash到100个bucket,同时按照T1.a进行排序。T2同理。这时产生的Join Plan就满足了M1,M2和R3的排序,最后所有的operator只需一个stage(M1),不需要多余的Shuffle。

image


image


与之相反,T2的反馈如果是None,Merge Join会发送请求,使T2按照T2.b进行Hash和排序,设置100个bucket。这时产生的Join Plan包含M1和M2两个stage,T2需要Shuffle,T1则不需要Shuffle,消除了一个stage的Shuffle。

image


假如T2的反馈是按照T2.b进行Hash,Hash到100个bucket,但排序不是T2.b。那么Merge Join 依然请求T2按照T2.b排序。这时Join Plan还是仅仅会有M1一个stage,其中只是多了Sort Operator,但没有多余的Shuffle。

image


如果T2设置了200个bucket,T1的100个bucket会被读两遍,进行过滤,T1的1个bucket会对应T2的2个bucket。这时依然没有Shuffle。

image


Hash Clustering Table的限制:Hash Clustering Table在Data Skew方面有明显的限制。当数据量非常大,将这些数据Hash到一个bucket中导致的后果便是拖慢整个cluster的计算速度。Hash Clustering Table只支持等值的bucket pruning,如果按照a分配bucket,a=5,对5获取Hash值,同时对Hash桶进行取模,那么Hash Clustering Table可以定位出a=5具体在哪个bucket中。但如果不等值,Hash Clustering Table便无法支持。Hash Clustering Table 要求所有的clustering key出现聚合key或者Join key中。在CLUSTERED BY C1, C2; GROUP BY C1情况下,Hash Clustering Table无法实现优化。同样,CLUSTERED BY C1, C2; … Join .. ON a.C1 == b.C1 也无法实现优化,Hash Clustering Table 要求Join key 包含C1和C2。

image

2.Range Clustering Table

Range Clustering Table 顾名思义,按照Range进行排序。MaxCompute自动的决定每个bucket的范围。

image


Range Clustering Table怎样确定bucket的范围?如下图,第一层是Mappers,中间是Job Manager,下一层是Reducers。首先在Stage1进行排序,之后从中抽取直方图,每个Worker将直方图发送给Job Manager。Job Manager合并直方图,根据数据量的大小决定合并成多少个bucket。Job Manager在将Bucket的范围再发送给Mappers,由Mappers决定每一条数据发送到具体哪个bucket。最后Reducers会得到具体的Aggregation Stage。

image


Range Clustering Table的优势非常明显,首先Range Clustering Table支持范围比较(Range Comparison)。 同时它可以支持在prefix keys上的聚合和Join,既在CLUSTERED BY C1, C2; GROUP BY C1 情况下,Range Clustering Table也可以支持优化。

Range Clustering Table如何实现Join:假设T1和T2的Range如下图,因为范围不同无法直接Join。这时需要进行范围的切分,将切分后的范围交给Join Workers,由它读取新的范围。如下图,w0读取T1的切分范围,将T2表的不必要范围剔除。

image


Range Clustering如何按照prefix keys进行Join:Join on prefix keys需要直方图和bucket的重新分配。假设按照a和b进行clustering,从直方图中可以知道a是从哪个地方切分的。对bucket重新分配之后可以更新bucket的范围,最后将新的bucket的范围发送给Join Worker

image


下图展示了在range表和normal表中TPCH的查询时间的对比。可以发现,速度总体上提升了60-70%,其中query 5, 17和21达到了数倍的速度的提升。

image

3.Tips for Clustering Table

如何选择正确的clustering keys,从而达到节省资源和降低速度的目的?下面有几点提示可以提供给大家。首先,如果有Join condition keys,Distinct values,Where condition keys(EQUAL/IN, GT/GE/LT/BETWEEN),那么可以针对这些已有的keys创建 Clustering Table。如果是Aggregate keys ,可以选择创建Range Clustering Table。对于 Window keys, 可以根据Partition keys和Order keys 创建clustering keys和sort keys。举例如下,SELECT row_number() OVER (PARTITION BY a ORDER BY b) FROM foo; 那么Optimizer执行Window时产生的plan是CLUSTERED BY a SORTED BY a,b INTO x BUCKETS,既按照a Shuffle,按照a和b进行排序。在一个bucket中a的值不可能都相同,与a不同的值可以认为是一个frame,在frame中还需要进行b排序,所以每个Instance是按照a和b进行排序。如此便省去了预先的计算,既不需要Shuffle也无需排序。

image


此外,需要注意即使两个Hash表是同样的分布,排序和bucket数量,但如果类型不同依然需要进行Shuffle,因为它们的binary表达方式不同,所以Hash的结果也会不同。另外,Clustering Table创建时耗费时间较长,假如创建Clustering Table之后并没有频繁查询,也会造成浪费。还需要注意Clustering Table尽量避免Data Skew。再一个,使用FULL OUTER Join增量更新时需要进行改写。

image


使用FULL OUTER Join进行增量更新: 如下图,分别是snapshot表和delta表,Join keys 是s.key和d.key,但在向新的partition插入表达式时无法判断新的SQL表达式是否满足数据的排序,所以还需要对数据进行再一次的Shuffle。下图中对SQL表达式进行了ANTI JOIN和 UNION ALL的改写,ANTI JOIN可以利用排序的属性,同时UNION ALL也是按照原来的key的分布和排序,如此就可以完全做到Shuffle Remove。

image


Clustering Table分区建议:创建Clustering Table时需要考虑分区的大小,太小的分区本身优化空间就不大反而可能引入小文件问题。假设设置1000个bucket就会生成1000个小文件,而这些小文件会对Mappers造成很大的压力。另外,分区读写比越高的表 cluster后可能得到的收益越大。由于创建Clustering Table耗时较多,那么读的频率较多就会有较大的优势。最后,字段利用率越高(列裁剪较少)的表,cluster后可能得到的收益越大。如果列裁剪之后使用到数据利用率较低,这表明浪费了较多的时间,所以cluster后的收益也不会很大。


欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
image

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1天前
|
机器学习/深度学习 运维 算法
大数据基础工程技术团队4篇论文入选ICLR,ICDE,WWW
近日,由阿里云计算平台大数据基础工程技术团队主导的四篇时间序列相关论文分别被国际顶会ICLR2024、ICDE2024和WWW2024接收。
|
1天前
|
存储 机器学习/深度学习 数据采集
大数据处理与分析实战:技术深度剖析与案例分享
【5月更文挑战第2天】本文探讨了大数据处理与分析的关键环节,包括数据采集、预处理、存储、分析和可视化,并介绍了Hadoop、Spark和机器学习等核心技术。通过电商推荐系统和智慧城市交通管理的实战案例,展示了大数据在提高用户体验和解决实际问题上的效能。随着技术进步,大数据处理与分析将在更多领域发挥作用,推动社会进步。
|
1天前
|
分布式计算 DataWorks 数据管理
DataWorks操作报错合集之DataWorks中udf开发完后,本地和在MaxCompute的工作区可以执行函数查询,但是在datawork里报错FAILED: ODPS-0130071:[2,5],是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
28 0
|
1天前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之在DataWorks中使用ODPS SQL时遇到"该文件对应引擎实例已失效,请重新选择可用的引擎实例"的错误提示”,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
36 0
|
1天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在 DataWorks 中,使用Oracle作为数据源进行数据映射和查询,如何更改数据源为MaxCompute或其他类型
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
30 1
|
1天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之在DataWorks中,使用JSON解析函数将MySQL表中的字段解析成多个字段将这些字段写入到ODPS(MaxCompute)中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
29 3
|
1天前
|
分布式计算 大数据 调度
MaxCompute产品使用合集之大数据计算MaxCompute底层加速查询的原理是什么
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
分布式计算 大数据 BI
MaxCompute产品使用合集之MaxCompute项目的数据是否可以被接入到阿里云的Quick BI中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之怎样可以将大数据计算MaxCompute表的数据可以导出为本地文件
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1天前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之可以使用什么方法将MySQL的数据实时同步到MaxCompute
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute
  • http://www.vxiaotou.com