掘金 后端 ( ) • 2024-04-08 14:24

需求

三方厂家推送数据到我们系统,同是也要把我们系统得到的厂家数据推送到另一个业务系统

思路

使用线程池,主线程依旧是厂家推送数据到我们系统,其他线程就去处理同时将厂家推送过来的数据发送到另一个业务系统

实现

1.创建线程池

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class ThreadUtil {
    /**
     * 这一行代码创建了一个线程池实例。
     * 创建了一个固定大小为 20 的线程池,线程的存活时间为 3600 秒(即 1 小时),
     * 使用 ArrayBlockingQueue 作为工作队列,队列的容量为 2000。
     * 该线程池最多同时执行 20 个任务,超过这个数量的任务将会放入队列中等待执行
     * @return
     */
    @Bean(value = "executorService")
    public ExecutorService getExecutorService() {
        return new ThreadPoolExecutor(20, 20, 3600, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000));
    }
}

代码解释

这一行代码创建了一个线程池实例。 创建了一个固定大小为 20 的线程池,线程的存活时间为 3600 秒(即 1 小时), 使用 ArrayBlockingQueue 作为工作队列,队列的容量为 2000。 该线程池最多同时执行 20 个任务,超过这个数量的任务将会放入队列中等待执行

2.使用线程池

由于我不需要线程执行的结果,所以比较简单,如果要获取结果,可以在结尾处加上futures.add(future);

//多线程
@Autowired
private ExecutorService executorService;

@PostMapping("getSeptic")
public JSONObject saveSeptic(@RequestBody SepticQuery query) {
    log.info("[SepticShareController][saveSeptic]:query:"+ JSONUtil.toJsonStr(query));
    ResultData resultData = new ResultData();
    //判断必传数据
    if (StringUtil.isNullOrEmpty(query.getSBBM()) || StringUtil.isNullOrEmpty(query.getSBLX())
            || StringUtil.isNullOrEmpty(query.getGXSJ())) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("success", false);
        jsonObject.put("code",2);
        jsonObject.put("msg","获取失败");
        log.error("[SepticShareController][saveSeptic]:获取信息失败:"+ jsonObject.toString());
        return jsonObject;
    } else {
        //设置dto参数
        SepticTankInfoDTO septicTankInfoDTO  = new SepticTankInfoDTO();;
        try {
            septicTankInfoDTO.setSectionId(SECTIONID);
            septicTankInfoDTO.setGeoX(query.getJD());
            septicTankInfoDTO.setGeoY(query.getWD());
            septicTankInfoDTO.setSepticTankName(query.getSBMC());
            septicTankInfoDTO.setSepticCode(query.getSBBM());
            septicTankInfoDTO.setSepticTankName(query.getSBMC());
            septicTankInfoDTO.setResponsiblePerson(query.getLXR());
        } catch (Exception e) {
            e.printStackTrace();
            log.error("[SepticShareController][saveSeptic]:error:"+ e.getMessage());
            throw new RuntimeException("参数格式不对");
        }
        //这样做是为了确保在多线程环境下对列表的操作是安全的,即使多个线程同时访问该列表,也不会出现数据不一致或者其他线程安全问题。

            List<SepticQuery> messages = Collections.synchronizedList(new ArrayList<>());
            messages.add(query);
            String url = "http://127.0.0.1:5080/septic/getSeptic";
            Map<String, String> headers = new HashMap<>();
            headers.put("Content-Type", "application/json");

        try {
            //通过线程池 executorService 实现了多线程任务的并行执行。
            // 每个任务都是一个异步操作,通过 executorService.submit() 方法提交给线程池执行。
            executorService.submit(() -> {
                //将获取的数据发送给其他系统里面去

                 try{
                        InfoQueryDTO infoQueryDTO = new InfoQueryDTO();
                        infoQueryDTO.setSbbm(messages.get(0).getSBBM());
                        infoQueryDTO.setSbmc(messages.get(0).getSBMC());
                        infoQueryDTO.setJd(String.valueOf(messages.get(0).getJD()));
                          ...
                        String json = JSON.toJSONString(infoQueryDTO);
                        String val = HttpUtil.createPost(url).addHeaders(headers).body(json).execute().body();
                        com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(val);
//                        com.alibaba.fastjson.JSONObject object = JSON.parseObject(jsonObject.get("data").toString());
                    } catch (Exception e) {}

            });
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("多线程处理任务异常!");
        }

        //todo 判断设备编号是否已存在 若已存在 不进行新增
        GUID = query.getSBBM();
        SepticTankInfoVO septicTankInfoVO = septicTankInfoManager.getDetailByGgId(GUID);
        try {
            if (septicTankInfoVO != null) {
                septicTankInfoDTO.setSepticCode(septicTankInfoVO.getSepticCode());
                septicTankInfoDTO.setSepticTankId(septicTankInfoVO.getSepticTankId());
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("设备号已存在,请确认再推送");
        }

        septicTankInfoManager.saveOrUpdate(septicTankInfoDTO);
    }
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("success", true);
    jsonObject.put("code",0);
    jsonObject.put("msg","获取成功");
    System.out.println(jsonObject);
    return jsonObject;
}

线程池内部的源代码分析

我们在项目里使用线程池的时候,通常都会先创建一个具体实现Bean来定义线程池

 @Bean(value = "executorService")
    public ExecutorService getExecutorService() {
        return new ThreadPoolExecutor(20, 20, 3600, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000));
    }

ThreadPoolExecutor的父类是AbstractExecutorService,

然后AbstractExecutorService的顶层接口是:ExecutorService。

当线程池触发了submit函数的时候,

实际上会调用到父类AbstractExecutorService对象的java.util.concurrent.AbstractExecutorService

#submit(java.lang.Runnable)方法,

然后进入到ThreadPoolExecutor

#execute部分。

java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable) 源代码位置:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

这里面你会看到返回的是一个future对象供调用方判断线程池内部的函数到底是否有完全执行成功。

在jdk8源代码中,提交任务的执行逻辑部分如下所示

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
        //工作线程数小于核心线程的时候,可以填写worker线程
        if (workerCountOf(c) < corePoolSize) {
              //新增工作线程的时候会加锁
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池的状态正常,切任务放入就绪队列正常
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //如果当前线程池处于关闭状态,则抛出拒绝异常
                reject(command);
            //如果工作线程数超过了核心线程数,那么就需要考虑新增工作线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果新增的工作线程已经达到了最大线程数限制的条件下,需要触发拒绝策略的抛出
        else if (!addWorker(command, false))
            reject(command);
    }

结论

首先判断线程池内部的现场是否都有任务需要执行。

如果不是,则使用一个空闲的工作线程用于任务执行。

否则会判断当前的工作队列是否已经满了,如果没有满则往队列里面投递一个任务,等待线程去处理。

如果工作队列已经满了,此时会根据饱和策略去判断,是否需要创建新的线程还是果断抛出异常等方式来进行处理。