跳到主要內容

線程池續:你必須要知道的線程池submit()實現原理之FutureTask!_台北網頁設計


※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益



擁有後台管理系統的網站,將擁有強大的資料管理與更新功能,幫助您隨時新增網站的內容並節省網站開發的成本。




前言


上一篇內容寫了Java中線程池的實現原理及源碼分析,說好的是實實在在的大滿足,想通過一篇文章讓大家對線程池有個透徹的了解,但是文章寫完總覺得還缺點什麼?


上篇文章只提到線程提交的execute()方法,並沒有講解線程提交的submit()方法,submit()有一個返回值,可以獲取線程執行的結果Future<T>,這一講就那深入學習下submit()FutureTask實現原理。


使用場景&示例


使用場景


我能想到的使用場景就是在并行計算的時候,例如一個方法中調用methodA()、methodB(),我們可以通過線程池異步去提交方法A、B,然後在主線程中獲取組裝方法A、B計算后的結果,能夠大大提升方法的吞吐量。



使用示例


/**
* @author wangmeng
* @date 2020/5/28 15:30
*/
public class FutureTaskTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newCachedThreadPool();

System.out.println("====執行FutureTask線程任務====");
Future<String> futureTask = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("FutureTask執行業務邏輯");
Thread.sleep(2000);
System.out.println("FutureTask業務邏輯執行完畢!");
return "歡迎關注: 一枝花算不算浪漫!";
}
});

System.out.println("====執行主線程任務====");
Thread.sleep(1000);
boolean flag = true;
while(flag){
if(futureTask.isDone() && !futureTask.isCancelled()){
System.out.println("FutureTask異步任務執行結果:" + futureTask.get());
flag = false;
}
}

threadPool.shutdown();
}
}

上面的使用很簡單,submit()內部傳遞的實際上是個Callable接口,我們自己實現其中的call()方法,我們通過futureTask既可以獲取到具體的返回值。


submit()實現原理


submit() 是也是提交任務到線程池,只是它可以獲取任務返回結果,返回結果是通過FutureTask來實現的,先看下ThreadPoolExecutor中代碼實現:


public class ThreadPoolExecutor extends AbstractExecutorService {
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}

public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
}

提交任務還是執行execute()方法,只是task被包裝成了FutureTask ,也就是在excute()中啟動線程後會執行FutureTask.run()方法。


再來具體看下它執行的完整鏈路圖:



上圖可以看到,執行任務並返回執行結果的核心邏輯實在FutureTask中,我們以FutureTask.run/get 兩個方法為突破口,一點點剖析FutureTask的實現原理。


FutureTask源碼初探


先看下FutureTask中部分屬性:



public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
}


  1. state



當前task狀態,共有7中類型。
NEW: 當前任務尚未執行
COMPLETING: 當前任務正在結束,尚未完全結束,一種臨界狀態
NORMAL:當前任務正常結束
EXCEPTIONAL: 當前任務執行過程中發生了異常。
CANCELLED: 當前任務被取消
INTERRUPTING: 當前任務中斷中..
INTERRUPTED: 當前任務已中斷




  1. callble



用戶提交任務傳遞的Callable,自定義call方法,實現業務邏輯




  1. outcome



任務結束時,outcome保存執行結果或者異常信息。




  1. runner



當前任務被線程執行期間,保存當前任務的線程對象引用




  1. waiters



因為會有很多線程去get當前任務的結果,所以這裏使用了一種stack數據結構來保存



FutureTask.run()實現原理


我們已經知道在線程池runWorker()中最終會調用到FutureTask.run()方法中,我們就來看下它的執行原理吧:



具體代碼如下:


public class FutureTask<V> implements RunnableFuture<V> {
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
}

首先是判斷FutureTaskstate狀態,必須是NEW才可以繼續執行。


然後通過CAS修改runner引用為當前線程。


接着執行用戶自定義的call()方法,將返回結果設置到result中,result可能為正常返回也可能為異常信息。這裏主要是調用set()/setException()


FutureTask.set()實現原理


set()方法的實現很簡單,直接看下代碼:


public class FutureTask<V> implements RunnableFuture<V> {
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
}

call()返回的數據賦值給全局變量outcome上,然後修改state狀態為NORMAL,最後調用finishCompletion()來做掛起線程的喚醒操作,這個方法等到get()後面再來講解。


FutureTask.get()實現原理



接着看下代碼:


public class FutureTask<V> implements RunnableFuture<V> {
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
}

如果FutureTaskstateNORMAL或者COMPLETING,說明當前任務並沒有執行完成,調用get()方法會被阻塞,具體的阻塞邏輯在awaitDone()方法:


private int awaitDone(boolean timed, long nanos) throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

這個方法可以說是FutureTask中最核心的方法了,一步步來分析:


如果timed不為空,這說明指定nanos時間還未返回結果,線程就會退出。


※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益



擁有後台管理系統的網站,將擁有強大的資料管理與更新功能,幫助您隨時新增網站的內容並節省網站開發的成本。




q是一個WaitNode對象,是將當前引用線程封裝在一個stack數據結構中,WaitNode對象屬性如下:


 static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

接着判斷當前線程是否中斷,如果中斷則拋出中斷異常。


下面就進入一輪輪的if... else if...判斷邏輯,我們還是採用分支的方式去分析。


分支一:if (s > COMPLETING) {


此時get()方法已經有結果了,無論是正常返回的結果,還是異常、中斷、取消等,此時直接返回state狀態,然後執行report()方法。


分支二:else if (s == COMPLETING)


條件成立,說明當前任務接近完成狀態,這裏讓當前線程再釋放cpu,進行下一輪搶佔cpu


分支三:else if (q == null)


第一次自旋執行,WaitNode還沒有初始化,初始化q=new WaitNode();


分支四:else if (!queued){


queued代表當前線程是否入棧,如果沒有入棧則進行入棧操作,順便將全局變量waiters指向棧頂元素。


分支五/六:LockSupport.park


如果設置了超時時間,則使用parkNanos來掛起當前線程,否則使用park()


經過這麼一輪自旋循環后,如果執行call()還沒有返回結果,那麼調用get()方法的線程都會被掛起。


被掛起的線程會等待run()返回結果后依次喚醒,具體的執行邏輯在finishCompletion()中。


最終stack結構中數據如下:



FutureTask.finishCompletion()實現原理



具體實現代碼如下:


private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}

done();

callable = null;
}

代碼實現很簡單,看過get()方法后,我們知道所有調用get()方法的線程,在run()還沒有返回結果前,都會保存到一個有WaitNode構成的statck數據結構中,而且每個線程都會被掛起。


這裡是遍歷waiters棧頂元素,然後依次查詢起next節點進行喚醒,喚醒后的節點接着會往後調用report()方法。


FutureTask.report()實現原理



具體代碼如下:


private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

這個方法很簡單,因為執行到了這裏,說明當前state狀態肯定大於COMPLETING,判斷如果是正常返回,那麼返回outcome數據。


如果state是取消狀態,拋出CancellationException異常。


如果狀態都不滿足,則說明執行中出現了差錯,直接拋出ExecutionException


FutureTask.cancel()實現原理



public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

cancel()方法的邏輯很簡單,就是修改state狀態為CANCELLED,然後調用finishCompletion()來喚醒等待的線程。


這裏如果mayInterruptIfRunning,就會先中斷當前線程,然後再去喚醒等待的線程。


總結


FutureTask的實現原理其實很簡單,每個方法基本上都畫了一個簡單的流程圖來方便立即。


後面還打算分享一個BlockingQueue相關的源碼解讀,這樣線程池也可以算是完結了。


在這之前可能會先分享一個SpringCloud常見配置代碼分析、最佳實踐等手冊,方便工作中使用,也是對之前看過的源碼一種總結。敬請期待!
歡迎關注:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益



擁有後台管理系統的網站,將擁有強大的資料管理與更新功能,幫助您隨時新增網站的內容並節省網站開發的成本。





Orignal From: 線程池續:你必須要知道的線程池submit()實現原理之FutureTask!_台北網頁設計

留言

這個網誌中的熱門文章

旅館疑有臭蟲 北市府稽查找嘸

有民眾抱怨,日前投宿的北市某旅館疑似出現臭蟲,北市觀傳局與衛生局、環保局聯合稽查,但因為沒有發現蟲屍,無法確認該旅館是否真有臭蟲,市府下周將召開專家會議處理該案,市長蔣萬安則允諾會以最高規格防範臭蟲。 北市某旅館傳出疑有臭蟲,議員陳宥丞23日在市政總質詢詢問市府處理進度,並指出法國、澳洲、韓國的臭蟲,起初都現蹤公車或地鐵卻沒被發現,直到大規模爆發,才付出大量社會成本處理,而且一般殺蟲劑無法殺掉臭蟲,北市是否有因應措施? 觀傳局主任祕書蕭君杰表示,21日聯合環保局、衛生局到該旅館稽查,但沒有發現臭蟲,也沒有查到蟲卵跡象,只能檢查現場環境是否符合衛生相關規定,但環保局有指導業者如何針對臭蟲清潔消毒。觀傳局長王秋冬指出,下周會與專家學者召開會議,以最高規格處理此案。 想知道購買電動車哪裡補助最多? 台中電動車 補助資訊懶人包彙整 ;推薦評價好的 iphone維修 中心擁有專業的維修技術團隊,同時聘請資深iphone手機維修專家,現場說明手機問題,快速修理,沒修好不收錢住家的頂樓裝 太陽光電 聽說可發揮隔熱功效一線推薦東陽能源擁有核心技術、產品研發、系統規劃設置、專業團隊的太陽能發電廠商。 網頁設計 一頭霧水該從何著手呢? 回頭車 貨運收費標準宇安交通關係企業,自成立迄今,即秉持著「以誠待人」、「以實處事」的企業信念 台中搬家公司 教你幾個打包小技巧,輕鬆整理裝箱!還在煩惱搬家費用要多少哪?台中大展搬家線上試算搬家費用,從此不再擔心「物品怎麼計費」、「多少車才能裝完」 台中搬家 公司費用怎麼算?擁有20年純熟搬遷經驗,提供免費估價且流程透明更是5星評價的搬家公司好山好水 露營車 漫遊體驗露營車x公路旅行的十一個出遊特色。走到哪、玩到哪,彈性的出遊方案,行程跟出發地也可客製 廣告預算用在刀口上, 台北網頁設計 公司幫您達到更多曝光效益; 電動車補助 衛生局疾病管制科長張惠美表示,現場查到與臭蟲無關的多項衛生缺失,包含未提供從業人員體檢報告、簡易外傷藥品及器材超過有效期、紗窗有破損等,已要求業者2周內改善,將擇期複查,如果複查不合格,將依法裁罰3000元至2萬元罰鍰。 但陳宥丞批評,環保局未公告哪些藥劑能殺死臭蟲,很擔心北市會和韓國、法國一樣,把臭蟲防治交給民間恐造成大規模爆發,且北市內的廢棄傢俱回收廠也有可能成為臭蟲孳生的...

必知必會-存儲器層次結構

相信大家一定都用過各種存儲技術,比如mysql,mongodb,redis,mq等,這些存儲服務性能有非常大的區別,其中之一就是底層使用的存儲設備不同。作為一個程序員,你需要理解存儲器的層次結構,這樣才能對程序的性能差別瞭然於心。今天帶大家了解下計算機系統存儲器的層次結構。 存儲技術 首先了解下什麼是存儲器系統? 實質上就是一個具有不同容量、成本和訪問時間的存儲設備的層次結構。從快到慢依次為:CPU寄存器、高速緩存、主存、磁盤; 這裏給大家介紹一組數據,讓大家有一個更清晰的認識: 如果數據存儲在CPU寄存器,需要0個時鐘周期就能訪問到,存儲在高速緩存中需要4~75個時鐘周期。如果存儲在主存需要上百個周期,而如果存儲在磁盤上,大約需要幾千萬個周期! -- 出自 CSAPP 接下來一起深入了解下計算機系統涉及的幾個存儲設備: 隨機訪問存儲器 隨機訪問存儲器(RAM)分為靜態RAM (SRAM) 和動態RAM(DRAM)。SRAM的速度更快,但也貴很多,一般不會超過幾兆字節,通常用來做告訴緩存存儲器。DRAM就是就是我們常說的主存。 訪問主存 數據流是通過操作系統中的總線的共享电子電路在處理器和DRAM之間來來回回。每次CPU和主存之間的數據傳送都是通過一系列複雜的步驟完成,這些步驟成為總線事務。讀事務是將主存傳送數據到CPU。寫事務從CPU傳送數據到主存。 總線是一組并行的導線,能攜帶地址、數據和控制信號。下圖展示了CPU芯片是如何與主存DRAM連接的。 那麼我們在加載數據和存儲數據時,CPU和主存到底是怎樣交互實現的呢? 首先來看一個基本指令,加載內存數據到CPU寄存器中: movq A,%rax 將地址A的內容加載到寄存器%rax中,這個命令會使CPU芯片上稱為總線接口(bus interface)的電路在總線上發起讀事務,具體分為三個步驟: CPU將地址A放到系統總線上,I/O橋將信號傳遞到內存總線。詳情看下下圖a 主存感覺到內存總線上的地址信號,從內存總線讀地址,從DRAM取出數據字,將其寫到內存總線。I/O橋將內存總線信號翻譯成系統總線信號,沿着系統總線傳遞到CPU總線接口。下圖b CPU感覺到系統總線上的數據,從總線上讀數據,並將數據複製到寄存器%rax...

2016年電動車和插電式混合動力車銷量預計將超過70萬輛

中汽協日前預測,2016年全國電動汽車和插電式混合動力車的銷量預計將超過70萬輛,較2015年的銷量增長一倍。 2015年電動車和插電式混合動力車的合併銷量為331092輛,較2014年增長了340%。其中包括247482輛電動車和83610輛插電式混合動力車,在24萬輛多的電動汽車銷量中,包括146719輛乘用車,另有100763輛為商用車。插電式混合動力車的銷量中,60663輛為乘用車,22947輛為商用車。 根據2015年起草的藍圖,政府計畫到2020年在全國範圍內新建12000個充電站和480枚充電樁。2014年年底,全國共有780個充電站共31000枚充電樁。2015年政府還為27個省市自治區設定了電動車的最低銷量目標。 政府預計,這些措施到位後,自主品牌車企的電動車和插電式混合動力車銷量到2020年可達100萬輛,到2025年可達300萬輛。 本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理 【其他文章推薦】 ※為什麼 USB CONNECTOR 是電子產業重要的元件? ※ 網頁設計 一頭霧水??該從何著手呢? 找到專業技術的 網頁設計公司 ,幫您輕鬆架站! ※想要讓你的商品成為最夯、最多人討論的話題? 網頁設計公司 讓你強力曝光 ※想知道最厲害的 台北網頁設計公司推薦 、 台中網頁設計公司推薦 專業設計師"嚨底家"!! Orignal From: 2016年電動車和插電式混合動力車銷量預計將超過70萬輛