阅读 441

spring schedule 配置多任务动态 cron 【增删启停】

spring schedule 配置多任务动态 cron 【增删启停】

一、背景

之前公司经常会遇到配置定时任务,简单的任务可以直接依赖spring即可。

简单任务直接使用 @scheduled 注解配合@EnableScheduling。

但是如何实现简单的动态cron呢?


开发原则是:

在满足项目需求的情况下,尽量少的依赖其它框架,避免项目过于臃肿和复杂。主要研究spring 自带的schedule。


常见的任务调度方式


单机部署模式

Timer: jdk中自带的一个定时调度类,可以简单的实现按某一频度进行任务执行。提供的功能比较单一,无法实现复杂的调度任务。

ScheduledExecutorService: 也是jdk自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。

Spring Task:Spring提供的一个任务调度工具,支持注解和配置文件形式,支持Cron表达式,使用简单但功能强大。

Quartz:一款功能强大的任务调度器,可以实现较为复杂的调度功能,如每月一号执行、每天凌晨执行、每周五执行等等,还支持分布式调度,就是配置稍显复杂。

分布式集群模式(不多介绍,简单提一下)

Quartz:可以去看看这篇文章Quartz分布式。

elastic-job:当开发的弹性分布式任务调度系统,采用zookeeper实现分布式协调,实现任务高可用以及分片。

xxl-job:是大众点评员工徐雪里于2015年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架。

saturn: 是唯品会提供一个分布式、容错和高可用的作业调度服务框架。

二、本篇说明

springBoot 基础模块 spring-boot-starter-web 已经内置 schedule ,无需引入额外依赖。

先思考几个问题:

1、动态 cron 实现的原理

任务的 【 停止】是基于 future接口 的cancel() 方法。

任务的 【增加、删除、启动】是基于 注册到 类ScheduledTaskRegistrar 的 ScheduledFuture的数量。

涉及核心类:


ScheduledFuture

SchedulingConfigurer

ScheduledTaskRegistrar

2、多任务并行执行配置

spring默认机制对schedule是单线程。


3、如何配置多个任务

好多博文,都是配置一个cron,这让初学者很难受。


4、如何配置任务分组

根据自己业务背景,可根据步骤三,进行改造。


5、如何配置服务启动自启任务。

想要程序启动时首次去加我们设置的task,只需实现 CommandLineRunner 即可。

6、如何从数据库读取配置

这个其实很简单,在实现 ScheduledTaskRegistrar 时,先直接查询我们需要的数据即可。

7、如何优雅的实现我们的代码

这里为了我们多个task实现时,去除臃肿的if else ,使用策略模式去实现我们的task,这里代码里面会具体介绍。


8、如何去触发我们的schedule 【增删启停】

配置好 task任务类,注入到 controller ,通过接口直接调用即可。


三、代码实现

先贴出我的github 代码,下面代码描述不全。


普通多任务动态cron

分组多任务动态cron

1. 普通多任务动态cron 实现

1.1 对应数据库的实体类 TaskEntity

@Data

@AllArgsConstructor

@NoArgsConstructor

public class TaskEntity {

    /**

     * 任务id

     */

    private int taskId;

    /**

     * 任务说明

     */

    private String desc;

    /**

     * cron 表达式

     */

    private String expression;

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

1.2 配置每个任务实现

配置任务接口 TaskService


public interface TaskService {


    void HandlerJob();


    Integer jobId();


}

1

2

3

4

5

6

7

配置任务接口实现 TaskServiceJob1Impl、TaskServiceJob2Impl …


@Service

public class TaskServiceJob1Impl implements TaskService {

    @Override

    public void HandlerJob() {

        System.out.println("------job1 开始执行---------:"+new Date());


        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "    " + Thread.currentThread().getName() + "    任务一启动");

        try {

            Thread.sleep(10000);//任务耗时10秒

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "    " + Thread.currentThread().getName() + "    结束");


    }


    @Override

    public Integer jobId() {

        return 1;

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

1.3 配置任务解析器 TaskSolverChooser

注:

这里引入策略模式

为啥要配置 任务解析器选择器:

因为我们实现多个任务时,一个任务对应一个 CronTask,需要在 MyScheduledTask 里面去实现我们每一个方法。

譬如,我们有100个任务就要自定义100个任务实现方法,代码会很臃肿,明显不符合,【开闭原则】,于是这里采用策略模式,解耦我们多个任务业务实现逻辑。


@Slf4j

@Component

public class TaskSolverChooser implements ApplicationContextAware {


    private ApplicationContext applicationContext;


    private Map<Integer, TaskService> chooseMap = new HashMap<>(16);


    /**

     * 拿到spring context 上下文

     */

    @Override

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        this.applicationContext = applicationContext;

    }


    @PostConstruct

    private void registerToTaskSolver(){

        Map<String, TaskService> taskServiceMap = applicationContext.getBeansOfType(TaskService.class);

        for (TaskService value : taskServiceMap.values()) {

            chooseMap.put(value.jobId(), value);

            log.info("task {} 处理器: {} 注册成功",new Object[]{value.jobId(),value});

        }

    }


    /**

     * 获取需要的job

     */

    public TaskService getTask(Integer jobId){

        return chooseMap.get(jobId);

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

1.4 配置MyScheduledTask (动态cron核心配置)

说明:

1、配置多线程执行任务

2、配置 刷新 task

3、配置 停止 task

4、配置 执行task 业务逻辑


@Component

public class MyScheduledTask implements SchedulingConfigurer {


    private volatile ScheduledTaskRegistrar registrar;


    private final ConcurrentHashMap<Integer, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();

    private final ConcurrentHashMap<Integer, CronTask> cronTasks = new ConcurrentHashMap<>();


    @Autowired

    private TaskSolverChooser taskSolverChooser;


    @Override

    public void configureTasks(ScheduledTaskRegistrar registrar) {


        //设置20个线程,默认单线程,如果不设置的话,不能同时并发执行任务

        registrar.setScheduler(Executors.newScheduledThreadPool(10));

        this.registrar = registrar;

    }


    /**

     * 修改 cron 需要 调用该方法

     */

    public void refresh(List<TaskEntity> tasks){

        //取消已经删除的策略任务

        Set<Integer> sids = scheduledFutures.keySet();

        for (Integer sid : sids) {

            if(!exists(tasks, sid)){

                scheduledFutures.get(sid).cancel(false);

            }

        }

        for (TaskEntity TaskEntity : tasks) {

            String expression = TaskEntity.getExpression();

            //计划任务表达式为空则跳过

            if(!StringUtils.hasLength(expression)){

                continue;

            }

            //计划任务已存在并且表达式未发生变化则跳过

            if (scheduledFutures.containsKey(TaskEntity.getTaskId())

                    && cronTasks.get(TaskEntity.getTaskId()).getExpression().equals(expression)) {

                continue;

            }

            //如果策略执行时间发生了变化,则取消当前策略的任务

            if(scheduledFutures.containsKey(TaskEntity.getTaskId())){

                scheduledFutures.get(TaskEntity.getTaskId()).cancel(false);

                scheduledFutures.remove(TaskEntity.getTaskId());

                cronTasks.remove(TaskEntity.getTaskId());

            }

            //业务逻辑处理

            CronTask task = cronTask(TaskEntity, expression);



            //执行业务

            ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());

            cronTasks.put(TaskEntity.getTaskId(), task);

            scheduledFutures.put(TaskEntity.getTaskId(), future);

        }

    }


    /**

     * 停止 cron 运行

     */

    public void stop(List<TaskEntity> tasks){

        tasks.forEach(item->{

            if (scheduledFutures.containsKey(item.getTaskId())) {

                // mayInterruptIfRunning设成false话,不允许在线程运行时中断,设成true的话就允许。

                scheduledFutures.get(item.getTaskId()).cancel(false);

                scheduledFutures.remove(item.getTaskId());

            }

        });

    }


    /**

     * 业务逻辑处理

     */

    public CronTask cronTask(TaskEntity TaskEntity, String expression)  {

        return new CronTask(() -> {

                    //每个计划任务实际需要执行的具体业务逻辑

                    //采用策略,模式 ,执行我们的job

                   taskSolverChooser.getTask(TaskEntity.getTaskId()).HandlerJob();

                }, expression);

    }


    private boolean exists(List<TaskEntity> tasks, Integer tid){

        for(TaskEntity TaskEntity:tasks){

            if(TaskEntity.getTaskId() == tid){

                return true;

            }

        }

        return false;

    }


    @PreDestroy

    public void destroy() {

        this.registrar.destroy();

    }


}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

1.5 配置程序启动时首次去加我们设置的task

@Component

public class StartInitTask implements CommandLineRunner {


    @Autowired

    private MyScheduledTask myScheduledTask;


    @Override

    public void run(String... args) throws Exception {

        List<TaskEntity> list = Arrays.asList(

                new TaskEntity(1, "测试1", "0/1 * *  * * ?"),

                new TaskEntity(2, "测试2", "0/1 * *  * * ?")

        );

        myScheduledTask.refresh(list);

    }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

1.6 配置web接口去触发,增删启停

@RestController

public class StartController {


    @Autowired

    private MyScheduledTask scheduledTask;


    @PostMapping(value = "/startOrChangeCron")

    public String changeCron(@RequestBody List<TaskEntity> list){

        if (CollectionUtils.isEmpty(list)) {

            // 这里模拟存在数据库的数据

            list = Arrays.asList(

                    new TaskEntity(1, "测试1","0/1 * *  * * ?") ,

                    new TaskEntity(2, "测试2","0/1 * *  * * ?")

            );

        }

        scheduledTask.refresh(list);

        return "task任务:" + list.toString() + "已经开始运行";

    }


    @PostMapping(value = "/stopCron")

    public String stopCron(@RequestBody List<TaskEntity> list){

        if (CollectionUtils.isEmpty(list)) {

            // 这里模拟将要停止的cron可通过前端传来

            list = Arrays.asList(

                    new TaskEntity(1, "测试1","0/1 * *  * * ?") ,

                    new TaskEntity(2, "测试2","0/1 * *  * * ?")

            );

        }

        scheduledTask.stop(list);

        List<Integer> collect = list.stream().map(TaskEntity::getTaskId).collect(Collectors.toList());

        return "task任务:" + collect.toString() + "已经停止启动";

    }


}


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

2. 普通多任务动态cron 实现

注:

基于反射实现,根据方法全类名,去动态执行方法。

多任务分组配置,根据任务类型进行分组。

譬如:定时任务人员的相关操作,有检测人员离职状态,人员业绩达标,人员考勤…等,对人员定时任务做一个分类,在同一个类里面去实现不同的task。

多任务分组配置,其实跟上面步骤类似,这里不再叙述。

可参考: 分组多任务动态cron


3 测试记录

测试1 项目启动自启

TaskServiceJob1Impl和TaskServiceJob1Impl … 设置 阻塞10s

观察日志时间可发现,已经同时并发执行俩个任务。


测试2 触发 刷新【增、删、启】我们的task,。

其实这里没这么智能,如果需要触发刷新接口,实际上是重新加载我们的task,就是对应触发我们,增加任务任务,删除任务,启动任务。

使用idea插件测试接口


观察日志


测试3 触发 停止接口,停止一个接口。

这里测试略过…


四、总结

其实实现简单的动态配置,以上代码可用,比较简单。

————————————————

版权声明:本文为CSDN博主「十三月tlz」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/qq_38345773/article/details/114764525


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐