AsyncTask源码解析

本代码基于sdk4.0

AsyncTask适合在UI线程中使用并且非常简单,这个类允许执行后台操作并且在UI线程发布结果,不必直接操作线程或者handler

AsyncTask被设计为一个Thread和Handler的帮助类,不用构成一个通用线程框架。AsyncTasks理论上是用来进行短时间的操作(最多几秒钟,生命周期一般要与activity或fragment绑定)。如果想使线程长时间执行操作,强烈建议使用java.util.concurrent包比如Executor,ThreadPoolExecutor和FutureTask.

一个异步任务被设想设计为在后台运行的线程而且结果会发布在UI线程。
三种类型 Params,Progress,Result,4个方法onPreExecute,doInBackground,onProgressUpdate
,onPostExecute

一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private class DownloadFilesTask extends AsyncTask<URL, Integer, Long> {
protected Long doInBackground(URL... urls) {
int count = urls.length;
long totalSize = 0;
for (int i = 0; i < count; i++) {
totalSize += Downloader.downloadFile(urls[i]);
publishProgress((int) ((i / (float) count) 100));//发布进度
if (isCancelled()) break;//检查是否应该取消
}
return totalSize;
}

protected void onProgressUpdate(Integer... progress) {
setProgressPercent(progress[0]);
}

protected void onPostExecute(Long result) {
showDialog("Downloaded " + result + " bytes");
}
}

如何开始一个任务

new DownloadFilesTask().execute(url1, url2, url3);

AsyncTask通常的三个类型

  • Params参数类型
  • Progress在后台计算过程中发布的进度单位类型
  • Result后台计算的结果类型

以上三种类型通常都会被在异步任务中使用,通过设置为Void标记为不使用

private class MyTask extends AsyncTask<Void, Void, Void> { … }

四个步骤

  • onPreExecute()}在任务执行之前在UI线程调用,通常为任务做些准备比如显示一个进度条
  • doInBackground在onPreExecute之后立刻在后台线程中运行,参数会被传到这个方法 ,计算结果必须返回,而且最终会传递到最后一步 ,这一步可以使用publishProgress来发布一次或多次进度 ,这些进度值都会被发布到UI线程(在onProgressUpdate这步)
  • onProgressUpdate在pulishProgress之后在UI线程调用,这个方法用来当后台计算还在执行的时候在UI线程展示进度 ,比如可以改变进度条进度或者在一个text域展示日志
  • onPostExecute后台计算完成之后在UI线程调用,后台计算的结果会以参数形式传到这一步

取消一个任务

一个任务可以随时通过调用cancel(boolean)来取消,调用这个方法会是接下来的isCancelled返回true,调用这个方法之后onCancelled(Object)会替代onPostExecute(Object)在doInBackgroud(Object[])返回之后执行。为了确保一个任务能尽快被取消,应该在doInBackgroud(Object[])中周期的检查isCancelled()的值(比如说在循环中)

线程规则

为了使任务更好的工作,必须遵循以下规则

  • AsyncTask必须在UI线程加载,在JELLY_BEAN版本会被自动完成。
  • 任务实例必须在UI线程创建
  • execute必须在UI线程调用
  • 不要手动调用onPreExecute,onPostExecute,doInBackground,onProgressUpdate
  • 任务只能执行一次,否则会抛出一个异常

AsyncTask确保所有的回调都是同步的,在没有显示同步情况下是安全的

执行顺序

一开始AsyncTasks是在一个单独的后台线程中执行,在DONUT版本之后,变为在一个允许多个任务并行运行的线程池中。在HONEYCOMB版本之后,任务会在一个单独的线程中来避免并行执行引起的应用错误

如果确实想要并行的执行,调用executeOnExecutor(java.util.concurrent.Executor, Object[])和THREAD_POOL_EXECUTOR

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//获取cpu数量
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

//池中核心线程数量
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;

//池中最大线程数量
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT*2 + 1;

//提供的默认线程池
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);

//最大支持128个长度
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/*
在同一时间按序执行。对某一个进程来说这个串行时全局的。
*/

public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

SerialExecutor的实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;

public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {//为空的时候才执行下一个,不为空只是插入。
scheduleNext();
}
}

protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
1
2
3
4
5
6
7
8
/*@hide 强制创建静态的handle */
public static void init() {
sHandler.getLooper();
}

private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;

其中sThreadFactory的实现

1
2
3
4
5
6
7
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);

public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
构造函数,必须在UI线程调用
*/

public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);

Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
return postResult(doInBackground(mParams));
}
};

mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occured while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}

WorkerRunnable静态抽象内部类

1
2
3
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
1
2
3
4
5
6
/* 
这个方法依据平台的版本在一个单独后台线程或线程池中安排队列中的任务
*/

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
1
2
3
4
5
6
7
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = sHandler.obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
1
2
3
4
5
6
7
8
9
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;

AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
1
2
3
4
5
protected abstract Result doInBackground(Params... params);
protected void onPreExecute() {
}
protected void onPostExecute(Result result) {
}
1
2
3
4
5
6
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}

私有静态final类
private static final InternalHandler sHandler = new InternalHandler();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static class InternalHandler extends Handler {
@Override
public void handleMessage(Message msg) {
AsyncTaskResult result = (AsyncTaskResult) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
1
2
3
4
5
6
7
8
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
1
2
3
4
5
6
/*
如果调用了cancel(boolean)这个方法,应该在doInBackground(Object[])中周期性的检查isCancelled()的返回值来尽早结束任务
*/

public final boolean isCancelled() {
return mCancelled.get();
}
1
2
3
4
5
6
7
8
/* 
在cancel(boolean)被调用和doInBackgroud(Object[])完成之后在UI线程执行
默认的实现只是调用了onCancelled()同时忽略了结果。如果自己实现这个方法不要调用super.onCancelled(result)
*/
@SuppressWarnings({"UnusedParameters"})
protected void onCancelled(Result result) {
onCancelled();
}
1
2
3
4
/*应用最好重写onCannelled(object)。这个方法默认是被onCancelled(object)方法调用
*/

protected void onCancelled() {
}
1
2
3
4
5
6
/*  
在publishProgress执行之后在UI线程运行,参数值是从publishProgress中穿过来的
*/

@SuppressWarnings({"UnusedDeclaration"})
protected void onProgressUpdate(Progress... values) {
}
1
2
3
4
5
6
7
8
9
/*
尝试取消任务,如果任务已经完成或已经取消,或因为某些其他的原因不能取消都会导致这次尝试失败。如果成功了而且任务当被调用cancel的时候还没有开始,那么这个任务将不再执行。如果任务已经开始,那么mayInterruptIfRunning这个参数决定在这次停掉任务的尝试中是否使执行任务的线程中断。
调用这个方法的结果将传递给在doInBackground(Object[])返回之后的UI线程中被调用的onCancelled(Object)。调用这个方法保证onPostExecute(Object)不被调用。
调用这个方法之后,应该在doInBackground(Object[])中周期性的检查isCancelled()的返回值来尽早结束任务
*/

public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
1
2
3
4
5
6
7
8
/*当后台计算还在运行时在doInBackground中被调用来更新UI线程。每次调用这个方法都会触发UI线程的onProgressUpdate被执行
*/

protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
sHandler.obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
1
2
3
4
5
6
/*  
execute(Object...)的一个方便版本,参数是一个简单的Runnable对象,
*/

public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*  
这个方法通常使用THREAD_POOL_EXECUTOR在AsyncTask管理的线程池中并行的运行多个任务,但是也可以使用自定义的Executor
注意:允许多个任务在一个线程池中并行通常不是你想要的,因为他们操作的顺序没有定义。比如如果这些线程用来修改通用的状态(比如由于按钮点击写一个文件),修改的顺序没有保证。
如果不仔细对待这个问题,在最新版本中极少情况下数据将会被一个按顺序的重写,导致晦涩数据丢失和稳定性问题。
所以这些修改最好串行来执行,如果不管平台的版本为保证这个任务是串行的,可以使用SERIAL_EXECUTOR作为参数
必须在UI线程执行
THREAD_POOL_EXECUTOR作为一个方便的全程的线程池对那些松耦合的任务是可用的
*/

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {//所以只能运行一次呢。那把这个status判断去掉,就可以实现多次执行?
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);

return this;
}
1
2
3
4
5
6
7
/*  
在最需要的时间等待计算完成,然后检索结果
*/

public final Result get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {

return mFuture.get(timeout, unit);
}
1
2
3
public final Result get() throws InterruptedException, ExecutionException {
return mFuture.get();
}

**综上,AsyncTask的流程:在构造函数里初始化一个可以返回结果的future,(future中处理和handle相关的操作)在调用execute时,调用默认的SerialExecutor(静态,该应用公用)在线程池中来线性执行添加的future。调用executeOnExecutor(Executor,Params)可以传入一个Executor来修改线程执行方式。

###之前实现方式

下面是基于sdk1.5 asyncTask的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static final int CORE_POOL_SIZE = 1;
private static final int MAXIMUM_POOL_SIZE = 10;
private static final int KEEP_ALIVE = 10;
private static final BlockingQueue<Runnable> sWorkQueue =
new LinkedBlockingQueue<Runnable>(MAXIMUM_POOL_SIZE);
private static final ThreadPoolExecutor sExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}

mStatus = Status.RUNNING;

onPreExecute();

mWorker.mParams = params;
sExecutor.execute(mFuture);

return this;
}

以下是基于sdk2.2 asyncTask的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static final int CORE_POOL_SIZE = 5;
private static final int MAXIMUM_POOL_SIZE = 128;
private static final int KEEP_ALIVE = 10;
private static final BlockingQueue<Runnable> sWorkQueue =
new LinkedBlockingQueue<Runnable>(10);
private static final ThreadPoolExecutor sExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}

mStatus = Status.RUNNING;

onPreExecute();

mWorker.mParams = params;
sExecutor.execute(mFuture);

return this;
}

关于corePoolSize和maximumPoolSize,queue和是否新建线程的关系:

当一个新的任务通过execute(Runnable)提交,如果小于运行着的corePoolSize线程数量,会创建一个新的线程来处理请求,即使其他任务线程是闲置的。
如果大于corePoolSize但是小于运行的maximumPoolSize线程,并且queue满了,就会新建一个线程。

以上都是基于一个Executor来完成的。

所以一开始AsyncTasks是在一个单独的后台线程中执行,在DONUT版本之后,变为在一个允许多个任务并行运行的线程池中。在HONEYCOMB版本之后,任务会在一个单独的线程中。
**