线程池ExecutorService和CountDownLatch组合使用处理大量数据和统计时间

avatar 2021年11月02日11:29:38 0 104 views

最近在做公司的项目数据迁移,需要把大量的数据查出来做一些逻辑处理,依靠基本是不能完全实现,还是需要我们通过代码来处理。

传统操作是查出所有数据,然后遍历处理。

为了发挥多核CPU的优势,提升处理速度,这里我们使用多线程来处理。

逻辑是,比如有100万条数据,创建一个大小为10的线程池,分页处理,每个线程一次处理1000条数据

代码如下

1、调用类

@Slf4j
@Component
public class OmsOrderHandler {

    // 这里可以考虑手动创建线程池,为了简单直接用newFixedThreadPool
    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
    public final static int PAGE_SIZE = 1000;
    
    public static final String TABLE_NAME = "OMS_ORDER";


    @Autowired
    private OmsOrderService omsOrderService;

    
    public void handle() {
        long start = System.currentTimeMillis();
        // 查询总数
        Example example = new Example(OmsOrder.class);
        int total;
        try {
            total = omsOrderService.selectCountByExample(example);
        } catch (Exception e) {
            log.error("数据迁移表名:{}查询总数报错", TABLE_NAME, e);
            return;
        }
        int pages = (int) Math.ceil(total * 1.0 / PAGE_SIZE);
        log.info("数据迁移表名:{}任务准备开始,数据量:{}, 页数:{}", TABLE_NAME, total, pages);

        // 定义需要pages个任务
        CountDownLatch countDownLatch = new CountDownLatch(pages);
        // 分页执行,每页作为一个任务
        for (int i = 1; i <= pages; i++) {
            OmsOrderTask omsOrderTask = new OmsOrderTask();
            omsOrderTask.setPageNum(i);
            omsOrderTask.setPageSize(PAGE_SIZE);
            omsOrderTask.setCountDownLatch(countDownLatch);
            executorService.submit(omsOrderTask);
        }

        //等待所有线程完毕
        try {
            countDownLatch.await();
            log.info("数据迁移表名:{}任务迁移完成,数据量:{}, 页数:{},耗时:{}秒", TABLE_NAME, total, pages, (System.currentTimeMillis() - start) / 1000);
        } catch (InterruptedException e) {
            log.error("数据迁移表名:{}任务异常", TABLE_NAME, e);
        }

    }

}

 

2、任务类

@Slf4j
@Data
public class OmsOrderTask implements Runnable {

    private int pageNum;
    private int pageSize;
    private CountDownLatch countDownLatch;

    public static final String TABLE_NAME = "OMS_ORDER";


    @Override
    public void run() {
        long start = System.currentTimeMillis();

        // 获得spring的bean可以通过SpringUtil来获取
        OmsOrderService omsOrderService = SpringUtil.getBean("omsOrderService");
        try {
            log.info("数据迁移表名:{},页码:{},开始", TABLE_NAME, pageNum);
            // 分页查询订单数据
            CommonForm form = new CommonForm();
            form.setPage(pageNum);
            form.setLimit(pageSize);
            Example example = new Example(OmsOrder.class);

            LayPage<OmsOrder> page = omsOrderService.getPage(form, example);
            List<OmsOrder> list = page.findRows();
            for (OmsOrder omsOrder : list) {

                // 做一些逻辑处理 TODO
                // ......
            }

            // 完成1个任务
            if(countDownLatch != null) {
                countDownLatch.countDown();
            }

            log.info("数据迁移表名:{},页码:{},结束,耗时:{}秒", TABLE_NAME, pageNum,  (System.currentTimeMillis()-start)/1000.0);
        } catch (Exception e) {
            log.error("数据迁移表名:{},页码:{},运行失败", TABLE_NAME, pageNum, e);
            countDownLatch.countDown();
        }
    }

 

3、SpringUtil

import java.util.Map;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import cn.hutool.core.util.ArrayUtil;

/**
 * Spring(Spring boot)工具封装,包括:
 *
 * <pre>
 *     1、Spring IOC容器中的bean对象获取
 * </pre>
 *
 */
@Component
public class SpringUtil implements ApplicationContextAware
{

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
    {
        SpringUtil.applicationContext = applicationContext;
    }

    /**
     * 获取applicationContext
     *
     * @return ApplicationContext
     */
    public static ApplicationContext getApplicationContext()
    {
        return applicationContext;
    }

    //通过name获取 Bean.

    /**
     * 通过name获取 Bean
     *
     * @param <T> Bean类型
     * @param name Bean名称
     * @return Bean
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name)
    {
        return (T) applicationContext.getBean(name);
    }

    /**
     * 通过class获取Bean
     *
     * @param <T> Bean类型
     * @param clazz Bean类
     * @return Bean对象
     */
    public static <T> T getBean(Class<T> clazz)
    {
        return applicationContext.getBean(clazz);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     *
     * @param <T> bean类型
     * @param name Bean名称
     * @param clazz bean类型
     * @return Bean对象
     */
    public static <T> T getBean(String name, Class<T> clazz)
    {
        return applicationContext.getBean(name, clazz);
    }

    /**
     * 获取指定类型对应的所有Bean,包括子类
     *
     * @param <T> Bean类型
     * @param type 类、接口,null表示获取所有bean
     * @return 类型对应的bean,key是bean注册的name,value是Bean
     * @since 5.3.3
     */
    public static <T> Map<String, T> getBeansOfType(Class<T> type)
    {
        return applicationContext.getBeansOfType(type);
    }

    /**
     * 获取指定类型对应的Bean名称,包括子类
     * @param type 类、接口,null表示获取所有bean名称
     * @return bean名称
     * @since 5.3.3
     */
    public static String[] getBeanNamesForType(Class<?> type)
    {
        return applicationContext.getBeanNamesForType(type);
    }

    /**
     * 获取配置文件配置项的值
     *
     * @param key 配置项key
     * @return 属性值
     * @since 5.3.3
     */
    public static String getProperty(String key)
    {
        return applicationContext.getEnvironment().getProperty(key);
    }

    /**
     * 获取当前的环境配置,无配置返回null
     *
     * @return 当前的环境配置
     * @since 5.3.3
     */
    public static String[] getActiveProfiles()
    {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

    /**
     * 获取当前的环境配置,当有多个环境配置时,只获取第一个
     *
     * @return 当前的环境配置
     * @since 5.3.3
     */
    public static String getActiveProfile()
    {
        final String[] activeProfiles = getActiveProfiles();
        return ArrayUtil.isNotEmpty(activeProfiles) ? activeProfiles[0] : null;
    }
}

 

4、日志效果图

数据迁移表名:OMS_ORDER任务准备开始,数据量:64823, 页数:65  
数据迁移表名:OMS_ORDER,页码:2,开始  
数据迁移表名:OMS_ORDER,页码:1,开始  
数据迁移表名:OMS_ORDER,页码:4,开始  
数据迁移表名:OMS_ORDER,页码:6,开始  
数据迁移表名:OMS_ORDER,页码:5,开始  
数据迁移表名:OMS_ORDER,页码:3,开始  
数据迁移表名:OMS_ORDER,页码:7,开始  
数据迁移表名:OMS_ORDER,页码:8,开始  
数据迁移表名:OMS_ORDER,页码:10,开始  
数据迁移表名:OMS_ORDER,页码:9,开始  
数据迁移表名:OMS_ORDER,页码:1,结束,耗时:1.269秒  
数据迁移表名:OMS_ORDER,页码:11,开始  
数据迁移表名:OMS_ORDER,页码:7,结束,耗时:1.276秒  
数据迁移表名:OMS_ORDER,页码:12,开始  
数据迁移表名:OMS_ORDER,页码:4,结束,耗时:1.276秒  
数据迁移表名:OMS_ORDER,页码:13,开始  
数据迁移表名:OMS_ORDER,页码:2,结束,耗时:1.279秒  
数据迁移表名:OMS_ORDER,页码:14,开始  
数据迁移表名:OMS_ORDER,页码:3,结束,耗时:1.279秒  
数据迁移表名:OMS_ORDER,页码:15,开始  
数据迁移表名:OMS_ORDER,页码:5,结束,耗时:1.282秒  
数据迁移表名:OMS_ORDER,页码:16,开始  
数据迁移表名:OMS_ORDER,页码:6,结束,耗时:1.282秒  
数据迁移表名:OMS_ORDER,页码:17,开始  
数据迁移表名:OMS_ORDER,页码:8,结束,耗时:1.286秒  
数据迁移表名:OMS_ORDER,页码:18,开始  
数据迁移表名:OMS_ORDER,页码:10,结束,耗时:1.292秒  
数据迁移表名:OMS_ORDER,页码:19,开始  
数据迁移表名:OMS_ORDER,页码:9,结束,耗时:1.295秒  
数据迁移表名:OMS_ORDER,页码:20,开始  
数据迁移表名:OMS_ORDER,页码:11,结束,耗时:1.049秒  
数据迁移表名:OMS_ORDER,页码:21,开始  
数据迁移表名:OMS_ORDER,页码:12,结束,耗时:1.058秒  
数据迁移表名:OMS_ORDER,页码:22,开始  
数据迁移表名:OMS_ORDER,页码:14,结束,耗时:1.056秒  
数据迁移表名:OMS_ORDER,页码:23,开始  
数据迁移表名:OMS_ORDER,页码:13,结束,耗时:1.059秒  
数据迁移表名:OMS_ORDER,页码:24,开始  
数据迁移表名:OMS_ORDER,页码:17,结束,耗时:1.06秒  
数据迁移表名:OMS_ORDER,页码:25,开始  
数据迁移表名:OMS_ORDER,页码:19,结束,耗时:1.051秒  
数据迁移表名:OMS_ORDER,页码:26,开始  
数据迁移表名:OMS_ORDER,页码:15,结束,耗时:1.067秒  
数据迁移表名:OMS_ORDER,页码:27,开始  
数据迁移表名:OMS_ORDER,页码:16,结束,耗时:1.07秒  
数据迁移表名:OMS_ORDER,页码:28,开始  
数据迁移表名:OMS_ORDER,页码:18,结束,耗时:1.066秒  
数据迁移表名:OMS_ORDER,页码:29,开始  
数据迁移表名:OMS_ORDER,页码:20,结束,耗时:1.068秒  
数据迁移表名:OMS_ORDER,页码:30,开始  
数据迁移表名:OMS_ORDER,页码:23,结束,耗时:1.11秒  
数据迁移表名:OMS_ORDER,页码:31,开始  
数据迁移表名:OMS_ORDER,页码:22,结束,耗时:1.111秒  
数据迁移表名:OMS_ORDER,页码:32,开始  
数据迁移表名:OMS_ORDER,页码:21,结束,耗时:1.127秒  
数据迁移表名:OMS_ORDER,页码:33,开始  
数据迁移表名:OMS_ORDER,页码:24,结束,耗时:1.12秒  
数据迁移表名:OMS_ORDER,页码:34,开始  
数据迁移表名:OMS_ORDER,页码:26,结束,耗时:1.119秒  
数据迁移表名:OMS_ORDER,页码:35,开始  
数据迁移表名:OMS_ORDER,页码:25,结束,耗时:1.12秒  
数据迁移表名:OMS_ORDER,页码:36,开始  
数据迁移表名:OMS_ORDER,页码:27,结束,耗时:1.118秒  
数据迁移表名:OMS_ORDER,页码:37,开始  
数据迁移表名:OMS_ORDER,页码:28,结束,耗时:1.12秒  
数据迁移表名:OMS_ORDER,页码:38,开始  
数据迁移表名:OMS_ORDER,页码:29,结束,耗时:1.12秒  
数据迁移表名:OMS_ORDER,页码:39,开始  
数据迁移表名:OMS_ORDER,页码:30,结束,耗时:1.11秒  
数据迁移表名:OMS_ORDER,页码:40,开始  
数据迁移表名:OMS_ORDER,页码:31,结束,耗时:1.22秒  
数据迁移表名:OMS_ORDER,页码:41,开始  
数据迁移表名:OMS_ORDER,页码:32,结束,耗时:1.225秒  
数据迁移表名:OMS_ORDER,页码:42,开始  
数据迁移表名:OMS_ORDER,页码:33,结束,耗时:1.226秒  
数据迁移表名:OMS_ORDER,页码:43,开始  
数据迁移表名:OMS_ORDER,页码:35,结束,耗时:1.216秒  
数据迁移表名:OMS_ORDER,页码:44,开始  
数据迁移表名:OMS_ORDER,页码:34,结束,耗时:1.226秒  
数据迁移表名:OMS_ORDER,页码:45,开始  
数据迁移表名:OMS_ORDER,页码:36,结束,耗时:1.221秒  
数据迁移表名:OMS_ORDER,页码:46,开始  
数据迁移表名:OMS_ORDER,页码:37,结束,耗时:1.223秒  
数据迁移表名:OMS_ORDER,页码:47,开始  
数据迁移表名:OMS_ORDER,页码:38,结束,耗时:1.22秒  
数据迁移表名:OMS_ORDER,页码:48,开始  
数据迁移表名:OMS_ORDER,页码:39,结束,耗时:1.219秒  
数据迁移表名:OMS_ORDER,页码:49,开始  
数据迁移表名:OMS_ORDER,页码:40,结束,耗时:1.227秒  
数据迁移表名:OMS_ORDER,页码:50,开始  
数据迁移表名:OMS_ORDER,页码:41,结束,耗时:1.177秒  
数据迁移表名:OMS_ORDER,页码:51,开始  
数据迁移表名:OMS_ORDER,页码:43,结束,耗时:1.179秒  
数据迁移表名:OMS_ORDER,页码:52,开始  
数据迁移表名:OMS_ORDER,页码:45,结束,耗时:1.177秒  
数据迁移表名:OMS_ORDER,页码:53,开始  
数据迁移表名:OMS_ORDER,页码:42,结束,耗时:1.188秒  
数据迁移表名:OMS_ORDER,页码:54,开始  
数据迁移表名:OMS_ORDER,页码:44,结束,耗时:1.181秒  
数据迁移表名:OMS_ORDER,页码:55,开始  
数据迁移表名:OMS_ORDER,页码:46,结束,耗时:1.184秒  
数据迁移表名:OMS_ORDER,页码:56,开始  
数据迁移表名:OMS_ORDER,页码:47,结束,耗时:1.18秒  
数据迁移表名:OMS_ORDER,页码:57,开始  
数据迁移表名:OMS_ORDER,页码:48,结束,耗时:1.178秒  
数据迁移表名:OMS_ORDER,页码:58,开始  
数据迁移表名:OMS_ORDER,页码:49,结束,耗时:1.184秒  
数据迁移表名:OMS_ORDER,页码:59,开始  
数据迁移表名:OMS_ORDER,页码:50,结束,耗时:1.182秒  
数据迁移表名:OMS_ORDER,页码:60,开始  
数据迁移表名:OMS_ORDER,页码:51,结束,耗时:1.658秒  
数据迁移表名:OMS_ORDER,页码:61,开始  
数据迁移表名:OMS_ORDER,页码:52,结束,耗时:1.652秒  
数据迁移表名:OMS_ORDER,页码:62,开始  
数据迁移表名:OMS_ORDER,页码:53,结束,耗时:1.913秒  
数据迁移表名:OMS_ORDER,页码:63,开始  
数据迁移表名:OMS_ORDER,页码:54,结束,耗时:1.927秒  
数据迁移表名:OMS_ORDER,页码:64,开始  
数据迁移表名:OMS_ORDER,页码:55,结束,耗时:1.933秒  
数据迁移表名:OMS_ORDER,页码:65,开始  
数据迁移表名:OMS_ORDER,页码:56,结束,耗时:1.931秒  
数据迁移表名:OMS_ORDER,页码:57,结束,耗时:1.935秒  
数据迁移表名:OMS_ORDER,页码:58,结束,耗时:1.937秒  
数据迁移表名:OMS_ORDER,页码:59,结束,耗时:1.933秒  
数据迁移表名:OMS_ORDER,页码:60,结束,耗时:1.931秒  
数据迁移表名:OMS_ORDER,页码:61,结束,耗时:0.578秒  
数据迁移表名:OMS_ORDER,页码:62,结束,耗时:0.585秒  
数据迁移表名:OMS_ORDER,页码:65,结束,耗时:0.432秒  
数据迁移表名:OMS_ORDER,页码:63,结束,耗时:0.457秒  
数据迁移表名:OMS_ORDER,页码:64,结束,耗时:0.445秒  
数据迁移表名:OMS_ORDER任务迁移完成,数据量:64823, 页数:65,耗时:10秒  

 

 

 

  • 微信
  • 交流学习,有偿服务
  • weinxin
  • 博客/Java交流群
  • 资源分享,问题解决,技术交流。群号:590480292
  • weinxin
avatar

发表评论

avatar 登录者:匿名
可以匿名评论或者登录后台评论,评论回复后会有邮件通知

  

已通过评论:0   待审核评论数:0