2014年,Spark開源生態(tài)系統(tǒng)得到了大幅增長,已成為大數(shù)據(jù)領(lǐng)域最人氣的開源項目之一,活躍在Hortonworks、IBM、Cloudera、MapR和Pivotal等眾多知名大數(shù)據(jù)公司,更擁有Spark SQL、Spark Streaming、MLlib、GraphX等多個相關(guān)項目。同時值得一提的是,Spark貢獻者中有一半左右的中國人。
短短四年時間,Spark不僅發(fā)展為Apache基金會的頂級開源項目,更通過其高性能內(nèi)存計算及其豐富的生態(tài)快速贏得幾乎所有大數(shù)據(jù)處理用戶。2015年1月10日,一場基于Spark的高性能應(yīng)用實踐盛宴由Databricks軟件工程師連城、百度高級工程師甄鵬、百度架構(gòu)師孫垚光、百度美國研發(fā)中心高級架構(gòu)師劉少山四位專家聯(lián)手打造。
Databricks軟件工程師連城——Spark SQL 1.2的提升和新特性
談及Spark SQL 1.2的提升和新特性,連城主要總結(jié)了4個方面——External data source API(外部數(shù)據(jù)源API)、列式內(nèi)存存儲加強(Enhanced in-memory columnar storage)、Parquet支持加強(Enhanced Parquet support)和Hive支持加強(Enhanced Hive support)。
External data source API
連城表示,因為在處理很多外部數(shù)據(jù)源中出現(xiàn)的擴展問題,Spark在1.2版本發(fā)布了External data source API。通過External data source API,Spark將不同的外部數(shù)據(jù)源抽象成一個關(guān)系表格,從而實現(xiàn)更貼近無縫的操作。
External data source API在支持了多種如JSON、Avro、CSV等簡單格式的同時,還實現(xiàn)了Parquet、ORC等的智能支持;同時,通過這個API,開發(fā)者還可以使用JDBC將HBase這樣的外部系統(tǒng)對接到Spark中。
連城表示,在1.2版本之前,開發(fā)者其實已經(jīng)實現(xiàn)了各種各樣外部數(shù)據(jù)源的支持,因此,對比更原生的支持一些外部數(shù)據(jù)源,External data source API的意義更在于針對相應(yīng)數(shù)據(jù)源進行的特殊優(yōu)化,主要包括Column pruning(列剪枝)和Pushing predicates to datasources(將predicates貼近數(shù)據(jù)源)兩個方面:
Column pruning。主要包括縱橫的兩種剪枝。在列剪枝中,Column pruning可以完全忽視無需處理的字段,從而顯著地減少IO。同時,在某些條件查詢中,基于Parquet、ORC等智能格式寫入時記錄的統(tǒng)計信息(比如最大值、最小值等),掃描可以跳過大段的數(shù)據(jù),從而省略了大量的磁盤掃描負載。
Pushing predicates to datasources。在更復(fù)雜的SQL查詢中,讓過濾條件維度盡可能的接近數(shù)據(jù)源,從而減少磁盤和網(wǎng)絡(luò)IO,最終提高整體端到端的性能。
使用External data source API之前
使用External data source API之后
搭載了如Parquet和ORC這樣的智能格式
連城表示,在Spark 1.2版本中,External data source API并沒有實現(xiàn)預(yù)期中的功能,在Roadmap中,F(xiàn)irst class分片支持(First class partitioning support with partition pruning)、Data sink(insertion)API、將Hive作為外部數(shù)據(jù)源等。
Enhanced in-memory columnar storage
連城表示,不管Shark,還是Spark,內(nèi)存緩存表的支持都是非常重要的一個特性。他表示,雖然在1.1和之前版本中的列式內(nèi)存表的性能已然不錯,但是還會出現(xiàn)一些問題:第一,大數(shù)據(jù)量下緩存超大體積表時(雖然不推薦,但不缺現(xiàn)實用例),會出現(xiàn)OOM等問題;第二,在列式存儲中,像Parquet、ORC這種收集統(tǒng)計信息然后通過這些信息做partition skipping等操作在之前版本中并沒有完全實現(xiàn)。這些問題在1.2版本中都得到了解決,本節(jié),連城主要介紹了語義統(tǒng)一、緩存實體化、基于緩存共享的查詢計劃、Cache大表時的OOM問題、表格統(tǒng)計(Table statistics)等方面。
緩存實體化。SQLContext.cacheTable(“tbl”)默認(rèn)使用eager模式,緩存實體化將自動進行,不會再等到表被使用或觸發(fā)時,避免手動做“SELECT COUNT(*) FROM src;”。同時,新增了“CACHE [LAZY] TABLE tbl [AS SELECT …]”這樣的DML。
語義統(tǒng)一。早期時候,SchemaRDD.cache()和SQLContext.cacheTable(“tbl”)這兩個語義是不同的。其中,SQLContext.cacheTable會去建立一些列式存儲格式相關(guān)優(yōu)化,而SchemaRDD.cache()卻以一行一個對象的模式進行。在1.2版本中,這兩個操作已被統(tǒng)一,同時各種cache操作都將得到一個統(tǒng)一的內(nèi)存表。
基于緩存共享的查詢計劃。兩個得到相同結(jié)果的cache語句將共享同一份緩存數(shù)據(jù)。
避免Cache大表時的OOM問題。優(yōu)化內(nèi)存表的建立和訪問,減少開銷,進一步提升性能;在緩存大表時,引入batched column buffer builder,將每一列切成多個batch,從而避免了OOM。
表格統(tǒng)計。Table statistics,類似Parquet、ORC使用的技術(shù),在1.2版本中主要實現(xiàn)了Predicate pushdown(實現(xiàn)更快的表格掃描)和Auto broadcast join(實現(xiàn)更快的表格join)。
最后,連城還詳細介紹了一些關(guān)于加強Parquet和Hive支持的實現(xiàn),以及Spark未來的一些工作。
百度基礎(chǔ)架構(gòu)部高級工程師甄鵬——Spark在百度開放云BMR中的實戰(zhàn)分享
百度分布式計算團隊從2011年開始持續(xù)關(guān)注Spark,并于2014年將Spark正式引入百度分布式計算生態(tài)系統(tǒng)中,在國內(nèi)率先面向開發(fā)者及企業(yè)用戶推出了支持Spark并兼容開源接口的大數(shù)據(jù)處理產(chǎn)品BMR(Baidu MapReduce)。在甄鵬的分享中,我們主要了解了百度Spark 應(yīng)用現(xiàn)狀、百度開放云BMR和Spark On BMR三個方面的內(nèi)容。
Spark在百度
甄鵬表示,當(dāng)前百度的Spark集群由上千臺物理主機(數(shù)萬Cores,上百TBMemory)組成,日提交App在數(shù)百,已應(yīng)用于鳳巢、大搜索、直達號、百度大數(shù)據(jù)等業(yè)務(wù)。之以選擇Spark,甄鵬總結(jié)了三個原因:快速高效、API 友好易用和組件豐富。
快速高效。首先,Spark使用了線程池模式,任務(wù)調(diào)度效率很高;其次,Spark可以最大限度地利用內(nèi)存,多輪迭代任務(wù)執(zhí)行效率高。
API友好易用。這主要基于兩個方面:第一,Spark支持多門編程語言,可以滿足不同語言背景的人使用;第二,Spark的表達能力非常豐富,并且封裝了大量常用操作。
組件豐富。Spark生態(tài)圈當(dāng)下已比較完善,在官方組件涵蓋SQL、圖計算、機器學(xué)習(xí)和實時計算的同時,還有著很多第三方開發(fā)的優(yōu)秀組件,足以應(yīng)對日常的數(shù)據(jù)處理需求。
百度開放云BMR
在BMR介紹中,甄鵬表示,雖然BMR被稱為Baidu MapReduce,但是這個名稱已經(jīng)不能完全表示出這個平臺:BMR是百度開放云的數(shù)據(jù)分析服務(wù)產(chǎn)品,基于百度多年大數(shù)據(jù)處理分析經(jīng)驗,面向企業(yè)和開發(fā)者提供按需部署的Hadoop&Spark集群計算服務(wù),讓客戶具備海量數(shù)據(jù)分析和挖掘能力,從而提升業(yè)務(wù)競爭力。
如圖所示,BMR基于BCC(百度云服務(wù)器),建立在HDFS和BOS(百度對象存儲)分布式存儲之上,其處理引擎包含了MapReduce和Spark,同時還使用了HBase數(shù)據(jù)庫。在此之上,系統(tǒng)集成了Pig、Hive、SQL、Streaming、GraphX、MLLib等專有服務(wù)。在系統(tǒng)的最上層,BMR提供了一個基于Web的控制臺,以及一個API形式的SDK。
在圖片的最右邊,Scheduler在BMR中起到了管理作用,使用它開發(fā)者可以編寫比較復(fù)雜的作業(yè)流。
Spark On BMR
類似于通常的云服務(wù),BMR中的Spark同樣隨用隨起,集群空閑即銷毀,幫助用戶節(jié)省預(yù)算。此外,集群創(chuàng)建可以在3到5分鐘內(nèi)完成,包含了完整的Spark+HDFS+YARN堆棧。同時,BMR也提供Long Running模式,并有多種套餐可選。
完善的報表服務(wù),全方位監(jiān)控
在安全上,用戶擁有虛擬的獨立網(wǎng)絡(luò),在同一用戶全部集群可互聯(lián)的同時,BMR用戶間網(wǎng)絡(luò)被完全隔離。同時,BMR還支持動態(tài)擴容,節(jié)點規(guī)?蓮椥陨炜s。除此之外,在實現(xiàn)Spark全組件支持的同時,BMR可無縫對接百度的對象存儲BOS服務(wù),借力百度多年的存儲研發(fā)經(jīng)驗,保證數(shù)據(jù)存儲的高可靠性。
百度基礎(chǔ)架構(gòu)部架構(gòu)師孫垚光——百度高性能通用Shuffle服務(wù)
在2014 Sort Benchmark國際大賽上,百度成功奪冠,其幕后英雄無疑卓越的Shuffle機制,在孫垚光的分享中,我們對Shuffle的發(fā)展、細節(jié)和未來有了一次深度的接觸。
Shuffle簡介
孫垚光表示,簡單來說,Shuffle就是按照一定的分組和規(guī)則Map一個數(shù)據(jù),然后傳入Reduce端。不管對于MapReduce還是Spark,Shuffle都是一個非常重要的階段。然而,雖然Shuffle解決的問題相同,但是在Spark和MapReduce中,Shuffle流程(具體時間和細節(jié))仍然存在一定的差別:
Baidu Shuffle發(fā)展歷程
通過孫垚光了解到,Shuffle在百度的發(fā)展主要包括兩個階段:跟隨社區(qū)和獨立發(fā)展。從2008年百度的MapReduce/Hadoop起步開始,百度就開始跟隨社區(qū),使用社區(qū)版本,期間的主要工作包含Bug修復(fù)和性能優(yōu)化兩個方面(增加內(nèi)存池、減少JVMGC,傳輸Server由Jetty換Netty,及批量傳輸、聚合數(shù)據(jù)等方面)。
分離了shuffle和Map/Reduce
在2012年開始,Baidu Shuffle開啟獨立發(fā)展階段,主要源于下一代離線計算系統(tǒng)的開發(fā),Shuffle被抽離為獨立的ShuffleService服務(wù),從而提高了集群資源的利用率。
截止此時,不管是社區(qū)版本(MapReduce/Spark),還是百度研發(fā)的ShuffleService,它們都是基于磁盤的PULL模式;诖疟P,所有Map的數(shù)據(jù)都會放到磁盤,雖然Spark號稱內(nèi)存計算,但是涉及到Shuffle時還是會寫磁盤;赑ULL,所有數(shù)據(jù)在放到Map端的磁盤之后,Reduce在使用時還需要主動的拉出來,因此會受到兩個問題影響:首先,業(yè)務(wù)數(shù)據(jù)存儲在Map端的服務(wù)器上,機器宕機時會不可避免丟失數(shù)據(jù),這一點在大規(guī)模分布式集群中非常致命;其次,更重要的是,Shuffle階段會產(chǎn)生大量的磁盤尋道(隨機讀)和數(shù)據(jù)重算(中間數(shù)據(jù)存在本地磁盤),舉個例子,某任務(wù)有1百萬個Map,1萬個Reduce,如果一次磁盤尋道的時間是10毫秒,那么集群總共的磁盤尋道時間= 1000000 ×10000 ×0.01 = 1億秒。
New Shuffle
基于這些問題,百度設(shè)計了基于內(nèi)存的PUSH模式。新模式下,Map輸出的數(shù)據(jù)將不落磁盤,并在內(nèi)存中及時地Push給遠端的Shuffle模塊,從而將獲得以下提升:
New Shuffle的優(yōu)勢
New Shuffle架構(gòu)
如圖所示,藍色部分為New Shuffle部分,主要包含兩個部分:數(shù)據(jù)寫入和讀取的API,Map端會使用這個接口來讀取數(shù)據(jù),Reduce會使用這個接口來讀取數(shù)據(jù);其次,最終重要的是,服務(wù)器端使用了典型的主從架構(gòu),用多個shuffle工作者節(jié)點來shuffle數(shù)據(jù)。同時,在系統(tǒng)設(shè)計中,Master非常有利于橫向擴展,讓shuffle不會成為整個分布式系統(tǒng)的瓶頸。
讓New Shuffle模塊專注于shuffle,不依賴于外部計算模塊,從而計算模塊可以專注于計算,同時還避免了磁盤IO。然而New Shuffle帶來的問題也隨之暴漏,其中影響比較重要的兩個就是:慢節(jié)點和數(shù)據(jù)重復(fù)。
慢節(jié)點。以shuffle寫入過程中出現(xiàn)慢節(jié)點為例,通常包含兩個情況。首先,Shuffle自身慢節(jié)點,對比社區(qū)版本中只會影響到一個task,New Shuffle中常常會影響到一片集群。在這里,百度為每個Shuffle節(jié)點都配置了一個從節(jié)點,當(dāng)Map檢測到一個慢節(jié)點時,系統(tǒng)會自動切換到從節(jié)點。其次,DFS出現(xiàn)慢節(jié)點,這個情況下,Shuffle的從節(jié)點只能起到緩解作用。這種情況下,首先DFS系統(tǒng)會自動檢測出慢節(jié)點,并進行替換。比如,傳統(tǒng)的HDFS會以pipeline的形式進行寫入,而DFS則轉(zhuǎn)換為分發(fā)寫。
在此之外,New Shuffle還需要解決更多問題,比如資源共享和隔離等。同時,基于New Shuffle的機制,New Shuffle還面臨一些其他挑戰(zhàn),比如Reduce全啟動、數(shù)據(jù)過于分散、對DFS壓力過大、連接數(shù)等等。
數(shù)據(jù)重復(fù)。如上圖所示,這些問題主要因為New Shuffle對上層組件缺少感知,這個問題的解決主要使用task id和block id進行去重。
New Shuffle展望
孫垚光表示,New Shuffle使用了通用的Writer和Reader接口,當(dāng)下已經(jīng)支持百度MR和DCE(DAG、C++),同時即將對開源Spark提供支持。在未來,New Shuffle無疑將成為更通用的組件,支持更多的計算模型。
百度美國硅谷研發(fā)中心高級架構(gòu)師劉少山——Fast big data analytics with Spark on Tachyon
Tachyon是一個分布式的內(nèi)存文件系統(tǒng),可以在集群里以訪問內(nèi)存的速度來訪問存在Tachyon里的文件。Tachyon是架構(gòu)在分布式文件存儲和上層各種計算框架之間的中間件,主要負責(zé)將那些不需要落到DFS里的文件,落到分布式內(nèi)存文件系統(tǒng)中,從而達到共享內(nèi)存,以提高效率。1月10日下午的最后一場分享中,劉少山帶來了一場Tachyon的深入解析。
Tachyon和Spark
劉少山表示,在Spark使用過程中,用戶經(jīng)常困擾于3個問題:首先,兩個Spark 實例通過存儲系統(tǒng)來共享數(shù)據(jù),這個過程中對磁盤的操作會顯著降低性能;其次,因為Spark崩潰所造成的數(shù)據(jù)丟失;最后,垃圾回收機制,如果兩個Spark實例需求同樣的數(shù)據(jù),那么這個數(shù)據(jù)會被緩存兩次,從而造成很大的內(nèi)存壓力,更降低性能。
使用Tachyon,存儲可以從Spark中分離處理,讓其更專注于計算,從而避免了上述的3個問題。
Tachyon架構(gòu)
劉少山從Spark的角度分享了Tachyon的部署。在與Spark搭配使用時,系統(tǒng)會建立一個Tachyon的job,通過Tachyon Client來訪問同一個機器上的Tachyon Worker,也就是機器上的內(nèi)存。而Tachyon Client則會與Tachyon Master交互,來清楚每個分節(jié)點所包含的數(shù)據(jù)。由此可見,在整個Tachyon 系統(tǒng)中,Master、Client和Worker為最重要的三個部分。
Tachyon Master。Master主要部件是Inode和Master Worker Info:Inode會負責(zé)系統(tǒng)的監(jiān)視,Master Worker Info則存儲了所有Worker的信息。
Tachyon Worker。Worker主要負責(zé)存儲,其中Worker Storage是最主要的數(shù)據(jù)結(jié)構(gòu),包含Local data folder和Under File System兩個部分。其中Local data folder表示存在本地的Tachyon文件,Under File System則負責(zé)從HDFS中讀取Worker中未發(fā)現(xiàn)的數(shù)據(jù)。
Tachyon Client。Client為上層用戶提供了一個透明的機制,其TachyonFS接口負責(zé)數(shù)據(jù)請求。每個Client中有多個Tachyon File,其中Block In Stream負責(zé)文件讀。↙ocal Block In Stream負責(zé)本地機器讀取,Remote Block In Stream則負責(zé)讀取遠程機器);Block Out Stream主要負責(zé)將文件寫到本地機器上。在Client上,Master Client會與Master交互,Worker Client則與Client交互。
Tachyon在百度
為什么要使用Tachyon,劉少山指出,在百度,計算集群和存儲集群往往不在同一個地理位置的數(shù)據(jù)中心,在大數(shù)據(jù)分析時,遠程數(shù)據(jù)讀取將帶來非常高的延時,特別是ad-hoc查詢。因此,將Tachyon作為一個傳輸緩存層,百度通常會將之部署在計算集群上。首次查詢時,數(shù)據(jù)會從遠程存儲取出,而在以后的查詢中,數(shù)據(jù)就會從本地的Tacnyon上讀取,從而大幅的改善了延時。
在百度,Tachyon的部署還處于初始階段,大約部署了50臺機器,主要服務(wù)于ad-hoc查詢。
實踐中遭遇的挑戰(zhàn)
通過劉少山了解到,Tachyon的使用過程并不是一帆風(fēng)順,比如:因為Tachyon需求對Block完全讀取,從而可能造成Blocks并未被緩存;有時候,雖然scheduler已經(jīng)確認(rèn)了數(shù)據(jù)存在本地,Spark workers仍然從遠程blocks讀取,而緩存命中率也只有可憐的33%(如果你需要的是2號block,Tachyon會分別從1、2、3號block讀取,從而將block讀取了3份)。因此,劉少山表示,如果要使用好Spark與Tachyon,一定要對用例和Tachyon進行充分的了解。
分享最后,劉少山還介紹了Hierarchical Storage Feature特性以及百度未來的工作,其中包括緩存替換策略等。