更新時間:2018年10月24日16時12分 來源:傳智播客 瀏覽次數(shù):
1.MapTask工作機制
整個Map階段流程大體如上圖所示。簡單概述:input File通過split被邏輯切分為多個split文件,通過Record按行讀取內(nèi)容給map(用戶自己實現(xiàn)的)進行處理,數(shù)據(jù)被map處理結束之后交給OutputCollector收集器,對其結果key進行分區(qū)(默認使用hash分區(qū)),然后寫入buffer,每個map task都有一個內(nèi)存緩沖區(qū),存儲著map的輸出結果,當緩沖區(qū)快滿的時候需要將緩沖區(qū)的數(shù)據(jù)以一個臨時文件的方式存放到磁盤,當整個map task結束后再對磁盤中這個map task產(chǎn)生的所有臨時文件做合并,生成最終的正式輸出文件,然后等待reduce task來拉數(shù)據(jù)。
詳細步驟:
Ø 首先,讀取數(shù)據(jù)組件InputFormat(默認TextInputFormat)會通過getSplits方法對輸入目錄中文件進行邏輯切片規(guī)劃得到splits,有多少個split就對應啟動多少個MapTask。split與block的對應關系默認是一對一。
Ø 將輸入文件切分為splits之后,由RecordReader對象(默認LineRecordReader)進行讀取,以\n作為分隔符,讀取一行數(shù)據(jù),返回
Ø 讀取split返回
Ø map邏輯完之后,將map的每條結果通過context.write進行collect數(shù)據(jù)收集。在collect中,會先對其進行分區(qū)處理,默認使用HashPartitioner。
MapReduce提供Partitioner接口,它的作用就是根據(jù)key或value及reduce的數(shù)量來決定當前的這對輸出數(shù)據(jù)最終應該交由哪個reduce task處理。默認對key hash后再以reduce task數(shù)量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制并設置到job上。
Ø 接下來,會將數(shù)據(jù)寫入內(nèi)存,內(nèi)存中這片區(qū)域叫做環(huán)形緩沖區(qū),緩沖區(qū)的作用是批量收集map結果,減少磁盤IO的影響。我們的key/value對以及Partition的結果都會被寫入緩沖區(qū)。當然寫入之前,key與value值都會被序列化成字節(jié)數(shù)組。
環(huán)形緩沖區(qū)其實是一個數(shù)組,數(shù)組中存放著key、value的序列化數(shù)據(jù)和key、value的元數(shù)據(jù)信息,包括partition、key的起始位置、value的起始位置以及value的長度。環(huán)形結構是一個抽象概念。
緩沖區(qū)是有大小限制,默認是100MB。當map task的輸出結果很多時,就可能會撐爆內(nèi)存,所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時寫入磁盤,然后重新利用這塊緩沖區(qū)。這個從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為Spill,中文可譯為溢寫。這個溢寫是由單獨線程來完成,不影響往緩沖區(qū)寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,所以整個緩沖區(qū)有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩沖區(qū)的數(shù)據(jù)已經(jīng)達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內(nèi)存,執(zhí)行溢寫過程。Map task的輸出結果還可以往剩下的20MB內(nèi)存中寫,互不影響。
Ø 當溢寫線程啟動后,需要對這80MB空間內(nèi)的key做排序(Sort)。排序是MapReduce模型默認的行為,這里的排序也是對序列化的字節(jié)做的排序。
如果job設置過Combiner,那么現(xiàn)在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁盤的數(shù)據(jù)量。Combiner會優(yōu)化MapReduce的中間結果,所以它在整個模型中會多次使用。
那哪些場景才能使用Combiner呢?從這里分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。Combiner只應該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執(zhí)行效率有幫助,反之會影響reduce的最終結果。
Ø 每次溢寫會在磁盤上生成一個臨時文件(寫之前判斷是否有combiner),如果map的輸出結果真的很大,有多次這樣的溢寫發(fā)生,磁盤上相應的就會有多個臨時文件存在。當整個數(shù)據(jù)處理結束之后開始對磁盤中的臨時文件進行merge合并,因為最終的文件只有一個,寫入磁盤,并且為這個文件提供了一個索引文件,以記錄每個reduce對應數(shù)據(jù)的偏移量。
至此map整個階段結束。
2.
ReduceTask工作機制
Reduce大致分為copy、sort、reduce三個階段,重點在前兩個階段。copy階段包含一個eventFetcher來獲取已完成的map列表,由Fetcher線程去copy數(shù)據(jù),在此過程中會啟動兩個merge線程,分別為inMemoryMerger和onDiskMerger,分別將內(nèi)存中的數(shù)據(jù)merge到磁盤和將磁盤中的數(shù)據(jù)進行merge。待數(shù)據(jù)copy完成之后,copy階段就完成了,開始進行sort階段,sort階段主要是執(zhí)行finalMerge操作,純粹的sort階段,完成之后就是reduce階段,調(diào)用用戶定義的reduce函數(shù)進行處理。
詳細步驟:
Ø Copy階段,簡單地拉取數(shù)據(jù)。Reduce進程啟動一些數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請求maptask獲取屬于自己的文件。
Ø Merge階段。這里的merge如map端的merge動作,只是數(shù)組中存放的是不同map端copy來的數(shù)值。Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中,這里的緩沖區(qū)大小要比map端的更為靈活。merge有三種形式:內(nèi)存到內(nèi)存;內(nèi)存到磁盤;磁盤到磁盤。默認情況下第一種形式不啟用。當內(nèi)存中的數(shù)據(jù)量到達一定閾值,就啟動內(nèi)存到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運行,直到?jīng)]有map端的數(shù)據(jù)時才結束,然后啟動第三種磁盤到磁盤的merge方式生成最終的文件。
Ø 把分散的數(shù)據(jù)合并成一個大的數(shù)據(jù)后,還會再對合并后的數(shù)據(jù)排序。
Ø 對排序后的鍵值對調(diào)用reduce方法,鍵相等的鍵值對調(diào)用一次reduce方法,每次調(diào)用會產(chǎn)生零個或者多個鍵值對,最后把這些輸出的鍵值對寫入到HDFS文件中。
3. Shuffle機制
map階段處理的數(shù)據(jù)如何傳遞給reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程就叫shuffle。
shuffle: 洗牌、發(fā)牌——(核心機制:數(shù)據(jù)分區(qū),排序,合并)。
shuffle是Mapreduce的核心,它分布在Mapreduce的map階段和reduce階段。一般把從Map產(chǎn)生輸出開始到Reduce取得數(shù)據(jù)作為輸入之前的過程稱作shuffle。
1).Collect階段:將MapTask的結果輸出到默認大小為100M的環(huán)形緩沖區(qū),保存的是key/value,Partition分區(qū)信息等。
2).Spill階段:當內(nèi)存中的數(shù)據(jù)量達到一定的閥值的時候,就會將數(shù)據(jù)寫入本地磁盤,在將數(shù)據(jù)寫入磁盤之前需要對數(shù)據(jù)進行一次排序的操作,如果配置了combiner,還會將有相同分區(qū)號和key的數(shù)據(jù)進行排序。
3).Merge階段:把所有溢出的臨時文件進行一次合并操作,以確保一個MapTask最終只產(chǎn)生一個中間數(shù)據(jù)文件。
4).Copy階段: ReduceTask啟動Fetcher線程到已經(jīng)完成MapTask的節(jié)點上復制一份屬于自己的數(shù)據(jù),這些數(shù)據(jù)默認會保存在內(nèi)存的緩沖區(qū)中,當內(nèi)存的緩沖區(qū)達到一定的閥值的時候,就會將數(shù)據(jù)寫到磁盤之上。
5).Merge階段:在ReduceTask遠程復制數(shù)據(jù)的同時,會在后臺開啟兩個線程對內(nèi)存到本地的數(shù)據(jù)文件進行合并操作。
6).Sort階段:在對數(shù)據(jù)進行合并的同時,會進行排序操作,由于MapTask階段已經(jīng)對數(shù)據(jù)進行了局部的排序,ReduceTask只需保證Copy的數(shù)據(jù)的最終整體有效性即可。
Shuffle中的緩沖區(qū)大小會影響到mapreduce程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快
緩沖區(qū)的大小可以通過參數(shù)調(diào)整, 參數(shù):io.sort.mb 默認100M
作者:傳智播客大數(shù)據(jù)培訓學院
首發(fā):http://cloud.itcast.cn