導(dǎo)讀 本文將分享 Apache Kyuubi 在愛奇藝的一些實(shí)踐和落地。
文章將圍繞下面三點(diǎn)展開:
(資料圖片)
1. 愛奇藝 Spark Thrift Server 服務(wù)演進(jìn)
2. Spark SQL 平臺化適配
3. Spark SQL 服務(wù)優(yōu)化
分享嘉賓|王震 愛奇藝 高級研發(fā)工程師
編輯整理|馬慧 匯豐達(dá)
出品社區(qū)|DataFun
01
愛奇藝 Spark Thrift Server 服務(wù)演進(jìn)
1. 愛奇藝統(tǒng)一 SQL 網(wǎng)關(guān) Pilot 服務(wù)架構(gòu)
首先分享愛奇藝的統(tǒng)一 SQL 網(wǎng)關(guān) Pilot 服務(wù),Pilot 作為統(tǒng)一的 SQL 服務(wù),上層是通過 JDBC 的協(xié)議對接了愛奇藝的數(shù)據(jù)開發(fā)平臺、即席查詢的平臺以及業(yè)務(wù)的取數(shù)服務(wù)。服務(wù)端通過 SQL 解析的模塊對 SQL 進(jìn)行分析,對異常 SQL 進(jìn)行攔截,并且智能的選擇合適的執(zhí)行策略和路由策略。結(jié)合服務(wù)發(fā)現(xiàn)提供高可用。在配置中心可以配置標(biāo)簽化的配置,并基于運(yùn)行歷史提供統(tǒng)計和審計視圖。底層支持了 Trino、Spark、Hive 還有 Clickhouse 等引擎。其中我們使用 Kyuubi 來作為 Spark Thrift Server 服務(wù)。
2. 愛奇藝 Spark Thrift Server 服務(wù)演進(jìn)
在愛奇藝,Spark Thrift Server 服務(wù)經(jīng)歷了三個階段:
(1) 最開始 我們使用的是 Spark 原生的 Thrift Server 服務(wù),支持簡單的 Ad-hoc 查詢。
(2) 后來 我們引入了 Kyuubi 的版本,支持 Ad-hoc 查詢以及少量的 ETL 任務(wù)。
(3)Kyuubi 在 的版本中做了一些架構(gòu)優(yōu)化,我們是在 Apache Kyuubi 進(jìn)入到 Apache 之后,也升級到了 的版本,支持了 Ad-hoc 查詢,慢慢有大量的 ETL 任務(wù)。并且使用 Spark SQL 代替了 Hive,成為主要的離線處理引擎,同時也支持了離線數(shù)據(jù)湖的分析操作。
下面通過對比這 3 個服務(wù),分享一下 Spark Thrift Server 服務(wù)的演進(jìn)過程。
第一階段:Spark 原生的 Thrift Server 服務(wù)的架構(gòu)
這是 Spark 原生的 Thrift Server 服務(wù)的架構(gòu),通過 啟動常駐的 Spark 服務(wù) ,在 Driver 端去暴露 Thrift Server 的服務(wù)接口來接收并執(zhí)行 SQL 請求,是 基于一個 SparkContext 的多線程的應(yīng)用場景, 它的服務(wù)特性: 基于 HiveServer2 協(xié)議,可以很好地兼容 Hive 相關(guān)的生態(tài)。因?yàn)槭且粋€常駐的服務(wù),可以有效減少 Spark 啟動的開銷, 可以快速響應(yīng)請求。
不過由于使用的是同一個 SparkContext,所以 不支持多租戶,資源不隔離,多個任務(wù)之間會相互影響,搶占資源之類的。 并且服務(wù)啟動后配置是固定的,在 啟動后資源的配置都不可以修改。
第二階段:引入 Kyuubi 的 的版本
后面我們引入了 Kyuubi 的 的版本,對 Thrift Server 進(jìn)行了增強(qiáng), 允許不同的連接使用不同的 SparkContext ,同時也支持 同一個用戶級別的 SparkContext 共享。
也是 基于 HiveServer2 協(xié)議的,支持了多租戶。 同時在服務(wù)啟動后可以 動態(tài)傳入一些資源配置, 調(diào)整 SparkContext 的資源,并且 不同的鏈接可以使用不同 SparkContext 進(jìn)行 Executor 層面的資源隔離。
不過由于它是 Yarn-Client 模式運(yùn)行 的,所有 SparkContext 其實(shí)是共享在一個 Spark Driver 的進(jìn)程之內(nèi)的, 所以對 Driver 端的壓力還是挺大的,也容易達(dá)到性能的瓶頸。
第三階段:升級 Kyuubi 的版本
Kyuubi 的版本對架構(gòu)進(jìn)行了優(yōu)化,主要是 對 Server 端和引擎端進(jìn)行了解耦操作。
Server 端主要是做引擎的啟動以及 SQL 轉(zhuǎn)發(fā),同時支持引擎的共享策略, 比如支持用戶級別共享,允許同一個用戶的不同連接使用同一個 Kyuubi Engine,從而實(shí)現(xiàn)上下文共享,并且節(jié)省資源。
服務(wù)特點(diǎn): 低耦合,Kyuubi 和 Engine 進(jìn)行解耦 不同的任務(wù)使用不同的引擎,這樣就可以達(dá)到完全的資源隔離。
同時支持 各種共享策略:User 級別、Group 級別、還有 Server 級別的。使用共享引擎主要能夠節(jié)省資源,避免反復(fù)啟停引擎,可以快速響應(yīng)請求。
三個服務(wù)的一個簡單的對比
可以看到 Kyuubi 版本中支持多租戶、有很好的并發(fā)支持。
資源隔離性方面: Spark Thrift Server 主要依賴于 Spark 原生的 Pools。Kyuubi 共用同一個 Driver 維護(hù)多個 SparkContext。Kyuubi 支持獨(dú)立和復(fù)用引擎。
資源的使用方面: Spark Thrift Server 是常駐的 SparkContext 資源。Kyuubi 版本是可以使用獨(dú)立的 SparkContext,也可以復(fù)用。Kyuubi 版本也可以支持獨(dú)立的引擎以及各種共享引擎,它的 獨(dú)立引擎是在執(zhí)行完成之后釋放資源的,共享引擎是在超時之后釋放。
02
Spark SQL 平臺化適配
接下來分享 Spark SQL 平臺化適配,將從四個方面進(jìn)行介紹:
① 標(biāo)簽化配置;
② SQL 事件審計;
③ Spark SQL血緣采集;
④ 服務(wù)監(jiān)控。
1. 標(biāo)簽化配置
我們在使用中發(fā)現(xiàn)有很多任務(wù)有共同的特性。
① ETL 任務(wù): 它的任務(wù)特點(diǎn)是運(yùn)行時間比較長,數(shù)據(jù)量比較大,并且因?yàn)檫\(yùn)行長需要穩(wěn)定運(yùn)行。
② 即席查詢的任務(wù): 快速響應(yīng)、數(shù)據(jù)量比較小,同時穩(wěn)定性要求沒有那么高。
③ 對于同一個用戶和業(yè)務(wù)內(nèi)部: 又有一些相似性,數(shù)據(jù)量級比較接近、用戶開發(fā)習(xí)慣比較接近。
所以我們 對這些任務(wù)進(jìn)行一些標(biāo)簽化的配置。 比如離線 ETL,我們配置一個 Connection 級別的獨(dú)立引擎,即席查詢使用 User 級別的引擎。不同的用戶和業(yè)務(wù)根據(jù)不同的需求配置一些資源,另外用戶開發(fā)習(xí)慣不同,可能有一些 Hive 兼容性的配置等。
用戶任務(wù)在請求的時候只需要帶上一些標(biāo)簽,比如帶上一個 ETL 任務(wù),這樣就會使用獨(dú)立的引擎,或者是一個即席查詢?nèi)蝿?wù),就會用 User 引擎級別。
Kyuubi 中是提供了一個 SessionConfAdvisor 接口,允許用戶對 Session 注入一些配置,進(jìn)而可以用來實(shí)現(xiàn)上述的標(biāo)簽化配置的功能。
2. SQL 事件審計
SQL 事件審計是基于 Kyuubi 的 Event 體系進(jìn)行實(shí)現(xiàn)的。
Kyuubi 在 Server 和 Engine 中都暴露了很多事件: 在 Server 中暴露的 Server 的啟停事件、Session 的打開\關(guān)閉事件,Operator 的執(zhí)行事件。Engine 中也記錄了 Session、Operation、還有血緣等事件。
Kyuubi 中通過 Event Handler 對這些事件進(jìn)行處理。在 Server 端,我們實(shí)現(xiàn)了一個 ES 的 Event Handler, 將 Server 的事件直接吐到 ES 中。
引擎端是用 Kyuubi 的 SparkHistory Logging EventHandler, 將 Event 記錄到了 Spark History 的 HDFS 文件 里面。
這樣我們就可以對 SQL 執(zhí)行事件進(jìn)行分析統(tǒng)計,比如可以統(tǒng)計出來 SQL 執(zhí)行的總量、失敗量,還有 SQL 的資源使用情況。 SQL 的執(zhí)行事件里面還包括了一些執(zhí)行的錯誤信息,可以使用正則匹配規(guī)則建立 錯誤信息的規(guī)則庫, 來進(jìn)行錯誤提示實(shí)現(xiàn) 輔助運(yùn)維。
我們還可以基于 SQL 的一些運(yùn)行歷史,對任務(wù)進(jìn)行 HBO 的優(yōu)化,推斷出更合適的配置。
同時 Spark UI 上面其實(shí) Kyuubi 也通過吐出的事件渲染出了 Kyuubi 的 Tab 頁,方便了 Kyuubi 任務(wù)的分析和排障。
Kyuubi 中是可以實(shí)現(xiàn)自定義的 Event Handler 對事件進(jìn)行處理。
3. Spark 血緣采集
這個是血緣采集的過程:
先通過 Spark Catalyst 對 SQL 解析成執(zhí)行計劃。
用 SparkAtlas Connector 的插件, 通過 QueryExecutionListener 攔截到執(zhí)行計劃,再把 它解析出血緣信息,再通過 Atlas 的客戶端給它發(fā)送到 Atlas 里面。
Kyuubi 中也帶了 Spark 的血緣插件,也支持列級別的血緣, 大家有興趣也可以去了解一下。
4. 服務(wù)監(jiān)控
Kyuubi 服務(wù)也暴露了很多的監(jiān)控指標(biāo), 服務(wù)端暴露了:GC、Memory、 Connection、Operation 等監(jiān)控的指標(biāo),內(nèi)部會有很多 Reporter 發(fā)送到不同的服務(wù)里面。
我們 通過自定義的 Reporter Service,把這些指標(biāo)直接吐到內(nèi)部的監(jiān)控平臺上,再配置一些告警和視圖。
同時也開發(fā)了撥測任務(wù),使用 JDBC 連接提交一些測試的任務(wù)到各個 Kyuubi Server 上執(zhí)行,判斷任務(wù)能否正常執(zhí)行。另外,定時調(diào)用 Kyuubi Server 的 Rest API 進(jìn)行健康檢查。
03
Spark SQL 服務(wù)優(yōu)化
1. 小文件優(yōu)化
下面分享一下 Spark SQL 的服務(wù)優(yōu)化,先看一下 Spark SQL 的小文件優(yōu)化。
為什么產(chǎn)生小文件?
左邊是讀一個表,直接寫入到另外一個表里面的操作,右邊是兩個表 Join 寫入到另外一個表中??匆幌伦筮?,假如它讀取的分區(qū)數(shù)是 100 個,寫入也是 100 個分區(qū),最終將會寫入 100 個文件。
右邊假設(shè)兩個表的并行度也是 100,經(jīng)過 Join 之后會產(chǎn)生 Shuffle 操作變成 200 個分區(qū),可能就寫入 200 個文件。 所以最終寫入的文件數(shù)是跟最終寫入 Stage 的 Task 數(shù)是相關(guān)的。
如何解決小文件?
我們怎么解決小文件? 需要在寫入操作之前進(jìn)行一個 Repartition 的操作,來控制寫入的 Stage 的分區(qū)數(shù),進(jìn)而控制寫入的小文件數(shù)。
左邊示例,我們插入了一個 Repartition 10 的算子,最終寫入的 Stage 的 Partition 數(shù)就是 10,最終可能也就生成 10 個文件。
右邊示例也是類似的一個操作。
Repartition 的兩個問題
① 第一個:Repartition 這個數(shù)我們怎么確定?
② 第二個:就是動態(tài)分區(qū)寫入的一個情況下,假設(shè)某一些分區(qū)的數(shù)據(jù)量比較大,又比較容易導(dǎo)致分區(qū)傾斜的情況。
Repartition 數(shù)量如何確定?AQE 動態(tài)合并小分區(qū)
下面看一下具體怎么解決這兩個問題。第一個 Repartition 數(shù)量如何確定?
首先我們可以把 Repartition 數(shù)設(shè)置為一個較大的值,并且因此觸發(fā)了 Shuffle 操作。再借助于 Spark 3 的 AQE 動態(tài)合并小分區(qū)的功能,自動的根據(jù)配置的大小將小分區(qū)進(jìn)行合并,分區(qū)數(shù)控制在合理的范圍。這樣就不需要特意的去配置 Repartition 的數(shù)量。
動態(tài)分區(qū)寫入導(dǎo)致超大文件-添加隨機(jī)數(shù)
動態(tài)分區(qū)如果有 一些分區(qū)的數(shù)量比較大,進(jìn)行 Repartition 操作之后,會導(dǎo)致數(shù)據(jù)傾斜。
對于這種情況,我們通過添加隨機(jī)數(shù),將分區(qū)進(jìn)行拆分。示例中我們加入了值為 4 的隨機(jī)數(shù),相當(dāng)于把每個分區(qū)拆成了 4 份,再對分區(qū)以及隨機(jī)數(shù)字段進(jìn)行 Repartition 操作。這樣可以看到分區(qū)傾斜有一個很好的效果。
不過這樣也會存在一個問題, 如果隨機(jī)數(shù)控制的不合理,它又容易再次導(dǎo)致小文件的問題。
Rebalance 平衡分區(qū)( + )
在 Spark 里面,引入了 Rebalance 的特性,可以自動平衡分區(qū), 分區(qū)過大的時候會進(jìn)行拆分,分區(qū)過小的時候進(jìn)行合并,這樣就可以有效地控制小文件和數(shù)據(jù)傾斜。
圖中上面兩個大分區(qū),在進(jìn)行 Rebalance 之后,會自動地分成兩個分區(qū),小分區(qū)又會自動合并成大分區(qū),這樣整體的效果就比較平衡。
右邊是經(jīng)過 Rebalance 的寫入的執(zhí)行計劃,上面是合并了小分區(qū),下面是拆了一下數(shù)據(jù)傾斜的分區(qū),寫入都是控制在了比較合適的范圍,基本上解決了小文件的問題。
Kyuubi 的相關(guān)特性是在 Kyuubi Extension 的插件里面,使用的時候只需要把這個插件放到 Spark_HOME 的 Jars 里面,同時配上 KyuubiSparkSQLExtension ,然后再配置一下 AQE 的一些配置,基本上就可以自動地優(yōu)化小文件。
2. Z-order 優(yōu)化簡介
下面分享 Kyuubi 的另一個比較優(yōu)秀的特性,就是 Z-order 優(yōu)化。
簡單介紹一下 Z-order 優(yōu)化, Z-order 排序?qū)嶋H上是將多維數(shù)據(jù)映射到一維上的排序算法。
這個圖片是來自 Databricks 的技術(shù)博客,例子可以看到,假設(shè)這有一個二維數(shù)據(jù) X、Y,左邊是一個 線性排序 :X = 0,Y = 1、2、3、4 進(jìn)行排序,并且 4 個數(shù)據(jù)點(diǎn)存放在一個存儲單元里面。Parquet 這一類的文件格式在查詢的時候會根據(jù)文件的元信息,做 Data Skipping 操作,比如文件塊里面的最大最小值來做過濾。左邊我們要執(zhí)行上面 SQL,過濾 X = 2 或者 Y = 2 的數(shù)據(jù),第一個數(shù)據(jù)塊 X = 0,Y 的最大最小值就是 0 ~ 3 之間,所以它會被選中,會被掃描,第二個文件塊,用 X = 0,Y 的最小值是 3,會直接跳過。如此以來就會掃描到 9 個文件塊,并且查詢了 21 個無效數(shù)據(jù)點(diǎn)。
右邊這個是 Z-order 的排序,可以看到它就類似一個 z 型曲線排序。同樣也是 4 個點(diǎn)放在一個文件塊里面。第一個文件塊,如果我們也執(zhí)行上面查詢 X = 2 或者 Y = 2,第一個文件塊 X、Y 最大最小值就是 0 ~ 1 之間,所以會被過濾掉。第二個文件塊 X,Y 的最大最小值是 2~3 之間,所以它就會被選中,第三個會被跳過。如此以來它會掃描到 7 個文件塊,13 個無效的數(shù)據(jù)點(diǎn), 跟左邊線性對比可以看到它的掃描的文件塊數(shù)其實(shí)是減小了,并且無效的數(shù)據(jù)會少很多。
這樣 經(jīng)過 Z-order 排序之后,會提高加速查詢的速度。 并且由于它多維映射到一維上, 相當(dāng)于多維數(shù)據(jù)上更加臨近了,相似性更高一些,所以它的壓縮效果會提高很多。
這邊是 Z-order 的一個實(shí)現(xiàn),最關(guān)鍵是將多維數(shù)據(jù)計算出 Z-value 值,再通過 Z-value 值進(jìn)行排序。
這是維基百科上面的兩張圖介紹。通過一個二進(jìn)制位交叉的算法,計算 Z-value 值,比如 X、Y 都變成了二進(jìn)制的數(shù),進(jìn)行位交錯計算出來 Z-value 的值,通過這個值進(jìn)行排序。右邊就是經(jīng)過算法之后的排序效果??梢钥吹降谝粋€點(diǎn) 000000,第二個點(diǎn)000001,第三個點(diǎn)再到 000010,再到 000011。是一個 Z 型的曲線。
這個特性在 Kyuubi 中也是放在 KyuubiExtensionSpark 的插件里面,我們只需要把它復(fù)制到 Spark Jars 里面,并且在 SparkConf 里面配置 Kyuubi Spark SQL Extension。
對存量的數(shù)據(jù)只需執(zhí)行一下 Optimize 的命令, 就可以對存量數(shù)據(jù)進(jìn)行重排序然后再寫入的操作。
對于增量數(shù)據(jù),我們可以在表里面加上 、 兩個Properties, 在增量寫入的時候,它就會自動進(jìn)行 Z-order 的優(yōu)化寫入。
04
問答環(huán)節(jié)
Q1:Kyuubi 怎么實(shí)現(xiàn)持久化的 Spark Context?
A1:Kyuubi 中會啟動一個獨(dú)立的引擎,并且支持一些共享策略。比如像 User 級別共享引擎,很多人使用同一個 Hadoop 用戶進(jìn)行提交,將會使用同一個引擎,引擎具有一個超時時間,在空閑一段時間后才會退出。比如你提交一個任務(wù),執(zhí)行完之后連接會斷掉,但是引擎是沒有退出的,還常駐在那里,其他的人一提交就會馬上連到引擎上面。
Q2:什么叫撥測?
A2:撥測就是對服務(wù)的健康檢測。比如每 5 分鐘提交一個 JDBC 請求,保證服務(wù)可用。每 1 分鐘去調(diào)一下它的 Rest API,確定服務(wù)是存活的,連續(xù)失敗了幾次就可以確定服務(wù)不可用,自動地對它進(jìn)行一個重啟。
今天的分享就到這里,謝謝大家。
關(guān)鍵詞: