线程池

学习java很久很久了,得有个5年了,但是从来都没有真正的走进java世界,希望从这篇文章开始,把自己对java的点点滴滴都记录下来。
  从java5开始,java就提供了名叫Executor framework的机制,主要是围绕着Executor接口, 它的接口 ExecutorService, 以及实现了这两个接口的ThreadPoolExecutor类来展开,这种机制把线程的执行和创建分离开了,你只需要创建一个线程,然后把线程丢给Executor,让它执行去吧。使用这个机制的另外一个好处是可以使用Callable接口,它类似于Runnable接口,但是有两个不一样的特性。
  • 它的主要方法是call(), 它可以携带一个返回值。
  • 当你发送了一个Callable对象给executor之后,你可以拿到一个实现了Future接口的对象,通过这个对象,你可以控制对象的状态以及Callable对象的结果。

  
  

public Server(){
executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);
System.out.printf("Server: Task Count: %d\n",executor.
getTaskCount());
}

提交带返回值的任务。
public class FactorialCalculator implements Callable<Integer> {
private Integer number;
public FactorialCalculator(Integer number){
this.number=number;
}
@Override
public Integer call() throws Exception {
int result = 1;
if ((num==0)||(num==1)) {
result=1;
} else {
for (int i=2; i<=number; i++) {
result*=i;
TimeUnit.MILLISECONDS.sleep(20);
}
}
System.out.printf("%s: %d\n",Thread.currentThread().
getName(),result);
return result;
}

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.
newFixedThreadPool(2);
List<Future<Integer>> resultList=new ArrayList<>();
Random random=new Random();
for (int i=0; i<10; i++){
Integer number= random.nextInt(10);
FactorialCalculator calculator=new
FactorialCalculator(number);
Future<Integer> result=executor.submit(calculator);
resultList.add(result);
}
do {
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
System.out.printf("Main: Task %d: %s\n",i,result.
isDone());
}
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (executor.getCompletedTaskCount()<resultList.size());
System.out.printf("Main: Results\n");
for (int i=0; i<resultList.size(); i++) {
Future<Integer> result=resultList.get(i);
Integer number=null;
try {
number=result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.printf("Main: Task %d: %d\n",i,number);
}
executor.shutdown();
}
}

}
View Code
  
  在Future对象里面,我们可以通过result.isDone()方法来判断线程是否计算完毕。
  执行一堆任务,只返回第一个完成的任务。result = executor.invokeAny(taskList);
  执行全部,resultList=executor.invokeAll(taskList);
  推迟执行,executor.awaitTermination(1, TimeUnit.DAYS);
  把任务的执行和结果的处理分开,需要用到CompletionService, CompletionServicei有两个方法,take和poll,poll是如果没有,它就会立刻返回一个null,take是没有的话,会一直等待。
  当线程池调用了关闭之后,它需要等待当前所有进行中的线程结束才会完全关闭,在这个过程当中提交的线程,需要拒绝处理,我们需要实现一个RejectedExecutionHandler,重写它的rejectedExecution方法,然后听过executor的setRejectedExecutionHandler()方法来设置。
  

public class RejectedTaskController implements 
RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor) {
System.out.printf("RejectedTaskController: The task %s has
been rejected\n",r.toString());
System.out.printf("RejectedTaskController: %s\n",executor.
toString());
System.out.printf("RejectedTaskController: Terminating:
%s\n",executor.isTerminating());
System.out.printf("RejectedTaksController: Terminated:
%s\n",executor.isTerminated());
}
View Code
  
  Fork/Join Framework
   Fork/Join Framwork的诞生是为了解决如下图这样的问题的
  
   我们可以利用ForkJoinPool,它是一个特殊的Executors。
ForkJoin 框架主要是有两个操作:
Fork:把一个任务分成几个小任务,然后执行
Join:等待小任务的完成,并生成一个新任务。
这里我把ForkJoin称为刀叉框架,刀叉框架和Executor框架不一样的地方在于work-stealing算法,不同于Executor框架,刀叉框架在等待子任务的完成之前就已经创建并开始运行Join方法,Join方法一直在检测任务是否完成并且开始运行。通过这样的方式,可以很好的利用runtime的优势,提高性能。
这样就有一些限制:
任务只能采用Fork和Join来作为同步机制,而不能采用别的同步机制,如果采用其他的机制,他们在同步操作的时候,不能执行别的任务。比如:当你在刀叉框架里面一个任务sleep的时候,别的正在执行的任务也会停止,直到该线程sleep结束。
刀叉框架的核心是两个类:
(1)ForkJoinPool,它实现了ExecutorService接口和work-stealing算法,通过它可以很好的管理正在运行的任务以及了解任务的信息。
(2)ForkJoinTask,任务的基类,它提供了fork()方法和join()方法来控制任务的状态。不同通常,你会实现他们的两个子类,不带返回结果的RecursiveAction和带返回结果的RecursiveTask。

0 个评论

要回复文章请先登录注册