在學(xué)習(xí)完powerjob并了解基本工作原理之后,基于業(yè)務(wù)需要基礎(chǔ)的BasicProcessor(單機(jī)處理器)不能完美的契合業(yè)務(wù)需求,BasicProcessor如果一次任務(wù)中處理設(shè)備量過大,那么則會(huì)造成單個(gè)任務(wù)實(shí)例負(fù)載過高,并且系統(tǒng)壓力過大,因此選用了支持將單個(gè)任務(wù)中全部設(shè)備,通過將任務(wù)分批拆分為多個(gè)包含少量設(shè)備的子任務(wù)的MapProcessor。
MapProcessor介紹
MapProcessor接口繼承了BasicProcessor接口并且對其進(jìn)行再次增強(qiáng),使其可以通過子任務(wù)的方式將任務(wù)拆分執(zhí)行,對執(zhí)行任務(wù)的節(jié)點(diǎn)更加友好,在MapProcessor中可以通過MapProcessor接口的isRoot()方法判斷該次執(zhí)行的任務(wù)是否為根任務(wù),調(diào)用該方法會(huì)返回一個(gè)布爾值,為根任務(wù),則可以構(gòu)建子任務(wù),需手動(dòng)創(chuàng)建子任務(wù)實(shí)體類,用于存入子任務(wù)執(zhí)行時(shí)所需要的參數(shù),此次在項(xiàng)目中使用則是解決設(shè)備功能調(diào)用中同時(shí)給大批量設(shè)備同時(shí)下發(fā)功能調(diào)用指令.考慮到后期設(shè)備可能過多,所以選擇MapProcessor處理器通過構(gòu)建子任務(wù)的方式將,任務(wù)拆分,每個(gè)子任務(wù)攜帶部分設(shè)備ID分別執(zhí)行,減輕單次執(zhí)行任務(wù)時(shí)的壓力。
具體實(shí)現(xiàn)
1. 定義子任務(wù)類
用于在構(gòu)建子任務(wù)時(shí)攜帶執(zhí)行所需參數(shù)。
2.創(chuàng)建執(zhí)行執(zhí)行器并實(shí)現(xiàn)MapProcessor接口
在實(shí)現(xiàn)接口時(shí)需重寫process()方法即執(zhí)行的任務(wù)邏輯
3. isRoot()
通過MapProcessor執(zhí)行任務(wù),無論是根任務(wù)還是子任務(wù)都是執(zhí)行的同一個(gè)process()方法,所以MapProcessor接口提供了isRoot()方法用來判斷,當(dāng)前執(zhí)行的任務(wù)是根任務(wù)還是子任務(wù)。
4.判斷為根任務(wù)
當(dāng)isRoot()方法返回true時(shí),則表明當(dāng)前任務(wù)為根任務(wù),此時(shí)需構(gòu)建子任務(wù)集合,例如此任務(wù)需操作1000個(gè)設(shè)備,那么則可以構(gòu)建5個(gè)子任務(wù)將1000個(gè)設(shè)備ID分別保存到5個(gè)子任務(wù)對象中,然后將五個(gè)子任務(wù)對象放入到集合中,再通過調(diào)用map()方法,將子任務(wù)集合及任務(wù)名,傳入到taskContext,任務(wù)上下文中,以便后續(xù)的調(diào)用。
5.判斷不為根任務(wù)
本文僅以簡單的二層任務(wù)結(jié)構(gòu)距離,如果判斷不為根任務(wù)則此時(shí)一定是子任務(wù),只需要通過taskContext調(diào)用getSubTask()方法,將我們創(chuàng)建的子任務(wù)取出。
并根據(jù)先前在創(chuàng)建子任務(wù)時(shí)傳入的執(zhí)行所需參數(shù),進(jìn)行對應(yīng)的業(yè)務(wù)處理。
創(chuàng)建任務(wù)
在創(chuàng)建需要使用MapProcessor處理器處理的任務(wù)時(shí),其他的參數(shù)與我們之前創(chuàng)建演示Demo無異,只是需要在執(zhí)行配置處,選擇Map執(zhí)行,并將我們所編寫的處理器全類名填寫正確.如果沒有選擇Map執(zhí)行,任務(wù)實(shí)例日志中會(huì)顯示。
此時(shí)則需要檢查,創(chuàng)建任務(wù)的參數(shù)是否填選正確。
后續(xù)的任務(wù)執(zhí)行則無特別操作,只需要在任務(wù)管理頁面,通過滑塊來確定是否需要執(zhí)行該任務(wù).并在運(yùn)行實(shí)例中可以查看每次任務(wù)執(zhí)行的日志。
拓展: MapReduceProcessor
此處理器則是在MapProcessor處理器基礎(chǔ)上,添加了reduce()方法,在實(shí)現(xiàn)該接口時(shí)除了需要重寫process方法外還需要重寫reduce方法.reduce方法則是根據(jù)process()方法的返回值,對子任務(wù)的執(zhí)行狀態(tài)進(jìn)行統(tǒng)計(jì),方便后續(xù)對子任務(wù)的計(jì)算操作,單此處理器在執(zhí)行計(jì)算統(tǒng)計(jì)時(shí),需要對數(shù)據(jù)庫進(jìn)行全量掃描,對節(jié)點(diǎn)內(nèi)存壓力過大,謹(jǐn)慎使用,如果只是簡單的對任務(wù)進(jìn)行拆分,不需要后續(xù)再對子任務(wù)處理結(jié)果進(jìn)行計(jì)算操作,MapProcessor即可進(jìn)行處理。