不懂线程池你还好意思做开发

不懂线程池你还好意思做开发

文章发布于 2020-12-30 21:41:21,最后更新于 2021-01-13 22:38:50

不懂线程池你还好意思做开发?

微信图片_20210113221103.jpg

一、前言

经过反复debug,最后找出问题,被我自己蠢哭了。

在上次做开发的时候遇到一个问题。当前端上传excel到报表中心服务时,报表中心会将excel存储起来并且在数据库中生成一条数据,同时将生成数据的信息通过RPC/HTTP的方式通知给相对应的微服务,微服务再去根据推送的数据通过RPC/HTTP,下载文档进行后续处理。在实际开发中我写的http回调函数拿到报表中心的数据后进行处理的时候都会抛出异常,表里不存在数据。

微信图片_20210113221140.gif

1.1 问题分析

因为报表中心获取到前端的excel并存储到数据库生成数据再到通知到相对应的微服务,并且返回给前端这是一个完整的事务。当插入数据库的数据生成后,生成的那条插入数据库的信息已经准备好了,但是因为事务未提交,线程也还没结束,那条数据还未真正插入数据库,所以传送给相对于的微服务,微服务到数据库拿数据会发现信息不存在,导致异常发生。

1.2 问题解决

解决办法很简单就是在报表中心将回调通知相应的微服务方法改成异步的。如何改成异步的?就是通过新建一个线程,把异步通知交个新线程去执行。这样就会解决读取不到数据的情况。(报表中心之前是不支持HTTP回调的,只支持RPC,而我的那个微服务未使用RPC)

恰巧这个机缘,让我复习了线程池相关的内容,下面做一个总结。

二、线程池基础知识

​ 线程池就是提前创建好的一些列线程的集合。

2.1 线程池的优势

  • 通过重复使用线程,它能够降低系统资源的消耗。
  • 提高系统响应的速度,当有任务的时候,通过复用已经存在的线程,无需等待新建线程就能立即执行。
  • 方便线程的并非管控,换句话说就是更方便管理线程。
  • 线程池有更强大的功能(后续介绍)

2.2 线程池的主要参数

​ 关于他的主要参数我们通过查看源码,来分析参数。
微信图片_20210113221201.jpg

  • corePoolSize(线程池的基本大小或者叫做核心线程数):当一个线程池收到任务时,如果当前的线程数小于corePoolSize,那就代表存在空闲的线程,那么就会创建一个新的线程来执行任务,直到当线程数大于等于corePoolSize时,才会停止创建新线程。核心线程数就是一直会存在线程池的线程。
  • maximumPoolSize(线程池最大大小):也就是线程池允许的最大线程个数,包括核心线程数和非核心线程数。
  • keepAliveTime(线程存活保持时间):如果线程的线程数大于核心线程数(corePoolSize)时,线程的存活时间如果超过了存活保存时间,那么这个线程就会被销毁。
  • workQueue(任务队列):用于传输和保存等待执行任务的阻塞队列。
  • threadFactory(线程工厂):线程工厂顾名思义就是负责创建线程,他创建的线程有统一的名字格式pool-m-thread-n(m线程池编号,n线程池内线程编号)
  • handler(线程饱和策略):当线程池和队列都满了后,再加入线程就会执行这个策略。

三、java中提供的线程池

微信图片_20210113221224.png

3.1、newCachedThreadPool

​ 创建一个可以无限扩大的线程池,适用于负载较轻,执行短期异步任务。
微信图片_20210113221408.png

3.2、newFixedThreadPool

​ 创建一个固定大小的线程池,负载较重的场景。
微信图片_20210113221508.png

3.3、newSignleThreadExcutor

​ 创建一个单线程的线程池,适用于需要保证任务执行顺序的场景。
微信图片_20210113221538.png

3.4、newScheduledThreadPool

​ 适用于执行延时或周期任务。
微信图片_20210113221605.png

3.5、线程池的提交

​ 线程池的提交有两种方式submit和excutor。他们的区别就是submit提交可以获取返回值,而excutor不行。

3.6、ThreadPoolTaskExecutor和ThreadPoolExecutor的区别


微信图片_20210113221706.jpg

微信图片_20210113221708.jpg
这是他们两个的类关系图

ThreadPoolExecutor

​ 这个类是JDK中的类,通过他的源码我们能开到他是继承Executor的。

public class ThreadPoolExecutor extends AbstractExecutorService 
public abstract class AbstractExecutorService implements ExecutorService 
public interface ExecutorService extends Executor

而Executor我们也不陌生,所有线程的相关类基本上都实现了execute方法,他就是用来执行线程的。

  • Excutor:负责线程的使用和调度的根接口

微信图片_20210113222012.jpg

  • ExcutortService:线程池的主要接口

微信图片_20210113222014.jpg

通过源码,他主要有两个方法一个是submit一个是shutdown。

-ThreadPoolExecutor:就是线程池的主要实现类它的参数在之前介绍过了就不详细介绍了。

  • 在ThreadPoolExcutor中还有一个关键类就是Excutors工具类,它相当于一个工厂负责创建合适的线程池。
ExecutorService executorService1 = Executors.newCachedThreadPool();
ExecutorService executorService2 = Executors.newFixedThreadPool(5);
ExecutorService executorService3 = Executors.newSingleThreadExecutor();
ExecutorService executorService4 = Executors.newScheduledThreadPool(5);
ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(1, 2, 1000, 
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

ThreadPoolTaskExcutor

这是Spring提供的线程池类。可以通过注解的方式属性注入。

@Component
public class ExcuteConfigRpcThreadPool {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
public void execute(UploadConfigRpcRequest request) {
        ExcuteConfigRpcThread rpcThread = new ExcuteConfigRpcThread(request);
        taskExecutor.execute(rpcThread);
    }
}

他的源码中包含了ThreadPoolExcutor。

微信图片_20210113222018.jpg


四、 如何创建线程

在知道了线程池之后我们再回到本质问题如何使用线程,也就是如何创建线程.

线程的创建有四种方式

  • 继承Thread类,重写run方法,调用start方法执行。
  • 实现Runnable接口,重写run方法。
  • 使用Callable和Future创建线程。
  • 使用线程池框架excutor。

4.1、继承Thread类

​ 继承Thread类并且重写run方法就可以了。

public class threadPool {
public static void main(String[] args)  {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        myThread1 thread1=new myThread1();
        thread1.start();//线程直接执行
        executorService.execute(thread1);//通过线程池执行
          executorService.submit(thread1);
        executorService.shutdown();
    }
public static class myThread1 extends Thread {
@Override
public void run() {
            System.out.println("线程执行");
        }
    }
}

4.2、实现Runnable接口

​ 实现Runnable接口,如果直接执行线程需要创建Thread对象来调用start方法。

public class threadPool {
public static void main(String[] args)  {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        myThread1 thread2=new myThread1();
        Thread thread=new Thread(thread2);
        thread.start();//线程直接执行
        executorService.execute(thread2);//通过线程池执行
          executorService.submit(thread2);
        executorService.shutdown();
    }
public static class myThread2 implements Runnable {
@Override
public void run() {
            System.out.println("线程2执行");
        }
}
}

4.3、使用Callable或者Future

​ 实现Callable接口并重写call方法,创建的进程可以指定返回信息。call方法比run强大许多,不但可以返回值,还可以抛出异常值得一提的是,如果线程是通过Callable来创建的线程池在执行线程的时候只能使用submit就行提交,具体原因上面已经介绍过了。

public class threadPool {
public static void main(String[] args) throws Exception {
try {
           myThread3 thread3=new myThread3();
            FutureTask<String> future=new FutureTask(thread3);
            Thread thread=new Thread(future);
            thread.start();
            System.out.println(future.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
public static class myThread3 implements Callable<String> {
@Override
public String call() throws Exception {
            System.out.println("线程执行");
return "线程执行结果";
        }
    }
}

​ 注意:只有在线程执行了调用get方法才能获取到返回值

使用Excutor创建线程

executorService.execute(new Runnable() {
            @Override
public void run() {
                System.out.println("线程执行");
            }
        });

​ 对于实现callable的线程类比较特殊,可以使用submit来提交通过Future对象接收线程执行的返回值,并且通过get方法获取。

public class threadPool {
public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List<Future<String>> resultList = new ArrayList<Future<String>>();
for (int i = 0; i < 10; i++) {
            Future<String> future = executorService.submit(new myThread(i));
//将任务执行结果存储到List中
            resultList.add(future);
        }
//遍历任务的结果
for (Future<String> fs : resultList) {
try {
while (!fs.isDone()) ;//Future返回如果没有完成,则一直循环等待,直到Future返回完成
                System.out.println(fs.get());     //打印各个线程(任务)执行的结果
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
//启动一次顺序关闭,执行以前提交的任务,但不接受新任务
                executorService.shutdown();
            }
        }
    }
}
class myThread implements Callable<String> {
private int id;
public myThread(int id) {
this.id = id;
    }
/**
     * 任务的具体过程,一旦任务传给ExecutorService的submit方法,
     * <p>
     * 则该方法自动在一个线程上执行
     */
public String call() throws Exception {
        System.out.println("call()方法被自动调用!!!" + Thread.currentThread().getName());
//该返回结果将被Future的get方法得到
return "call()方法被自动调用,任务返回的结果是:" + id + "    " + Thread.currentThread().getName();
    }
}

五、线程池相关问答

5.1、线程池为什么使用阻塞队列?

答:如果线程是无限创建的,会导致内存占用过多,内存溢出。并且阻塞队列可以保证任务队列中没有任务时,阻塞获取任务的线程,使线程进入wait状态,释放cpu资源。当有任务来的时候才唤醒相对应的线程,是的线程不至于一直占用cpu。

5.2、线程池有哪几种工作队列?

答:ArrayBlockingQueue(有界队列):基于数组FIFO先进先出的原则对元素排序;LinkedBlockingQueue(无界队列):基于链表的FIFO;SynchronousQueue(同步队列,一个不存储元素的队列,每次插入都要等上一个结束;DelayQueue(延迟队列):周期性的执行;PriorityBlockingQueue(优先队列)。

5.3、为什么不建议使用 Executors静态工厂构建线程池?

答:阿里巴巴Java开发手册,明确指出不允许使用Executors静态工厂构建线程池 原因如下:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors返回的线程池对象的弊端如下:

1:FixedThreadPool 和 SingleThreadPool: 允许的请求队列(底层实现是LinkedBlockingQueue)长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

2:CachedThreadPool 和 ScheduledThreadPool 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

5.4、线程池的拒绝策略有哪些?

答: AbortPolicy:该策略是线程池默认策略;如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。

​ DiscardPolicy:如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。

​ DiscarOldestPolicy:如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。

​ CallRunsPolicy:如果添加到线程池失败,那么主线程会自己取执行该任务,不会等待线程池的线程取执行。

自定义策略。


微信图片_20210113222024.gif

img

我觉得到这里,关于线程池的面试相关问题也基本上全部拿下了。

最后贴上部分代码做个ending

      //文件上传报表中心后的回调函数,http的方式通知相对于的微服务
@Transactional(value=HibernateConstants.TRANSACTIONAL_ORACLE_MASTER_BEAN_NAME,readOnly=false)
//通过注解开启事务
void handleByNoFlow(final AddUploadBatchRpcRequest request,

 final UploadConfig config, final String batchId){
      //主要的回调方法
     saveBatch(batchId, request, config);
     ...//此处省略部分业务代码
     //保存数据到数据库,其中batchId是数据的主键
      //异步回调
     excuteConfigHttpThreadPool.execute(config.getName(),

config.getNotifyNode(),sendMessage,HttpRequestContextType.JSON);
    //excuteConfigHttpThreadPool是自定义类
  
  }
}
//自定义线程池
public class ExcuteConfigHttpThreadPool {
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    //这是类中的线程池
    public void execute(String name, 
String notifyNode, JSONObject sendMessage, HttpRequestContextType json) {
        ExcuteConfigHttpThread httpThread =
 new ExcuteConfigHttpThread(name,notifyNode,
sendMessage,json);
        //httpThread 是自定义线程

        taskExecutor.execute( httpThread);
    }
}
//自定义线程
public class ExcuteConfigHttpThread implements Runnable {
 @Override
    public void run() {
        //参数校验
       // doVerify();
        try {
            HttpRequestModel requestModel = new HttpRequestModel(this.name);
            //创建http请求
            requestModel.buidHttpPost(this.notifyNode, this.request, this.json);
            //http请求参数添加
          HttpServiceImpl httpService = new HttpServiceImpl();
          HttpResult httpResult = httpService.execute(requestModel);
           ...//省去其他代码
    }
}

我是星宇,一个满头黑发,渴望秃头的开发,我们下期见!

如果发现学术性问题,请及时联系我。

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可

Links: https://mxyblogs.club/archives/不懂线程池你还好意思做开发

Buy me a cup of coffee ☕.