批处理框架 Spring Batch 这么强,你会用吗?
spring batch 简介
spring batch 是 spring 提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。这些业务运营包括:
- 无需用户交互即可最有效地处理大量信息的自动化,复杂处理。这些操作通常包括基于时间的事件(例如月末计算,通知或通信)。
- 在非常大的数据集中重复处理复杂业务规则的定期应用(例如,保险利益确定或费率调整)。
- 集成从内部和外部系统接收的信息,这些信息通常需要以事务方式格式化,验证和处理到记录系统中。批处理用于每天为企业处理数十亿的交易。
Spring Batch 是一个轻量级,全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序。Spring Batch 构建了人们期望的 Spring Framework 特性(生产力,基于 POJO 的开发方法和一般易用性),同时使开发人员可以在必要时轻松访问和利用更高级的企业服务。Spring Batch 不是一个 schuedling 的框架。
Spring Batch 提供了可重用的功能,这些功能对于处理大量的数据至关重要,包括记录 / 跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理。它还提供更高级的技术服务和功能,通过优化和分区技术实现极高容量和高性能的批处理作业。
Spring Batch 可用于两种简单的用例(例如将文件读入数据库或运行存储过程)以及复杂的大量用例(例如在数据库之间移动大量数据,转换它等等) 上)。大批量批处理作业可以高度可扩展的方式利用该框架来处理大量信息。
Spring Batch 架构介绍
一个典型的批处理应用程序大致如下:
- 从数据库,文件或队列中读取大量记录。
- 以某种方式处理数据。
- 以修改之后的形式写回数据。
其对应的示意图如下:
spring batch 的一个总体的架构如下:
在 spring batch 中一个 job 可以定义很多的步骤 step,在每一个 step 里面可以定义其专属的 ItemReader 用于读取数据,ItemProcesseor 用于处理数据,ItemWriter 用于写数据,而每一个定义的 job 则都在 JobRepository 里面,我们可以通过 JobLauncher 来启动某一个 job。
Spring Batch 核心概念介绍
下面是一些概念是 Spring batch 框架中的核心概念。
什么是 Job
Job 和 Step 是 spring batch 执行批处理任务最为核心的两个概念。
其中 Job 是一个封装整个批处理过程的一个概念。Job 在 spring batch 的体系当中只是一个最顶层的一个抽象概念,体现在代码当中则它只是一个最上层的接口,其代码如下:
/**
* Batch domain object representing a job. Job is an explicit abstraction
* representing the configuration of a job specified by a developer. It should
* be noted that restart policy is applied to the job as a whole and not to a
* step.
*/
public interface Job {
String getName();
boolean isRestartable();
void execute(JobExecution execution);
JobParametersIncrementer getJobParametersIncrementer();
JobParametersValidator getJobParametersValidator();
}
在 Job 这个接口当中定义了五个方法,它的实现类主要有两种类型的 job,一个是 simplejob,另一个是 flowjob。在 spring batch 当中,job 是最顶层的抽象,除 job 之外我们还有 JobInstance 以及 JobExecution 这两个更加底层的抽象。
一个 job 是我们运行的基本单位,它内部由 step 组成。job 本质上可以看成 step 的一个容器。一个 job 可以按照指定的逻辑顺序组合 step,并提供了我们给所有 step 设置相同属性的方法,例如一些事件监听,跳过策略。
Spring Batch 以 SimpleJob 类的形式提供了 Job 接口的默认简单实现,它在 Job 之上创建了一些标准功能。一个使用 java config 的例子代码如下:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
这个配置的意思是:首先给这个 job 起了一个名字叫 footballJob,接着指定了这个 job 的三个 step,他们分别由方法,playerLoad,gameLoad, playerSummarization 实现。
什么是 JobInstance
我们在上文已经提到了 JobInstance,他是 Job 的更加底层的一个抽象,他的定义如下:
public interface JobInstance {
/**
* Get unique id for this JobInstance.
* @return instance id
*/
public long getInstanceId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
}
他的方法很简单,一个是返回 Job 的 id,另一个是返回 Job 的名字。
JobInstance 指的是 job 运行当中,作业执行过程当中的概念。Instance 本就是实例的意思。
比如说现在有一个批处理的 job,它的功能是在一天结束时执行行一次。我们假定这个批处理 job 的名字为’EndOfDay’。在这个情况下,那么每天就会有一个逻辑意义上的 JobInstance, 而我们必须记录 job 的每次运行的情况。
什么是 JobParameters
在上文当中我们提到了,同一个 job 每天运行一次的话,那么每天都有一个 jobIntsance,但他们的 job 定义都是一样的,那么我们怎么来区别一个 job 的不同 jobinstance 了。不妨先做个猜想,虽然 jobinstance 的 job 定义一样,但是他们有的东西就不一样,例如运行时间。
spring batch 中提供的用来标识一个 jobinstance 的东西是:JobParameters。JobParameters 对象包含一组用于启动批处理作业的参数,它可以在运行期间用于识别或甚至用作参考数据。我们假设的运行时间,就可以作为一个 JobParameters。
例如, 我们前面的’EndOfDay’的 job 现在已经有了两个实例,一个产生于 1 月 1 日,另一个产生于 1 月 2 日,那么我们就可以定义两个 JobParameter 对象:一个的参数是 01-01, 另一个的参数是 01-02。因此,识别一个 JobInstance 的方法可以定义为:
因此,我么可以通过 Jobparameter 来操作正确的 JobInstance
什么是 JobExecution
JobExecution 指的是单次尝试运行一个我们定义好的 Job 的代码层面的概念。job 的一次执行可能以失败也可能成功。只有当执行成功完成时,给定的与执行相对应的 JobInstance 才也被视为完成。
还是以前面描述的 EndOfDay 的 job 作为示例,假设第一次运行 01-01-2019 的 JobInstance 结果是失败。那么此时如果使用与第一次运行相同的 Jobparameter 参数(即 01-01-2019)作业参数再次运行,那么就会创建一个对应于之前 jobInstance 的一个新的 JobExecution 实例, JobInstance 仍然只有一个。
JobExecution 的接口定义如下:
public interface JobExecution {
/**
* Get unique id for this JobExecution.
* @return execution id
*/
public long getExecutionId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
/**
* Get batch status of this execution.
* @return batch status value.
*/
public BatchStatus getBatchStatus();
/**
* Get time execution entered STARTED status.
* @return date (time)
*/
public Date getStartTime();
/**
* Get time execution entered end status: COMPLETED, STOPPED, FAILED
* @return date (time)
*/
public Date getEndTime();
/**
* Get execution exit status.
* @return exit status.
*/
public String getExitStatus();
/**
* Get time execution was created.
* @return date (time)
*/
public Date getCreateTime();
/**
* Get time execution was last updated updated.
* @return date (time)
*/
public Date getLastUpdatedTime();
/**
* Get job parameters for this execution.
* @return job parameters
*/
public Properties getJobParameters();
}
每一个方法的注释已经解释的很清楚,这里不再多做解释。只提一下 BatchStatus,JobExecution 当中提供了一个方法 getBatchStatus 用于获取一个 job 某一次特地执行的一个状态。BatchStatus 是一个代表 job 状态的枚举类,其定义如下:
public enum BatchStatus {STARTING, STARTED, STOPPING,
STOPPED, FAILED, COMPLETED, ABANDONED }
这些属性对于一个 job 的执行来说是非常关键的信息,并且 spring batch 会将他们持久到数据库当中. 在使用 Spring batch 的过程当中 spring batch 会自动创建一些表用于存储一些 job 相关的信息,用于存储 JobExecution 的表为batch_job_execution, 下面是一个从数据库当中截图的实例:
什么是 Step
每一个 Step 对象都封装了批处理作业的一个独立的阶段。事实上,每一个 Job 本质上都是由一个或多个步骤组成。每一个 step 包含定义和控制实际批处理所需的所有信息。任何特定的内容都由编写 Job 的开发人员自行决定。
一个 step 可以非常简单也可以非常复杂。例如,一个 step 的功能是将文件中的数据加载到数据库中,那么基于现在 spring batch 的支持则几乎不需要写代码。更复杂的 step 可能具有复杂的业务逻辑,这些逻辑作为处理的一部分。
与 Job 一样,Step 具有与 JobExecution 类似的 StepExecution,如下图所示:
什么是 StepExecution
StepExecution 表示一次执行 Step, 每次运行一个 Step 时都会创建一个新的 StepExecution,类似于 JobExecution。但是,某个步骤可能由于其之前的步骤失败而无法执行。且仅当 Step 实际启动时才会创建 StepExecution。
一次 step 执行的实例由 StepExecution 类的对象表示。每个 StepExecution 都包含对其相应步骤的引用以及 JobExecution 和事务相关的数据,例如提交和回滚计数以及开始和结束时间。
此外,每个步骤执行都包含一个 ExecutionContext,其中包含开发人员需要在批处理运行中保留的任何数据,例如重新启动所需的统计信息或状态信息。下面是一个从数据库当中截图的实例:
什么是 ExecutionContext
ExecutionContext 即每一个 StepExecution 的执行环境。它包含一系列的键值对。我们可以用如下代码获取 ExecutionContext
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
什么是 JobRepository
JobRepository 是一个用于将上述 job,step 等概念进行持久化的一个类。它同时给 Job 和 Step 以及下文会提到的 JobLauncher 实现提供 CRUD 操作。
首次启动 Job 时,将从 repository 中获取 JobExecution,并且在执行批处理的过程中,StepExecution 和 JobExecution 将被存储到 repository 当中。
@EnableBatchProcessing注解可以为 JobRepository 提供自动配置。
什么是 JobLauncher
JobLauncher 这个接口的功能非常简单,它是用于启动指定了 JobParameters 的 Job,为什么这里要强调指定了 JobParameter,原因其实我们在前面已经提到了,jobparameter 和 job 一起才能组成一次 job 的执行。下面是代码实例:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
上面 run 方法实现的功能是根据传入的 job 以及 jobparamaters 从 JobRepository 获取一个 JobExecution 并执行 Job。
什么是 Item Reader
ItemReader 是一个读数据的抽象,它的功能是为每一个 Step 提供数据输入。当 ItemReader 以及读完所有数据时,它会返回 null 来告诉后续操作数据已经读完。Spring Batch 为 ItemReader 提供了非常多的有用的实现类,比如 JdbcPagingItemReader,JdbcCursorItemReader 等等。
ItemReader 支持的读入的数据源也是非常丰富的,包括各种类型的数据库,文件,数据流,等等。几乎涵盖了我们的所有场景。
下面是一个 JdbcPagingItemReader 的例子代码:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
JdbcPagingItemReader 必须指定一个 PagingQueryProvider,负责提供 SQL 查询语句来按分页返回数据。
下面是一个 JdbcCursorItemReader 的例子代码:
private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
String tenant) {
JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(dataSource);
itemReader.setSql("sql here");
itemReader.setRowMapper(new RowMapper());
return itemReader;
}
什么是 Item Writer
既然 ItemReader 是读数据的一个抽象,那么 ItemWriter 自然就是一个写数据的抽象,它是为每一个 step 提供数据写出的功能。写的单位是可以配置的,我们可以一次写一条数据,也可以一次写一个 chunk 的数据,关于 chunk 下文会有专门的介绍。
ItemWriter 对于读入的数据是不能做任何操作的。
Spring Batch 为 ItemWriter 也提供了非常多的有用的实现类,当然我们也可以去实现自己的 writer 功能。
什么是 Item Processor
ItemProcessor 对项目的业务逻辑处理的一个抽象, 当 ItemReader 读取到一条记录之后,ItemWriter 还未写入这条记录之前,I 我们可以借助 temProcessor 提供一个处理业务逻辑的功能,并对数据进行相应操作。如果我们在 ItemProcessor 发现一条数据不应该被写入,可以通过返回 null 来表示。ItemProcessor 和 ItemReader 以及 ItemWriter 可以非常好的结合在一起工作,他们之间的数据传输也非常方便。我们直接使用即可。
chunk 处理流程
spring batch 提供了让我们按照 chunk 处理数据的能力,一个 chunk 的示意图如下:
它的意思就和图示的一样,由于我们一次 batch 的任务可能会有很多的数据读写操作,因此一条一条的处理并向数据库提交的话效率不会很高,因此 spring batch 提供了 chunk 这个概念,我们可以设定一个 chunk size,spring batch 将一条一条处理数据,但不提交到数据库,只有当处理的数据数量达到 chunk size 设定的值得时候,才一起去 commit.
java 的实例定义代码如下:
在上面这个 step 里面,chunk size 被设为了 10,当 ItemReader 读的数据数量达到 10 的时候,这一批次的数据就一起被传到 itemWriter,同时 transaction 被提交。
skip 策略和失败处理
一个 batch 的 job 的 step,可能会处理非常大数量的数据,难免会遇到出错的情况,出错的情况虽出现的概率较小,但是我们不得不考虑这些情况,因为我们做数据迁移最重要的是要保证数据的最终一致性。spring batch 当然也考虑到了这种情况,并且为我们提供了相关的技术支持,请看如下 bean 的配置:
我们需要留意这三个方法,分别是 skipLimit(),skip(),noSkip(),
skipLimit 方法的意思是我们可以设定一个我们允许的这个 step 可以跳过的异常数量,假如我们设定为 10,则当这个 step 运行时,只要出现的异常数目不超过 10,整个 step 都不会 fail。注意,若不设定 skipLimit,则其默认值是 0.
skip 方法我们可以指定我们可以跳过的异常,因为有些异常的出现,我们是可以忽略的。
noSkip 方法的意思则是指出现这个异常我们不想跳过,也就是从 skip 的所以 exception 当中排除这个 exception,从上面的例子来说,也就是跳过所有除 FileNotFoundException 的 exception。
那么对于这个 step 来说,FileNotFoundException 就是一个 fatal 的 exception,抛出这个 exception 的时候 step 就会直接 fail
批处理操作指南
本部分是一些使用 spring batch 时的值得注意的点
批处理原则
- 在构建批处理解决方案时,应考虑以下关键原则和注意事项。
- 批处理体系结构通常会影响体系结构
- 尽可能简化并避免在单批应用程序中构建复杂的逻辑结构
- 保持数据的处理和存储在物理上靠得很近(换句话说,将数据保存在处理过程中)。
- 最大限度地减少系统资源的使用,尤其是 I / O. 在 internal memory 中执行尽可能多的操作。
- 查看应用程序 I / O(分析 SQL 语句)以确保避免不必要的物理 I / O. 特别是,需要寻找以下四个常见缺陷:
- 当数据可以被读取一次并缓存或保存在工作存储中时,读取每个事务的数据。
- 重新读取先前在同一事务中读取数据的事务的数据。
- 导致不必要的表或索引扫描。
- 未在 SQL 语句的 WHERE 子句中指定键值。
- 在批处理运行中不要做两次一样的事情。例如,如果需要数据汇总以用于报告目的,则应该(如果可能)在最初处理数据时递增存储的总计,因此您的报告应用程序不必重新处理相同的数据。
- 在批处理应用程序开始时分配足够的内存,以避免在此过程中进行耗时的重新分配。
- 总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性。
- 尽可能实施校验和以进行内部验证。例如,对于一个文件里的数据应该有一个数据条数纪录,告诉文件中的记录总数以及关键字段的汇总。
- 在具有真实数据量的类似生产环境中尽早计划和执行压力测试。
- 在大批量系统中,数据备份可能具有挑战性,特别是如果系统以 24-7 在线的情况运行。数据库备份通常在在线设计中得到很好的处理,但文件备份应该被视为同样重要。如果系统依赖于文件,则文件备份过程不仅应该到位并记录在案,还应定期进行测试。
如何默认不启动 job
在使用 java config 使用 spring batch 的 job 时,如果不做任何配置,项目在启动时就会默认去跑我们定义好的批处理 job。那么如何让项目在启动时不自动去跑 job 呢?
spring batch 的 job 会在项目启动时自动 run,如果我们不想让他在启动时 run 的话,可以在 application.properties 中添加如下属性:
spring.batch.job.enabled=false
在读数据时内存不够
在使用 spring batch 做数据迁移时,发现在 job 启动后,执行到一定时间点时就卡在一个地方不动了,且 log 也不再打印,等待一段时间之后,得到如下错误:
红字的信息为:Resource exhaustion event:the JVM was unable to allocate memory from the heap.
翻译过来的意思就是项目发出了一个资源耗尽的事件,告诉我们 java 虚拟机无法再为堆分配内存。
造成这个错误的原因是: 这个项目里的 batch job 的 reader 是一次性拿回了数据库里的所有数据,并没有进行分页,当这个数据量太大时,就会导致内存不够用。解决的办法有两个:
- 调整 reader 读数据逻辑,按分页读取,但实现上会麻烦一些,且运行效率会下降
- 增大 service 内存
作者:小梁同学jxy
原文链接:https://www.jianshu.com/p/c492e0f23e0b