7、定时进行数据批处理任务

一、StopWatch时间控制类:

StopWatch 是spring工具包org.springframework.util下的一个工具类,主要用于计算同步 单线程执行时间。

1、StopWatch优缺点:

优点:

(1)、spring自带工具类,可直接使用;

(2)、代码实现简单,使用更简单;

(3)、统一归纳,展示每项任务耗时与占用总时间的百分比,展示结果直观;

(4)、性能消耗相对较小,并且最大程度的保证了start与stop之间的时间记录的准确性;

(5)、可在start时直接指定任务名字,从而更加直观的显示记录结果;

缺点:

(1)、一个StopWatch实例一次只能开启一个task,不能同时start多个task,并且在该task未stop之前不能start一个新的task,必须在该task stop之后才能开启新的task,若要一次开启多个,需要new不同的StopWatch实例;

(2)、代码侵入式使用,需要改动多处代码;

2、StopWatch模板:

7、定时进行数据批处理任务
public Object stopWatchDemo(){
        StopWatch stopWatch = new StopWatch("StopWatch测量业务逻辑的运行时间");
        stopWatch.start();
        log.info("====================StopWatch计时任务开始========================");

        //业务逻辑1
        //业务逻辑2
        //......

        stopWatch.stop();
        log.info("====================StopWatch计时任务结束========================");
        log.info(stopWatch.prettyPrint());  //打印执行时间
        return null;
    }

二、数据的批处理:

Java中的批处理用于执行一组插入操作或批处理,因为一次又一次地执行单个插入会浪费时间并降低性能。因此,使用批处理可以一次执行多个插入操作,这样可以提高程序的性能。

1、批处理方式一:

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * 批处理插入
     */
    private void upSave1(List<BatchSaveVO> itemList) {
        SqlSession session = null;
        try {
            // 新获取一个模式为BATCH,自动提交为false的session
            // 如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交,可能导致内存溢出
            session = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
            BatchSaveMapper mapper = session.getMapper(BatchSaveMapper.class);
            int batchCount = 1000;// 每批commit的个数
            int batchLastIndex = batchCount;// 每批最后一个的下标
            for (int index = 0; index < itemList.size(); ) {
                if (batchLastIndex >= itemList.size()) {
                    batchLastIndex = itemList.size();
                    log.info("index:{}", batchLastIndex);
                    mapper.batchInsert(itemList.subList(index, batchLastIndex));
                    session.commit();
                    log.info("index:" + index + " batchLastIndex:" + batchLastIndex);
                    log.info("数据插入完毕");
                    break;// 数据插入完毕,退出循环
                } else {
                    mapper.batchInsert(itemList.subList(index, batchLastIndex));
                    session.commit();
                    log.info("index:" + index + " batchLastIndex:" + batchLastIndex);
                    index = batchLastIndex;// 设置下一批下标
                    batchLastIndex = index + (batchCount - 1);
                }
            }
            session.commit();
        } finally {
            session.close();
        }
    }

2、批处理方式二:

    @Autowired
    private SqlSessionTemplate sqlSessionTemplate;

    private void upSave2(List<BatchSaveVO> newReports) {
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        // 如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交,可能导致内存溢出
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, true);
        try {
            Optional.ofNullable(newReports).ifPresent(reports -> {
                BatchSaveMapper batchMapper = sqlSession.getMapper(BatchSaveMapper.class);
                batchMapper.batchInsert(newReports);
                sqlSession.commit();
            });
        }catch (Exception e){
            sqlSession.rollback();
            log.error("================保存失败================:{}",e.getMessage());
        }finally {
            sqlSession.close();
        }
    }

7、定时进行数据批处理任务

3、SqlSessionFactory详解:

(1)、介绍:

SqlSessionFactory是MyBatis的关键字,它是单个数据库映射关系经过编译后的内存镜像,SqlSessionFactory对象的实例可以通过SqlSessionFactoryBuilder对象来获得,而SqlSessionFactoryBuildr则可以从XML配置文件或一个预先定制的Configuration的实例构建出SqlSessionFactory的实例每一个MyBatis的应用程序都以一个SqlSessionFactory对象的实例为核心。SqlSessionFactory是线程安全的,他一旦被创建,应该在应用执行期间都存在,在应用运行期间不要重复创建多次,建议使用单例模式,SqlSessionFactory是创建SqlSession的工厂。

(2)、SqlSessionFactory.openSession()方法:

//新获取一个模式为BATCH,自动提交为false的session
// 如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交,可能导致内存溢出
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);

7、定时进行数据批处理任务

(3)、ExecutorType三种SQL执行器模式:

1)、ExecutorType.SIMPLE:相当于JDBC的stmt.execute(sql) 执行完毕即关闭即stmt.close()

2)、ExecutorType.REUSE:相当于JDBC的stmt.execute(sql) 执行完不关闭,而是将stmt存入Map

3)、ExecutorType.BATCH:相当于JDBC语句的stmt.addBatch(sql),即仅将执行SQL加入到批量计划但是不真正执行,所以此时不会执行返回受影响的行数,而只有执行stmt.execteBatch()后才会真正执行sql。

方式

优势

劣势

ExecutorType.SIMPLE

默认执行器, 节约服务器资源

每次都要开关Statement

ExecutorType.REUSE

提升后端接口处理效率

每次一个新sql都缓存,增大JVM内存压力

ExecutorType.BATCH

专门用于更新插入操作,效率最快

对select 不适用,另外特殊要求,比如限制一次execteBatch的数量时需要写过滤器定制

三、定时任务:

1、@Schedule定时任务方式:

在日常开发中比较简单的实现方式就是使用Spring的@Scheduled()注解。但是在修改服务器时间时会导致定时任务不执行情况的发生,解决的办法是当修改服务器时间后,将服务进行重启就可以避免此现象的发生。

(1)、@Schedule详解:

@Schedule详解参考

(2)、@Schedule的使用:

要使用@Scheduled注解,首先需要在相关类上添加@EnableScheduling,启用Spring的计划任务执行功能,这样可以在容器中的任何Spring管理的bean上检测@Scheduled注解,执行计划任务。

@Scheduled(cron = "0 0 1 * * ?")
    public Object batchSaveWay() {
        StopWatch stopWatch = new StopWatch("@Scheduled定时任务");
        stopWatch.start();
        log.info("====================@Scheduled定时任务开始========================");

        //业务逻辑

        stopWatch.stop();
        log.info("====================@Scheduled定时任务结束========================");
        log.info(stopWatch.prettyPrint());
        return null;
    }

2、SchedulingConfigurer接口线程定时任务方式:

@Schedule 注解有一个缺点,其定时的时间不能动态的改变,而基于SchedulingConfigurer 接口的方式可以做到(例如将cron表达式存入数据库,通过修改数据中的cron表达式,动态改变定时任务的执行周期,而不需要重启项目。)

(1)、操作流程:

1)、声明ThreadPoolTaskScheduler配置类:

@Configuration
public class TaskPoolConfig {
    /**
     * ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。
     * ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。
     *
     * 拒绝策略:
     * (1)、CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。
     *     但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
     * (2)、AbortPolicy: 丢弃任务,并抛出拒绝执行
     * (3)、RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
     * (4)、DiscardPolicy: 直接丢弃,其他啥都没有
     * (5)、DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
     */

    @Bean
    public ThreadPoolTaskScheduler taskSchedulerExecutor() {
        // 创建一个线程池对象
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // 定义一个线程池大小
        scheduler.setPoolSize(100);
        // 线程池名的前缀
        scheduler.setThreadNamePrefix("taskExecutor-");
        // 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
        scheduler.setAwaitTerminationSeconds(60);
        // 线程池对拒绝任务的处理策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
        scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return scheduler;
    }
}

2)、实现SchedulingConfigurer接口:

@EnableScheduling
@Component
@Configuration
public abstract class ThreadSchedulingConfigurerWay implements SchedulingConfigurer {
    @Autowired
    private TaskPoolConfig taskPoolConfig;

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setScheduler(taskPoolConfig.taskSchedulerExecutor());
        scheduledTaskRegistrar.addTriggerTask(new Runnable() {
            @Override
            public void run() {
                // 定时任务要执行的内容
                executionWay();
            }
        }, new Trigger() {
            @Override
            public Date nextExecutionTime(TriggerContext triggerContext) {
                // 定时任务触发,可修改定时任务的执行周期
                String cron = getCron(); //可以将表达式配置在数据库中,动态改变定时的时间
                CronTrigger trigger = new CronTrigger(cron);
                Date nextExecDate = trigger.nextExecutionTime(triggerContext);
                return nextExecDate;
            }
        });
    }

    public abstract String getCron();
    public abstract void executionWay();

}

3)、实现定时线程任务:

任务一:

//定时线程任务一
@Configuration
public class ThreadSchedulingConfigurerWayOneImpl extends ThreadSchedulingConfigurerWay {
    @Override
    public String getCron() {
        String cron = "0 0/1 * * * ?";
        return cron;
    }

    @Override
    public void executionWay() {
        System.out.println("定时线程任务demo1:"
                + LocalDateTime.now()+",线程名称:"+Thread.currentThread().getName()
                + " 线程id:"+Thread.currentThread().getId());
    }
}

任务二:

//定时线程任务二
@Configuration
public class ThreadSchedulingConfigurerWayTwoImpl extends ThreadSchedulingConfigurerWay {
    @Override
    public String getCron() {
        String cron = "0 0/1 * * * ?";
        return cron;
    }

    @Override
    public void executionWay() {
        System.out.println("定时线程任务demo2:"
                + LocalDateTime.now()+",线程名称:"+Thread.currentThread().getName()
                + " 线程id:"+Thread.currentThread().getId());
    }
}

(2)、ThreadPoolTaskExecutor和ThreadPoolTaskScheduler的区别:

相关参考

3、Quartz框架定时任务方式:

Quartz框架定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行,如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。

(1)、添加Quartz依赖:

org.springframework.boot
    spring-boot-starter-quartz

(2)、声明Job任务(继承QuartzJobBean抽象类):

@Slf4j
public class QuartzBatchSaveJob extends QuartzJobBean {
    @Autowired
    @Qualifier("QuartzWay")
    private BatchSaveService quartzWay;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        //定时任务的业务逻辑
        quartzWay.batchSaveWay();
    }
}

(3)、编写业务类:

public Object batchSaveWay() {
        StopWatch stopWatch = new StopWatch("Quartz定时框架启动任务");
        stopWatch.start();
        log.info("====================Quartz定时框架启动任务开始========================");

        //业务逻辑

        stopWatch.stop();
        log.info("====================Quartz定时框架启动任务结束========================");
        log.info(stopWatch.prettyPrint());
        return null;
    }

(4)、将Job任务注册到Spring容器中:

@Configuration
public class QuartzConfig {
    //Quartz定时框架启动任务
    @Bean
    public JobDetail businessJobDetail(){
        return JobBuilder.newJob(QuartzBatchSaveJob.class)
                .withDescription("Quartz定时框架启动任务")
                .withIdentity(QuartzBatchSaveJob.class.getSimpleName())
                .storeDurably().build();
    }
    @Bean
    public Trigger businessJobTrigger() {
        String cron = "0 0 1 * * ?";
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cron);
        return TriggerBuilder.newTrigger().forJob(businessJobDetail())
                .withIdentity("QuartzBatchSaveJobTrigger")
                .withSchedule(cronScheduleBuilder).build();
    }
}

搜索

复制

Original: https://www.cnblogs.com/Iven-L/p/16587692.html
Author: 爱文(Iven)
Title: 7、定时进行数据批处理任务

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/575695/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Harbor部署

    harbor 无论是使用Docker-distribution去自建仓库,还是通过官方镜像跑容器的方式去自建仓库,通过前面的演示我们可以发现其是非常的简陋的,还不如直接使用官方的D…

    数据库 2023年6月14日
    087
  • Collections.singletonList方法

    这个方法主要用于只有一个元素的优化, 减少内存分配,无需分配额外的内存,可以从SingletonList内部类看得出来,由于只有一个element,因此可以做到内存分配最小化,相比…

    数据库 2023年6月16日
    082
  • mysql常用语句 3

    1.找出每个部门平均薪水的薪资等级,from后面嵌套子查询第一步是找出每个部门的平均工资。 [En] The first step is to find out the avera…

    数据库 2023年5月24日
    073
  • CMD 命令汇总

    CMD 常用命令 常用命令 作用 D: 切换到 D 盘下 dir 查看当前路径下的全部内容 cd 盘符:\目录1\目录2…… 进入多级目录 cd .. 回退…

    数据库 2023年6月6日
    066
  • ansible-复制模块

    简介:临时的,在ansible中是指需要快速执行的单条命令,并且不需要保存的命令。对于复杂的命令则为 playbook。 1、复制模块 可在终端执行ansible-doc copy…

    数据库 2023年6月14日
    083
  • 5、基于EasyExcel的导入导出

    一、Apach POI处理Excel的方式: 传统Excel操作或者解析都是利用Apach POI进行操作,POI中处理Excel有以下几种方式: 1、HSSFWorkbook: …

    数据库 2023年6月6日
    0117
  • 自动补全、回滚!介绍一款可视化 sql 诊断利器

    Yearning简介 ================= Yearning MYSQL 是一个SQL语句审核平台。提供查询审计,SQL审核等多种功能,支持Mysql,可以在一定程度…

    数据库 2023年6月9日
    0123
  • Redis 哈希Hash底层数据结构

    Redis 底层数据结构 Redis数据库就像是一个哈希表,首先对key进行哈希运算得到哈希值再取模得到一个下标,每个元素是一个节点,节点之间形成链表。这感觉有点像Java中的Ha…

    数据库 2023年6月14日
    099
  • go context详解

    Context通常被称为上下文,在go中,理解为goroutine的运行状态、现场,存在上下层goroutine context的传递,上层goroutine会把context传递…

    数据库 2023年6月9日
    0105
  • leetcode 144. Binary Tree Preorder Traversal 二叉树展开为链表(中等)

    一、题目大意 给你二叉树的根节点 root ,返回它节点值的 前序 遍历。 示例 1: 输入:root = [1,null,2,3]输出:[1,2,3] 示例 2: 输入:root…

    数据库 2023年6月16日
    087
  • 11、ON DUPLICATE KEY UPDATE实现插入更新操作

    一、插入与更新操作: MySQL中,采用ON DUPLICATE KEY UPDATE语句对不存在的数据进行INSERT插入操作,对已存在的数据进行UPDATE更新操作; 总结: …

    数据库 2023年6月6日
    059
  • 第一篇博客

    这是我在博客园的第一篇博客,用来纪念以下,同时也是写博客的试水标记 Original: https://www.cnblogs.com/zht1702/p/15081310.htm…

    数据库 2023年6月14日
    066
  • likeshop搭建商城系统,一步到位

    什么是商城系统?商城系统又称在线商城系统,是一个功能完善的在线购物系统,主要为在线销售和在线购物服务。 一般的商城系统运营模式有B2C单商户商城系统,B2B2C多商户商城系统以及S…

    数据库 2023年6月14日
    0147
  • 史上最全Mysql规范

    1 整体规约 1)【强制】数据库所有对象必须要有注释,包括:表、字段、索引等,并且要保持最新; 1)【强制】默认使用utf8字符集,无乱码风险,除一些需要存储特殊符号的字段,可以采…

    数据库 2023年6月14日
    072
  • 2022CNAS能力验证-存在性

    背景 警方接到报案称有人利用无人机投放非法宣传材料,根据线索找到一处住宅,搜查发现无人机、智能电视机、小型网络设备等电子设备,提取了电子设备数据并对网络设备进行了抓包分析。现委托你…

    数据库 2023年6月11日
    072
  • LeetCode刷题笔记-简单入门题

    分割平衡字符串 在一个 平衡字符串 中,’L’ 和 ‘R’ 字符的数量是相同的。 给你一个平衡字符串 s,请你将它分割成尽可能多的平…

    数据库 2023年6月11日
    092
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球