做瞹瞹瞹免费网站,网站的线下推广怎么做的,网站怎么做推广和宣传,php网站开发 招聘Apache Spark是一个优秀的计算引擎#xff0c;广泛应用于数据工程、机器学习等领域。向量化执行技术在不升级硬件的情况下#xff0c;既可获得资源节省#xff0c;又能加速作业执行。GlutenVelox解决方案为Spark换上了向量化执行引擎#xff0c;本文将阐述美团在这一方向的…Apache Spark是一个优秀的计算引擎广泛应用于数据工程、机器学习等领域。向量化执行技术在不升级硬件的情况下既可获得资源节省又能加速作业执行。GlutenVelox解决方案为Spark换上了向量化执行引擎本文将阐述美团在这一方向的实践和思考。1 什么是向量化计算1.1 并行数据处理SIMD指令1.2 向量化执行框架数据局部性与运行时开销1.3 如何使用向量化计算2 为什么要做Spark向量化计算3 Spark向量化计算如何在美团实施落地3.1 整体建设思路3.2 SparkGlutenVelox计算流程3.3 阶段划分4 美团Spark向量化计算遇到的挑战4.1 稳定性问题4.2 支持ORC并优化读写性能4.3 Native HDFS客户端优化4.4 Shuffle重构4.5 适配HBO4.6 一致性问题5 上线效果6 未来规划6.1 Spark向量化之后对开源社区的跟进策略6.2 提升向量化覆盖率的策略有需要的小伙伴可以点击文章最下方的微信名片添加免费领取【保证100%免费】1 什么是向量化计算| 1.1 并行数据处理SIMD指令让我们从一个简单问题开始假设要实现“数组ab存入c”设三个整型数组的长度都是100那么只需将“c[i] a[i] b[i]”置于一个100次的循环内代码如下void addArrays(const int* a, const int* b, int* c, int num) { for (int i 0; i num; i) { c[i] a[i] b[i]; } }我们知道计算在CPU内完成逻辑计算单元操作寄存器中的数据算术运算的源操作数要先放置到CPU的寄存器中哪怕简单的内存拷贝也需要过CPU寄存器。所以完成“c[i] a[i] b[i]”需经三步加载Load从内存加载2个源操作数a[i]和b[i]到2个寄存器。计算Compute执行加法指令作用于2个寄存器里的源操作数副本结果产生到目标寄存器。存储Store将目标寄存器的数据存入拷贝到目标内存位置c[i]。其中加载和存储对应访存指令Memory Instruction计算是算术加指令循环执行100次上述三步骤就完成了“数组a 数组b 数组c”。该流程即对应传统的计算架构单指令单数据SISD顺序架构任意时间点只有一条指令作用于一条数据流。如果有更宽的寄存器超机器字长比如256位16字节一次性从源内存同时加载更多的数据到寄存器一条指令作用于寄存器x和y在x和y的每个分量比如32位4字节上并行进行加并将结果存入寄存器z的各对应分量最后一次性将寄存器z里的内容存入目标内存那么就能实现单指令并行处理数据的效果这就是单指令多数据SIMD。图1向量化计算“数组ab存入c”单指令多数据对应一类并行架构现代CPU一般都支持SIMD执行单条指令同时作用于多条数据流可成倍的提升单核计算能力。SIMD非常适合计算密集型任务它能加速的根本原因是“从一次一个跨越到一次一组从而实现用更少的指令数完成同样的计算任务。”1996年Intel推出的X86 MMXMultiMedia eXtension指令集扩展可视为SIMD的起点随后演进出SSE1999年SSE2/3/4/5、AVX2008/AVX22013、AVX5122017等扩展指令集。在linux系统中可以通过lscpu或cpuid命令查询CPU对向量化指令的支持情况。| 1.2 向量化执行框架数据局部性与运行时开销执行引擎常规按行处理的方式存在以下三个问题CPU Cache命中率差。一行的多列字段数据的内存紧挨在一起哪怕只对其中的一个字段做操作其他字段所占的内存也需要加载进来这会抢占稀缺的Cache资源。Cache命失会导致被请求的数据从内存加载进Cache等待内存操作完成会导致CPU执行指令暂停Memory Stall这会增加延时还可能浪费内存带宽。变长字段影响计算效率。假设一行包括int、string、int三列其中int类型是固定长度而string是变长的一般表示为int len bytes content变长列的存在会导致无法通过行号算offset做快速定位。虚函数调用带来额外开销。对一行的多列进行处理通常会封装在一个循环里会抽象出一个类似handle的接口C虚函数用于处理某类型数据各字段类型会override该handle接口。虚函数的调用多一步查表且无法被内联循环内高频调用虚函数的性能影响不可忽视。图2row by row VS blcok by block因此要让向量化计算发挥威力只使用SIMD指令还不够还需要对执行框架层面进行改造变Row By Row为Block By Block数据按列组织替代按行组织在Clickhouse和Doris里叫BlockVelox里叫Vector这将提高数据局部性。参与计算的列的多行数据会内存紧凑的保存在一起CPU可以通过预取指令将接下来要处理的数据加载进Cache从而减少Memory Stall。不参与计算的列的数据不会与被处理的列竞争Cache这种内存交互的隔离能提高Cache亲和性。同一列数据在循环里被施加相同的计算批量迭代将减少函数调用次数通过模版能减少虚函数调用降低运行时开销。针对固定长度类型的列很容易被并行处理通过行号offset到数据这样的执行框架也有利于让编译器做自动向量化代码生成显著减少分支减轻预测失败的惩罚。结合模板编译器会为每个实参生成特定实例化代码避免运行时查找虚函数表并且由于编译器知道了具体的类型信息可以对模板函数进行内联展开。图3向量化执行框架示例| 1.3 如何使用向量化计算自动向量化Auto-Vectorization。当循环内没有复杂的条件分支没有数据依赖只调用简单内联函数时通过编译选项如gcc -ftree-vectorize、-O3编译器可以将顺序执行代码翻译成向量化执行代码。可以通过观察编译hint输出和反汇编确定是否生产了向量化代码。编译hint输出编译g test.cpp -g -O3 -marchnative -fopt-info-vec-optimized执行后有类似输出“test.cpp:35:21: note: loop vectorized”。反汇编gdb test (gdb) disassemble /m function_name看到一些v打头的指令例如vmovups、vpaddd、vmovups等。使用封装好的函数库如Intel Intrinsic function、xsimd等。这些软件包中的内置函数实现都使用了SIMD指令进行优化相当于high level地使用了向量化指令的汇编详见https://www.intel.com/content/www/us/en/docs/intrinsics-guide/index.html。通过asm内嵌向量化汇编但汇编指令跟CPU架构相关可移植性差。编译器暗示使用编译指示符Compiler Directive如CilkMIT开发的用于并行编程的中间层编程语言和库它扩展了C语言里的#pragma simd和OpenMP里的#pragma omp simd。Compiler Hint。通过__restrict去修饰指针参数告诉编译器多个指针指向不相同不重叠的内存让编译器放心大胆的去优化。如果循环内有复杂的逻辑或条件分支那么将难以向量化处理。以下是一个向量化版本数组相加的例子使用Intel Intrinsic function#include immintrin.h // 包含Intrinsic avx版本函数的头文件 void addArraysAVX(const int* a, const int* b, int* c, int num) { assert(num % 8 0); // 循环遍历数组步长为8因为每个__m256i可以存储8个32位整数 for (int i 0; i num; i 8) { __m256i v_a _mm256_load_si256((__m256i*)a[i]); // 加载数组a的下一个8个整数到向量寄存器 __m256i v_b _mm256_load_si256((__m256i*)b[i]); // 加载数组b的下一个8个整数到向量寄存器 __m256i v_c _mm256_add_epi32(v_a, v_b); // 将两个向量相加结果存放在向量寄存器 _mm256_store_si256((__m256i*)c[i], v_c); // 将结果向量存储到数组c的内存 } } int main(int argc, char* argv[]) { const int ARRAY_SIZE 64 * 1024; int a[ARRAY_SIZE] __attribute__((aligned(32))); // 按32字节对齐满足某些向量化指令的内存对齐要求 int b[ARRAY_SIZE] __attribute__((aligned(32))); int c[ARRAY_SIZE] __attribute__((aligned(32))); srand(time(0)); for (int i 0; i ARRAY_SIZE; i) { a[i] rand(); b[i] rand(); c[i] 0; // 对数组a和b赋随机数初始值 } auto start std::chrono::high_resolution_clock::now(); addArraysAVX(a, b, c, ARRAY_SIZE); auto end std::chrono::high_resolution_clock::now(); std::cout addArraysAVX took std::chrono::duration_caststd::chrono::microseconds(end - start).count() microseconds. std::endl; return 0; }addArraysAVX函数中的_mm256_load_si256、_mm256_add_epi32、_mm256_store_si256都是Intrinsic库函数内置函数命名方式是操作浮点数_mm(xxx)_name_PT操作整型_mm(xxx)_name_epUY其中xxx代表数据的位数xxx为SIMD寄存器的位数若为128位则省略AVX提供的__m256为256位name为函数的名字表示功能浮点内置函数的后缀是PT其中P代表的是对矢量Packed Data Vector还是对标量scalar进行操作T代表浮点数的类型若为s则为单精度浮点型若为d则为双精度浮点整型内置函数的后缀是epUYU表示整数的类型若为无符号类型则为u否在为i而Y为操作的数据类型的位数。编译g test.cpp -O0 -stdc11 -mavx2 -o test。选项-O0用于禁用优化因为开启优化后有可能自动向量化-mavx2用于启用AVX2指令集。测试发现非向量化版本addArrays耗时170微秒而使用Intrinsic函数的向量化版本addArraysAVX耗时58微秒耗时降为原来的1/3。2 为什么要做Spark向量化计算从业界发展情况来看近几年OLAP引擎发展迅速该场景追求极致的查询速度向量化技术在Clickhouse、Doris等Native引擎中得到广泛使用降本增效的趋势也逐渐扩展到数仓生产。2022年6月DataBricks发表论文《Photon- A Fast Query Engine for Lakehouse Systems》Photon是DataBricks Runtime中C实现的向量化执行引擎相比DBR性能平均提升4倍并已应用在Databricks商业版上但没有开源。2021年Meta开源Velox一个C实现的向量化执行库。2022 Databricks Data AI Summit 上Intel 与Kyligence介绍了合作开源项目Gluten旨在为Spark SQL提供Native Vectorized Execution。GlutenVelox的组合使Java栈的Spark也可以像Doris、Clickhouse等Native引擎一样发挥向量化执行的性能优势。从美团内部来看数仓生产有数万规模计算节点很多业务决策依赖数据及时产出若应用向量化执行技术在不升级硬件的情况下既可获得可观的资源节省也能加速作业执行让业务更快看到数据和做出决策。根据Photon和Gluten的公开数据应用向量化Spark执行效率至少可以提升一倍我们在物理机上基于TPC-H测试GlutenVelox相比Spark 3.0也有1.7倍性能提升。图4GlutenVelox在TPC-H上的加速比来自Gluten3 Spark向量化计算如何在美团实施落地| 3.1 整体建设思路更关注资源节省而不单追求执行加速。Spark在美团主要场景是离线数仓生产与OLAP场景不同时间相对不敏感但资源内存为主基数大成本敏感。离线计算历史已久为充分利用存量服务器我们不能依赖硬件加速的手段如更多的内存、SSD、高性能网卡。我们评估收益的核心指标是总「memory*second」降低。基于C/Rust等Native语言而非Java进行开发。Java语言也在向量化执行方面做尝试但JVM语言对底层控制力弱如无法直接内嵌SIMD汇编再加上GC等固有缺陷还远远谈不上成熟而系统向的语言C/C、Rust则成为挖掘CPU向量化执行潜能的首选。可插拔、面向多引擎而非绑定Spark。虽然面向不同工作负载的各类大数据引擎层出不穷但其架构分层则相似一般包括编程接口、执行计划、调度、物理执行、容错等尤其执行算子部分有较多可复用实现。如Meta内部主要大数据引擎有Presto和Spark建设一个跨引擎的执行库优化同时支持Presto和Spark显然是更好的选择OLAP引擎向量化计算本身就是标配流计算引擎出于性能考虑也可以攒批而非一条条处理数据mini batch因此向量化执行也有性能提升空间。我们认为面向不同场景设计的大数据引擎有可能共用同一个高性能向量化执行库。使用开源方案而非完全自研。Spark有几百个function和operator向量化改造的工作量巨大从性能、完成度、适配成本、是否支持多引擎、社区的活跃度等方面综合考虑我们最终选择了GlutenVelox的方案。迁移过程对用户透明保证数据一致。Spark的几百个function和operator都要通过C重新实现同时还涉及Spark、Gluten、Velox版本变化很容易实现出现偏差导致计算结果不一致的情况。我们开发了一个用于升级验证的黑盒测试工具ETL Blackbox Test可以将一个作业运行在不同版本的执行引擎上进行端到端验证包括执行时间、内存及CPU资源使用情况、作业数据的对比结果通过对比两次执行的行数以及每一列所有数据md5的加和值来确定数据是否一致。| 3.2 SparkGlutenVelox计算流程通过Spark的plugin功能Gluten将Spark和向量化执行引擎Native backend如Velox连接起来分为Driver plugin和Executor Plugin。在Driver端SparkContext初始化时Gluten的一系列规则如ColumnarOverrideRules通过Spark Extensions注入这些规则会对Spark的执行计划进行校验并把Gluten支持的算子转换成向量化算子如FileScan会转换成NativeFileScan不能转换的算子上报Fallback的原因并在回退的部分嵌入Column2Row、Row2Column算子生成Substrait执行计划。在Executor端接收到Driver侧的LaunchTask RPC消息传输的Substrait执行计划后再转换成Native backend的执行计划最终通过JNI调用Native backend执行。Gluten希望能尽可能多的复用原有的Spark逻辑只是把计算部分转到性能更高的向量化算子上如作业提交、SQL解析、执行计划的生成及优化、资源申请、任务调度等行为都还由Spark控制。| 3.3 阶段划分在我们开始Spark向量化项目时开源版本的Gluten和Velox还没有在业界Spark生产环境大规模实践过为了降低风险最小代价验证可行性我们把落地过程分为以下五个阶段逐步进行软硬件适配情况确认。Velox要求CPU支持bmi、bmi2、f16c、avx、avx2、sse指令集需要先确定服务器是否支持在生产环境运行TPC-DS或者TPC-H测试验证理论收益公司内部版本适配编译运行跑通典型任务。当时Gluten只支持Spark3.2和Spark3.3考虑到Spark版本升级成本更高我们暂时将相关patch反打到Spark3.0上。这个阶段我们解决了大量编译失败问题建议用社区推荐的OS在容器中编译运行如果要在物理机上运行需要把相关依赖部署到各个节点或者使用静态链接的方式开启vcpkg。cat /proc/cpuinfo | grep --color -wE bmi|bmi2|f16c|avx|avx2|sse稳定性验证。确定测试集完善稳定运行需要的feature以达到比较高的测试通过率包括支持ORC、Remote shuffle、HDFS适配、堆内堆外的内存配置等。本阶段将测试通过率从不足30%提升到90%左右。性能收益验证。由于向量化版本和原生Spark分别使用堆外内存和堆内内存引入翻倍内存的配置以及一些高性能feature支持不完善一开始生产环境测试性能结果不及预期。我们逐个分析解决问题包括参数对齐、去掉arrow中间数据转换、shuffle batch fetch、Native HDFS客户端优化、使用jemelloc、算子优化、内存配置优化、HBO适配等。本阶段将平均资源节省从-70%提升到40%以上。一致性验证。主要是问题修复对所有非SLA作业进行大规模测试筛选出稳定运行、数据完全一致、有正收益的作业。灰度上线。将向量化执行环境发布到所有服务器对符合条件的作业分批上线增加监控报表收集收益对性能不及预期、发生数据不一致的作业及时回退原生Spark上。此过程用户无感知。整个实施过程中我们通过收益转化漏斗找到收益最大的优化点指导项目迭代。下图为2023年某一时期的相邻转化情况。图6Spark向量化项目收益转化漏斗图4 美团Spark向量化计算遇到的挑战| 4.1 稳定性问题聚合时Shuffle阶段OOM。在Spark中Aggregation一般包括Partial Aggregation、Shuffle、Final Aggregation三个阶段Partial Aggregation在Mapper端预聚合以降低Shuffle数据量加速聚合过程、避免数据倾斜。Aggregation需要维护中间状态如果Partial Aggregation占用的内存超过一定阈值就会提前触发Flush同时后续输入数据跳过此阶段直接进入ShuffleWrite流程。Gluten使用Velox默认配置的Flush内存阈值Spark堆外内存*75%由于Velox里Spill功能还不够完善Partial Aggregation不支持Spill这样大作业场景后续ShuffleWrite流程可能没有足够的内存可以使用(可用内存25%*Spark堆外内存)会引起作业OOM。我们采用的策略是通过在Gluten侧调低Velox Partial Aggregation的Flush阈值来降低Partial Aggregation阶段的内存占用避免大作业OOM。这个方案在可以让大作业运行通过但是理论上提前触发Partial Aggergation Flush会降低Partial Aggretation的效果。更合理的方案是Partial Aggregation支持Spill功能Gluten和Velox社区也一直在完善对向量化算子Spill功能的支持。SIMD指令crash。Velox对数据复制做了优化如果该类型对象是128bit比如LongDecimal类型会通过SIMD指令用于数据复制以提升性能。如下图所示Velox库的FlatVector::copyValuesAndNulls()函数里的一行赋值会调用T::operator()调用的movaps指令必须作用于16B对齐的地址不满足对齐要求会crash。我们在测试中复现了crash通过日志确定有未按16B对齐的地址出现。无论是Velox内存池还是Gluten内存池分配内存都强制做了16B对齐最终确认是Arrow内存池分配出的地址没对齐Gluten用了三方库Arrow。这个问题可以通过为LongDecimal类型重载operator操作符修复但这样做可能影响性能也不彻底因为不能排除还有其他128bit类型对象存在。最终我们与Gluten社区修改了Arrow内存分配策略强制16B对齐。图7Crash代码示例| 4.2 支持ORC并优化读写性能Velox的DWIO模块原生只支持DWRF和Parquet两种数据格式美团大部分表都是基于ORC格式进行存储的。DWRF文件格式是Meta内部所采用的ORC分支版本其文件结构与ORC相似比如针对ORC文件的不同区域可通过复用DWRF的Reader来完成相关数据内容的读取。图8Dwrf文件格式DwrfReader用于读取文件层面的元数据信息包括PostScript、Footer和Header。DwrfRowReader用来读取StripeFooter以便确定每个column的Stream位置。FormatData用来读取StripeIndex从而确定每个RowGroup的位置区间。ColumnReader用来读取StripeData完成不同column的数据抽取。我们完善了Velox ORC格式的支持并对读取链路做了优化主要工作包括支持RLEv2解码Velox-5443并在解码过程中完成Filter下推Velox-6647。我们将Apache RLEv2解码逻辑移植到了Velox通过BMI2指令集来加速varint解码过程中的位运算并在解码过程中下推过滤不必要的数据。支持Decimal数据类型Velox-5837以及该数据类型的Filter下推处理Velox-6240。ORC文件句柄复用以降低HDFS的NN处理压力Velox-6140。出于线程安全层面的考虑HdfsReadFile每次pread都会开启一个新文件句柄来做seekread客户端会向NameNode发送大量open请求加重HDFS的压力。我们通过将文件的读取句柄在内部做复用处理thread_local模式减少向NN发送的open请求。使用ISA-L加速ORC文件解压缩。我们对ORC文件读取耗时trace分析得出zlib解压缩占总耗时60%解码占30%IO和其他仅占10%解压效率对ORC文件读取性能很关键。为此我们对ZlibDecompressor做了重构引入Intel的解压缩向量化库ISA-L来加速解压缩过程。基于这些优化改造后的Velox版ORC Reader读取时间减少一半性能提升一倍。图9Apache ORC与改造后的Velox ORC读取性能对比上为Apache ORC| 4.3 Native HDFS客户端优化首先介绍一下HDFS C客户端对ORC文件读取某一列数据的过程。第一步读取文件的最后一个字节来确定PostScript长度读取PostScript内容第二步通过PostScript确定Footer的存储位置读取Footer内容第三步通过Footer确定每个Stripe的元数据信息读取StripeFooter第四步通过StripeFooter确定每个Column的Stream位置读取需要的Stream数据。图10ORC文件读取过程在生产环境测试中我们定位到两个数据读取相关的性能问题小数据量随机读放大。客户端想要读取[offset ~ readEnd]区间内的数据发送给DN的实际读取区间却是[offset ~ endOfCurBlock]导致[readEnd ~ endOfCurBlock]这部分数据做了无效读取。这样设计主要是为了优化顺序读场景通过预读来加快后续访问然而针对随机读场景小数据量下比较普遍该方式却适得其反因为预读出的数据很难被后续使用增加了读放大行为。我们优化为客户端只向DN传递需要读取的数据区间DN侧不提前预取只返回客户端需要的数据。图11读放大过程示意图DN慢节点导致作业运行时间变长。我们发现很多大作业的HDFS长尾耗时非常高HDFS的平均read时延只有10ms左右P99.99时延却达到了6秒耗时最长的请求甚至达到了5分钟但在不启用EC场景下HDFS的每个block会有三副本完全可以切换到空闲DN访问。为此我们对客户端的读请求链路做了重新的设计与调整实时监测每个DN的负载情况基于P99.9分位请求时延判定慢节点并将读请求路由到负载较低的DN上面。HDFS Native客户端读优化之后平均读写延迟降低了2/3吞吐提升2倍。| 4.4 Shuffle重构Gluten在shuffle策略的支持上没有预留好接口每新增一种shuffle模式需要较大改动。美团有自研的Shuffle Service其他公司也可能有自己的Shuffle Service如Celeborn为了更好适配多种shuffle模式我们提议对shuffle接口重新梳理并主导了此讨论和设计。Gluten中的shuffle抽象第一层是数据格式Velox是RowVectorGluten引入的Arrow是RecordBatch第二层是分区方式RoundRobin、SinglePart、Hash、Range如果要支持新shuffle模式local、remote需要实现2*4*216个writer将会有大量冗余代码。分区具体实现应该与数据格式和shuffle模式无关我们用组合模式替代继承模式。另外我们在shuffle中直接支持了RowVector避免Velox和Arrow对应数据类型之间的额外转换开销。| 4.5 适配HBOHBOHistorical Based Optimization是通过作业历史运行过程中资源的实际使用量来预测作业下一次运行需要的资源并设置资源相关参数的一种优化手段。美团过去在原生Spark上通过调配堆内内存取得了8%左右的内存资源节省。Gluten主要使用堆外内存off-heap这与原生Spark主要使用堆内内存on-heap不同。初期出于稳定性考虑Gluten和原生Spark的运行参数整体一致总内存大小相同Gluten off-heap 占比75% on-heap占比25%。这样配置既会导致内存利用率不高原生Spark的内存使用率58%向量化版作业内存使用率 38%也会使一部分作业on-heap内存配置偏低在算子回退时导致任务OOM。我们把HBO策略推广到堆外内存向量化计算的内存节省比例从30%提升到40%由于heap内存配置不合理的OOM问题全部消除。图13HBO流程图| 4.6 一致性问题**1. 低版本ORC数据丢失。**hive-0.13之前使用的ORCFooter信息不包含列名只有ID用来表示第几列如Col1, Col2…。Velox TableScan算子在扫表的时候如果下推的Filter里包含IsNotNull(A)会根据列名A查找该列数据由于无法匹配到列名会误判空文件导致数据缺失。Spark在生成读ORC表的执行计划时通过访问HiveMetaStore得到表的Schema信息并在物理算子FileSourceScanExec中保存了表的Schema信息。Gluten对该算子进行doTransform()转换时会把表的Schema信息序列化到Substrait的ReadRel里。接下来的Substrait计划转Velox计划阶段我们把表的Schema信息传给Velox的HiveTableHandle在构造Velox的DwrfReader时修正ORC文件Footer里的Schema信息如果Footer的Schema不包含列名就读取表Schema里的对应列的名称进行赋值解决了这个问题。**2. count distinct结果错误。**比如这样一条SQLselect A, B, count(distinct userId), sum(amt) from t group by 1,2Gluten会把count(distinct userId) 变为count(userId)通过把userId加到GroupingKey里来实现distinct语义。具体处理过程如下表1示例SQL在Spark中的处理步骤在第3步的Intermediate Aggregation中为了节省内存和加速执行当Velox的HashAggregate算子满足触发Flush的条件时HashTable内存占用超过阈值或者聚合效果低于阈值Velox会标记 partialFulltrue触发Flush操作计算HashTable里已经缓存数据的Intermediate Result并且后续输入的数据不再执行Intermediate Aggregation的计算直接进入第4步的Partial Aggregation。如果后续输入的数据里包含重复的userIdcount(userId)会因为去重不彻底而结果错误。我们短期的修复方案是禁用Intermediate Aggregation的提前Flush功能直到所有数据都输入之后再获取该阶段的聚合结果。这个方案的弊端有两个1HashTable的内存占用会变大需要同时开启HashAggregate算子的Spill功能避免OOM2直接修改了Velox的HashAggregate算子内部代码从Velox自身的角度来看没有单独针对Distinct相关的聚合做处理随着后续迭代可能影响所有用到Intermediate Aggregation的聚合过程。鉴于此Gluten社区提供了一个更加均衡的解决方案针对这类Distinct Aggregation生成执行计划时Spark的Partial Merge Aggregation不再生成Intermediate Aggregation改为生成Final Aggregation不会提前flush、不使用merge_sum同时配合聚合函数的Partial Companion函数来返回Intermediate结果。这样就从执行计划转换策略层面规避这个问题避免对Velox里Final Aggregation内部代码做过多的改动。**3. 浮点类型转换精度错误。**形如查询SELECT concat(col2, cast(max(col1) as string)) FROM (VALUES (CAST(5.08 AS FLOAT), abc_)) AS tab(col1, col2) group by col2;在Spark中返回abc_5.08在Gluten中返回abc_5.079999923706055。浮点数5.08不能用二进制分数精确表达近似表示成5.0799999237060546875。Velox通过函数folly::tostd::string(T val)来实现float类型到string类型的转换这个函数底层依赖开源库google::double-conversion, folly里默认设置了输出宽度参数DoubleToStringConverter::SHORTEST可以准确表示double类型的最小宽度转换时经过四舍五入之后返回 5.079999923706055。我们把宽度参数改为DoubleToStringConverter::SHORTEST_SINGLE可以准确表示float类型的最小宽度转换时经过四舍五入之后返回 5.08。5 上线效果我们已上线了2万多ETL作业平均内存资源节省40%平均执行时间减少13%证明GlutenVelox的向量化计算方案生产可行。向量化计算除了能提高计算效率也能提高读写数据的效率如某个作业的Input数据有30TB过去需要执行7小时绝大部份时间花在了读数据和解压缩上面。使用向量化引擎后因为上文提到的ISA-L解压缩优化列转行的开销节省以及HDFS Native客户端优化执行时间减少到2小时内。图14上线优化效果6 未来规划我们已上线向量化计算的Spark任务只是小部分计划2024年能让绝大部分的SQL任务运行在向量化引擎上。| 6.1 Spark向量化之后对开源社区的跟进策略Spark、Gluten、Velox三个社区各有自己考虑和版本发布节奏从一个社区到多个社区的引擎维护复杂度上升。我们的应对有二一是计算引擎有不同层次Spark升级主要考虑功能语义实现、执行计划、资源和task调度Gluten和Velox的升级主要考虑物理算子性能优化各取所长二是尽量减少和社区的差异公司内部适配只在Spark中实现公司内的UDF以git submodule形式单独维护。升级到Spark3.5。Gluten最低支持的Spark版本为3.223年我们为了降低验证成本选择在Spark3.0兼容Gluten但继续升级迭代成本比较高在推广之前应该升级到更新的Spark版本。Spark3.5将会是Gluten社区对Spark3.x上长期支持的稳定版本。高版本Spark也有一些额外收益我们基于TPC-H实测Spark3.5相比Spark3.0「memory*second」减少40%执行时间减少17%根据之前升级经验生产环境大约能达到一半效果。保持Spark版本长期稳定。高版本Spark对Hadoop版本的升级迭代带来比较高适配成本内部迭代的feature也有比较高的迁移成本因此我们平均3年才会升级一次Spark版本更多是将需要的feature合并到内部分支。快速跟进Gluten/Velox新版本。升级到Spark3.5之后我们内部Spark版本与Gluten社区的兼容性成本很低并且向量化相关feature还会持续迭代预计每半年可升级一次线上版本。| 6.2 提升向量化覆盖率的策略扩大向量化算子和UDF范围。我们整理了影响权重最高的几十个算子回退问题与Gluten社区一起解决对于大量内部UDF则会探索用大模型来将UDF批量改写为C版本的向量化实现。扩大File format支持向量化范围。美团内部有约20%的表为textfile格式还有接近10%的表使用内部开发的format只能按行读取也不支持下推加上行转列都会有额外性能开销影响最终效果。我们将会把textfile全部转为ORC为自研format提供C客户端进一步提升向量化计算性能。读者福利如果大家对大模型感兴趣这套大模型学习资料一定对你有用对于0基础小白入门如果你是零基础小白想快速入门大模型是可以考虑的。一方面是学习时间相对较短学习内容更全面更集中。二方面是可以根据这些资料规划好学习计划和方向。作为一名老互联网人看着AI越来越火也总想为大家做点啥。干脆把我这几年整理的AI大模型干货全拿出来了。包括入门指南、学习路径图、精选书籍、视频课还有我录的一些实战讲解。全部免费不搞虚的。学习从来都是自己的事我能做的就是帮你把路铺平一点。资料都放在下面了有需要的直接拿能用到多少就看你自己了。这份完整版的大模型 AI 学习资料已经上传CSDN朋友们如果需要可以点击文章最下方的VX名片免费领取【保真100%】