阅读 79

Spring Batch Step 流程

Step是一个独立封装域对象,包含了所有定义和控制实际处理信息批任务的序列。这是一个比较抽象的描述,因为任意一个Step的内容都是开发者自己编写的Job。一个Step的简单或复杂取决于开发者的意愿。一个简单的Step也许是从本地文件读取数据存入数据库,写很少或基本无需写代码。一个复杂的Step也许有复杂的业务规则(取决于所实现的方式),并作为整个个流程的一部分。


image.png

1.1 ItemReader

最简单的概念, ItemReader 就是一种从各个输入源读取数据,然后提供给后续步骤的方式. 最常见的例子包括:

  • Flat FileFlat File Item Readers 从纯文本文件中读取一行行的数据, 存储数据的纯文本文件通常具有固定的格式, 并且使用某种特殊字符来分隔每条记录中的各个字段(例如逗号,Comma).
  • XML XML ItemReaders 独立地处理XML,包括用于解析、映射和验证对象的技术。还可以对输入数据的XML文件执行XSD schema验证。
  • Database 数据库就是对请求返回结果集的资源,结果集可以被映射转换为需要处理的对象。默认的SQL ItemReaders调用一个 RowMapper 来返回对象, 并跟踪记录当前行,以备有重启的情况, 存储基本统计信息,并提供一些事务增强特性,关于事物将在稍后解释。

ItemReader 是一个通用输入操作的基本接口:

 public interface ItemReader<T> {       
    T read() throws Exception, UnexpectedInputException, ParseException;
 }

read 是ItemReader中最根本的方法; 每次调用它都会返回一个 Item 或 null(如果没有更多item)。每个 item条目, 一般对应文件中的一行(line), 或者对应数据库中的一行(row), 也可以是XML文件中的一个元素(element)。 一般来说, 这些item都可以被映射为一个可用的domain对象(如 Trade, User 等等), 但也不是强制要求(最偷懒的方式,返回一个Map)。

一般约定 ItemReader 接口的实现都是向前型的(forward only). 但如果底层资源是事务性质的(如JMS队列),并且发生回滚(rollback), 那么下一次调用 read 方法有可能会返回和前次逻辑上相等的结果(对象)。值得一提的是, 处理过程中如果没有items, ItemReader 不应该抛出异常。例如,数据库 ItemReader 配置了一条查询语句, 返回结果数为0, 则第一次调用read方法将返回null。

1.2 ItemWriter

ItemWriter 在功能上类似于 ItemReader,但属于相反的操作。 资源仍然需要定位,打开和关闭, 区别就在于在于ItemWriter 执行的是写入操作(write out), 而不是读取。 在使用数据库或队列的情况下,写入操作对应的是插入( insert ),更新( update ),或发送( send )。 序列化输出的格式依赖于每个批处理作业自己的定义。

和 ItemReader 接口类似, ItemWriter 也是个相当通用的接口:

public interface ItemWriter<T> {          
  void write(List<? extends T> items) throws Exception; 
}

类比于ItemReader中的read,write方法是ItemWriter 接口的根本方法; 只要传入的items列表是打开的,那么它就会尝试着将其写入(write out)。 因为一般来说,items 将要被批量写入到一起,然后再输出, 所以 write 方法接受一个List 参数,而不是单个对象(item)。list输出后,在write方法返回(return)之前,对缓冲执行刷出(flush)操作是很必要的。例如,如果使用Hibernate DAO时,对每个对象要调用一次DAO写操作, 操作完成之后, 方法 return 之前,writer就应该关闭hibernate的Session会话。

1.3 ItemProcessor

ItemReader 和 ItemWriter 接口对于每个任务来说都是非常必要的, 但如果想要在写出数据之前执行某些业务逻辑操作时要怎么办呢? 一个选择是对读取(reading)和写入(writing)使用组合模式(composite pattern): 创建一个 ItemWriter 的子类实现, 内部包含另一个 ItemWriter 对象的引用(对于 ItemReader 也是类似的). 示例如下:

public class CompositeItemWriter<T> implements ItemWriter<T> {         
  ItemWriter<T> itemWriter;        
  public CompositeItemWriter(ItemWriter<T> itemWriter) {            
    this.itemWriter = itemWriter;       
  }        
  public void write(List<? extends T> items) throws Exception {            
    // ... 此处可以执行某些业务逻辑            
    itemWriter.write(item);       
  }        
  public void setDelegate(ItemWriter<T> itemWriter){            
    this.itemWriter = itemWriter;       
  }
}

上面的类中包含了另一个ItemWriter引用,通过代理它来实现某些业务逻辑。 这种模式对于 ItemReader 也是一样的道理, 但也可能持有内部 ItemReader 所拥有的多个数据输入对象的引用。 在ItemWriter中如果我们想要自己控制 write 的调用也可能需要持有其他引用。

但假如我们只想在对象实际被写入之前 “改造” 一下传入的item, 就没必要实现ItemWriter和执行 write 操作: 我们只需要这个将被修改的item对象而已。 对于这种情况, Spring Batch提供了 ItemProcessor 接口:

public interface ItemProcessor<I, O> {     
  O process(I item) throws Exception;
}

ItemProcessor非常简单; 传入一个对象,对其进行某些处理/转换,然后返回另一个对象(也可以是同一个)。传入的对象和返回的对象类型可以一样,也可以不一致。关键点在于处理过程中可以执行一些业务逻辑操作,当然这完全取决于开发者怎么实现它。一个ItemProcessor可以被直接关联到某个Step(步骤),例如,假设ItemReader的返回类型是 Foo ,而在写出之前需要将其转换成类型Bar的对象。就可以编写一个ItemProcessor来执行这种转换:

public class Foo {} public class Bar {        
  public Bar(Foo foo) {
  }
} 
public class FooProcessor implements ItemProcessor<Foo,Bar>{       
  public Bar process(Foo foo) throws Exception {         
    //执行某些操作,将 Foo 转换为 Bar对象        
    return new Bar(foo);        
  }
} 
public class BarWriter implements ItemWriter<Bar>{             
  public void write(List<? extends Bar> bars) throws Exception {           
//write bars        
  }
}

在上面的简单示例中,有两个类: Foo和Bar, 以及实现了ItemProcessor接口的FooProcessor类。因为是demo,所以转换很简单, 在实际使用中可能执行转换为任何类型, 响应的操作请读者根据需要自己编写。 BarWriter将被用于写出Bar对象,如果传入其他类型的对象可能会抛出异常。 同样,如果 FooProcessor 传入的参数不是 Foo 也会抛出异常。FooProcessor可以注入到某个Step中:

<job id="ioSampleJob">     
  <step name="step1">         
  <tasklet>           
    <chunk reader="fooReader" processor="fooProcessor" writer="barWriter" commit-interval="2"/>         
  </tasklet>     
  </step>
</job>

1.3.1 Chaining ItemProcessors

在很多情况下执行单个转换就可以了, 但假如想要将多个 ItemProcessors "串联(chain)" 在一起要怎么实现呢? 我们可以使用前面提到的组合模式(composite pattern)来完成。 接着前面单一转换的示例, 我们将Foo转换为Bar,然后再转换为Foobar类型,并执行写出:

public class Foo {
} 
public class Bar {    
  public Bar(Foo foo) {
  }
} 
public class Foobar{    
  public Foobar(Bar bar) {
  }
} 
public class FooProcessor implements ItemProcessor<Foo,Bar>{    
  public Bar process(Foo foo) throws Exception {     
    //Perform simple transformation, convert a Foo to a Bar     
    return new Bar(foo);    
  }
} 
public class BarProcessor implements ItemProcessor<Bar,FooBar>{     
  public FooBar process(Bar bar) throws Exception {     
    return new Foobar(bar);    
  }
} 
public class FoobarWriter implements ItemWriter<FooBar>{     
  public void write(List<? extends FooBar> items) throws Exception {    
    //write items    
  }
}

可以将 FooProcessor 和 BarProcessor “串联”在一起来生成 Foobar 对象,如果用 Java代码表示,那就像下面这样:

CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);

就和前面的示例类似,复合处理器也可以配置到Step中:

<job id="ioSampleJob">    
  <step name="step1">       
    <tasklet>          
      <chunk reader="fooReader" processor="compositeProcessor" writer="foobarWriter" commit-interval="2"/>      
     </tasklet>   
   </step>
</job>
<bean id="compositeItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">     
  <property name="delegates">       
    <list>         
      <bean class="..FooProcessor" />         
      <bean class="..BarProcessor" />       
    </list>     
  </property>
</bean>

1.3.2 Filtering Records

item processor 的典型应用就是在数据传给ItemWriter之前进行过滤(filter out)。 过滤(Filtering)是一种有别于跳过(skipping)的行为; skipping表明某几行记录是无效的,而 filtering 则只是表明某条记录不应该写入(written)。

例如, 某个批处理作业,从一个文件中读取三种不同类型的记录: 准备 insert 的记录、准备 update 的记录,需要 delete 的记录。如果系统中不允许删除记录, 那么我们肯定不希望将 “delete” 类型的记录传递给 ItemWriter。 但因为这些记录又不是损坏的信息(bad records), 我们只想将其过滤掉,而不是跳过。 因此,ItemWriter只会收到 "insert" 和 "update"的记录。

要过滤某条记录, 只需要 ItemProcessor 返回“ null ” 即可. 框架将自动检测结果为“ null ”的情况, 不会将该item 添加到传给ItemWriter的list中。 像往常一样, 在 ItemProcessor 中抛出异常将会导致跳过(skip)。

1.3.3 容错(Fault Tolerance)

当某一个分块回滚时, 读取后已被缓存的那些item可能会被重新处理。 如果一个step被配置为支持容错(通常使用 skip跳过 或retry重试处理),使用的所有 ItemProcessor 都应该实现为幂等的(idempotent)。 通常ItemProcessor对已经处理过的输入数据不执行任何修改, 而只更新需要处理的实例。

1.4 ItemStream

ItemReader 和 ItemWriter 都为各自的目的服务, 但他们之间有一个共同点, 就是都需要与另一个接口配合。 一般来说,作为批处理作业作用域范围的一部分,readers 和 writers 都需要打开(open),关闭(close),并需要某种机制来持久化自身的状态:

public interface ItemStream {       
void open(ExecutionContext executionContext) throws ItemStreamException;       
void update(ExecutionContext executionContext) throws ItemStreamException;       
void close() throws ItemStreamException; }

在描述每种方法之前,我们应该提到ExecutionContext。ItemReader的客户端也应该实现ItemStream,在任何 read 之前调用open以打开需要的文件或数据库连接等资源。实现ItemWriter也有类似的限制/约束,即需要同时实现ItemStream。如之前所述,如果将数据存放在ExecutionContext中,那么它可以在某个时刻用来启动 ItemReader 或 ItemWriter,而不是在初始状态时。对应的, 应该确保在调用open之后的适当位置调用 close 来安全地释放所有分配的资源。调用update主要是为了确保当前持有的所有状态都被加载到所提供的 ExecutionContext中。 update 一般在提交之前调用,以确保当前状态被持久化到数据库之中。

在特殊情况下, ItemStream 的客户端是一个Step(由 Spring Batch Core 决定), 会为每个 StepExecution 创建一个ExecutionContext,以允许用户存储特定部分的执行状态, 一般来说如果同一个JobInstance重启了,则预期它将会在重启后被返回。对于熟悉 Quartz的人来说, 逻辑上非常像是Quartz的JobDataMap。

1.5 委托模式(Delegate Pattern)与注册Step

请注意, CompositeItemWriter是委托模式的一个示例, 这在Spring Batch中很常见的。 委托自身可以实现回调接口StepListener。如果实现了,那么他们就会被当作Job中Step的一部分与 Spring Batch Core 结合使用, 然后他们基本上必定需要手动注册到Step中。

一个 reader, writer, 或 processor,如果实现了 ItemStream / StepListener接口,就会被自动组装到 Step 中。 但因为delegates 并不为 Step 所知, 因此需要被注入(作为listeners监听器或streams流,或两者都可):

<job id="ioSampleJob">      
  <step name="step1">       
    <tasklet>          
      <chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter" commit-interval="2">              
        <streams>                 
          <stream ref="barWriter" />              
        </streams>          
      </chunk>       
    </tasklet>      
  </step>
</job> 
<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">      
  <property name="delegate" ref="barWriter" />
</bean> 
<bean id="barWriter" class="...BarWriter" />

1.6 纯文本平面文件(Flat Files)

最常见的批量数据交换机制是使用纯文本平面文件(flat file)。 XML由统一约定好的标准来定义文件结构(即XSD), 与XML等格式不同, 想要阅读纯文本平面文件必须先了解其组成结构。一般来说,纯文本平面文件分两种类型: 有分隔的类型(Delimited) 与固定长度类型(Fixed Length)。有分隔的文件中各个字段由分隔符进行间隔, 比如英文逗号(,)。而固定长度类型的文件每个字段都有固定的长度。

1.6.1 The FieldSet(字段集)

当在Spring Batch中使用纯文本文件时, 不管是将其作为输入还是输出, 最重要的一个类就是 FieldSet。许多架构和类库会抽象出一些方法/类来辅助你从文件读取数据, 但是这些方法通常返回 String 或者 String[] 数组, 很多时候这确实是些半成品。 而 FieldSet 是Spring Batch中专门用来将文件绑定到字段的抽象。它允许开发者和使用数据库差不多的方式来使用数据输入文件入。 FieldSet 在概念上非常类似于Jdbc的 ResultSet 。 FieldSet 只需要一个参数: 即token数组 String[] 。另外,您还可以配置字段的名称, 然后就可以像使用 ResultSet 一样, 使用 index 或者 name 都可以取得对应的值:

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

在 FieldSet 接口可以返回很多类型的对象/数据, 如 Date , long , BigDecimal 等。 FieldSet 最大的优势在于,它对文本输入文件提供了统一的解析。 不是每个批处理作业采用不同的方式进行解析,而一直是一致的, 不论是在处理格式异常引起的错误,还是在进行简单的数据转换。

1.6.2 FlatFileItemReader

平面文件(flat file)是最多包含二维(表格)数据的任意类型的文件。在 Spring Batch 框架中 FlatFileItemReader 类负责读取平面文件, 该类提供了用于读取和解析平面文件的基本功能。FlatFileItemReader 主要依赖两个东西: Resource 和LineMapper。LineMapper接口将在下一节详细讨论。 resource 属性代表一个 Spring Core Resource(Spring核心资源)。关于如何创建这一类 bean 的文档可以参考Spring框架, Chapter Resources。所以本文档就不再深入讲解创建 Resource 对象的细节。但可以找到一个文件系统资源的简单示例,如下所示:

Resource resource = new FileSystemResource("resources/trades.csv");

在复杂的批处理环境中,目录结构通常由EAI基础设施管理, 并且会建立放置区(drop zones),让外部接口将文件从ftp移动到批处理位置, 反之亦然。文件移动工具(File moving utilities)超出了spring batch架构的范畴, 但在批处理作业中包括文件移动步骤这种事情那也是很常见的。 批处理架构只需要知道如何定位需要处理的文件就足够了。Spring Batch 将会从这个起始点开始,将数据传输给数据管道。当然, Spring Integration也提供了很多这一类的服务。

FlatFileItemReader 中的其他属性让你可以进一步指定数据如何解析:

FlatFileItemReader 的属性(Properties):

image

LineMapper

就如同 RowMapper 在底层根据 ResultSet 构造一个 Object 并返回, 平面文件处理过程中也需要将一行 String 转换并构造成Object:

public interface LineMapper<T> {        
  T mapLine(String line, int lineNumber) throws Exception;
}

FlatFileItemReader

基本的约定是, 给定当前行以及和它关联的行号(line number), mapper 应该能够返回一个领域对象。这类似于在 RowMapper中每一行也有一个 line number 相关联, 正如 ResultSet 中的每一行(Row)都有其绑定的 row number。这允许行号能被绑定到生成的领域对象以方便比较(identity comparison)或者更方便进行日志记录。

但与 RowMapper 不同的是, LineMapper 只能取得原始行的String值, 正如上面所说, 给你的是一个半成品。 这行文本值必须先被解析为 FieldSet, 然后才可以映射为一个对象,如下所述。

LineTokenizer

对将每一行输入转换为 FieldSet 这种操作的抽象是很有必要的, 因为可能会有各种平面文件格式需要转换为 FieldSet。在Spring Batch中, 对应的接口是 LineTokenizer:

public interface LineTokenizer {        
  FieldSet tokenize(String line); 
}

使用 LineTokenizer 的约定是, 给定一行输入内容(理论上 String 可以包含多行内容), 返回一个表示该行的 FieldSet 对象。这个FieldSet接着会传递给 FieldSetMapper。Spring Batch 包括以下LineTokenizer实现:

  • DelmitedLineTokenizer 适用于处理使用分隔符(delimiter)来分隔一条数据中各个字段的文件。最常见的分隔符是逗号(comma),但管道或分号也经常使用。
  • FixedLengthTokenizer 适用于记录中的字段都是“固定宽度(fixed width)”的文件。每种记录类型中,每个字段的宽度必须先定义。
  • PatternMatchingCompositeLineTokenizer 通过使用正则模式匹配,来决定对特定的某一行应该使用 LineTokenizers 列表中的哪一个来执行字段拆分。

FieldSetMapper

FieldSetMapper 接口只定义了一个方法, mapFieldSet , 这个方法接收一个 FieldSet 对象,并将其内容映射到一个 object中。根据作业需要, 这个对象可以是自定义的 DTO , 领域对象, 或者是简单数组。FieldSetMapper 与 LineTokenizer 结合使用以将资源文件中的一行数据转化为所需类型的对象:

public interface FieldSetMapper<T> {       
  T mapFieldSet(FieldSet fieldSet); 
}

这和JdbcTemplate中的RowMapper是一样的道理。

DefaultLineMapper

既然读取平面文件的接口已经定义好了,那很明显我们需要执行以下三个步骤:

  1. 从文件中读取一行。
  2. 将读取的字符串传给 LineTokenizer#tokenize() 方法,以获取一个 FieldSet。
  3. 将解析后的 FieldSet 传给 FieldSetMapper ,然后将 ItemReader#read() 方法执行的结果返回给调用者。

上面的两个接口代表了两个不同的任务: 将一行文本转换为 FieldSet, 以及把 FieldSet 映射为一个领域对象。 因为LineTokenizer 的输入对应着 LineMapper 的输入(一行), 并且 FieldSetMapper 的输出对应着 LineMapper 的输出, 所以SpringBatch 提供了一个使用LineTokenizer和FieldSetMapper的默认实现。DefaultLineMapper 就是大多数情况下用户所需要的:

FlatFileItemReader

public class DefaultLineMapper<T> implements LineMapper<T>, InitializingBean {      
  private LineTokenizer tokenizer;      
  private FieldSetMapper<T> fieldSetMapper;      
  public T mapLine(String line, int lineNumber) throws Exception {           
    return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));     
  }      
  public void setLineTokenizer(LineTokenizer tokenizer) {          
    this.tokenizer = tokenizer;     
   }      
  public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {          
    this.fieldSetMapper = fieldSetMapper;  
   }
}

上面的功能由一个默认实现类来提供,而不是 reader 本身内置的(以前版本的框架这样干), 让用户可以更灵活地控制解析过程,特别是需要访问原始行的时候。

文件分隔符读取简单示例

下面的例子用来说明一个实际的领域情景。这个批处理作业将从如下文件中读取 football player(足球运动员) 信息:

ID,lastName,firstName,position,birthYear,debutYear"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996","AbduRa00,Abdullah,Rabih,rb,1975,1999","AberWa00,Abercrombie,Walter,rb,1959,1982","AbraDa00,Abramowicz,Danny,wr,1945,1967","AdamBo00,Adams,Bob,te,1946,1969","AdamCh00,Adams,Charlie,wr,1979,2003"

该文件的内容将被映射为领域对象 Player:

public class Player implements Serializable {      
  private String ID;      
  private String lastName;      
  private String firstName;      
  private String position;      
  private int birthYear;      
  private int debutYear;      
  public String toString() {      
  return "PLAYER:ID=" + ID + ",Last Name=" + lastName +",First Name=" + firstName + ",Position=" + position +          ",Birth Year=" + birthYear + ",DebutYear=" +debutYear;     
}    
 // setters and getters...
}

为了将FieldSet映射为Player对象, 需要定义一个FieldSetMapper,返回player对象:

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {    
  public Player mapFieldSet(FieldSet fieldSet) {      
    Player player = new Player();      
    player.setID(fieldSet.readString(0));      
    player.setLastName(fieldSet.readString(1));            
    player.setFirstName(fieldSet.readString(2));          
    player.setPosition(fieldSet.readString(3));     
    player.setBirthYear(fieldSet.readInt(4));      
    player.setDebutYear(fieldSet.readInt(5));      
      return player;   
   }
}

然后就可以通过正确构建一个FlatFileItemReader调用read方法来读取文件:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its 
delimiterLineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

每调用一次read方法,都会读取文件中的一行,并返回一个新的Player对象。如果到达文件结尾,则会返回null。

根据Name映射 Fields

有一个额外的功能, DelimitedLineTokenizer 和 FixedLengthTokenizer 都支持,在功能上类似于 Jdbc 的 ResultSet。字段的名称可以注入到这些 LineTokenizer 实现以提高映射函数的读取能力。首先, 平面文件中所有字段的列名会注入给tokenizer:

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});

FieldSetMapper 可以像下面这样使用此信息:

public class PlayerMapper implements FieldSetMapper<Player> {     
  public Player mapFieldSet(FieldSet fs) {        
    if(fs == null){        
      return null;        
  }     
  Player player = new Player();     
  player.setID(fs.readString("ID"));     
  player.setLastName(fs.readString("lastName"));     
  player.setFirstName(fs.readString("firstName"));     
  player.setPosition(fs.readString("position"));     
  player.setDebutYear(fs.readInt("debutYear"));     
  player.setBirthYear(fs.readInt("birthYear"));    
   
  return player;    
 }
}

将FieldSet字段映射为Domain Object

很多时候, 创建一个 FieldSetMapper 就跟 JdbcTemplate 里编写 RowMapper 一样繁琐。Spring Batch通过使用JavaBean规范,提供了一个 FieldSetMapper 来自动将字段映射到对应setter的属性域。还是使用足球的例子,
BeanWrapperFieldSetMapper 的配置如下所示:

<bean id="fieldSetMapper" class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">      
  <property name="prototypeBeanName" value="player" />
</bean>
<bean id="player" class="org.springframework.batch.sample.domain.Player" scope="prototype" />

对于 FieldSet 中的每个条目(entry), mapper都会在Player对象的新实例中查找相应的setter (因此,需要指定 prototype scope),和 Spring容器 查找 setter匹配属性名是一样的方式。FieldSet 中每个可用的字段都会被映射, 然后返回组装好的 Player 对象,不需要再手写代码。

Fixed Length File Formats

到这一步,我们讨论了带分隔符的文件, 但实际应用中可能只有一半左右是这种文件。还有很多机构使用固定长度形式的平面文件。固定长度文件的示例如下:

UK21341EAH4121131.11customer1UK21341EAH4221232.11customer2UK21341EAH4321333.11customer3UK21341EAH4421434.11customer4UK21341EAH4521535.11customer5

虽然看起来像是一个很长的字段,但实际上代表了4个分开的字段:

  1. ISIN : 唯一标识符,订购的商品编码 - 占12字符。
  2. Quantity : 订购的商品数量 - 占3字符。
  3. Price : 商品的价格 - 占5字符。
  4. Customer : 订购商品的顾客Id - 占9字符。

配置好 FixedLengthLineTokenizer 以后, 每个字段的长度必须用范围(range)的形式指定:

<bean id="fixedLengthLineTokenizer" class="org.springframework.batch.io.file.transform.FixedLengthTokenizer">    
  <property name="names" value="ISIN,Quantity,Price,Customer" />    
  <property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>

因为 FixedLengthLineTokenizer 使用的也是 LineTokenizer 接口, 所以返回值同样是 FieldSet, 和使用分隔符基本上是一样的。这也就可以使用同样的方式来处理其输出, 例如使用 BeanWrapperFieldSetMapper。

注意:

要支持上面这种范围式的语法需要使用专门的属性编辑器: RangeArrayPropertyEditor , 可以在ApplicationContext 中配置。当然,这个 bean 在批处理命名空间中的 ApplicationContext 里已经自动声明了。

单文件中含有多种类型数据的处理

前面所有的文件读取示例,为简单起见都做了一个关键性假设: 在同一个文件中的所有记录都具有相同的格式。但情况有时候并非如此。其实在一个文件包含不同的格式的记录是很常见的,需要使用不同的拆分方式,映射到不同的对象中。下面是一个文件中的片段,仅作演示:

USER;Smith;Peter;;T;20014539;FLINEA;1044391041ABC037.49G201XX1383.12HLINEB;2134776319DEF422.99M005LI

这个文件中有三种类型的记录, "USER", "LINEA", 以及 "LINEB"。 一行 "USER" 对应一个 User 对象。 "LINEA" 和 "LINEB"对应的都是 Line 对象, 只是 "LINEA" 包含的信息比“LINEB”要多。

ItemReader分别读取每一行, 当然我们必须指定不同的 LineTokenizer 和 FieldSetMapper 以便ItemWriter 能获得到正确的item。 PatternMatchingCompositeLineMapper 就是专门拿来干这个事的, 可以通过模式映射到对应的 LineTokenizer 和FieldSetMapper:

<bean id="orderFileLineMapper" class="org.spr...PatternMatchingCompositeLineMapper">    
  <property name="tokenizers">       
    <map>         
      <entry key="USER*" value-ref="userTokenizer" />         
      <entry key="LINEA*" value-ref="lineATokenizer" />         
      <entry key="LINEB*" value-ref="lineBTokenizer" />       
    </map>    
  </property>    
<property name="fieldSetMappers">       
    <map>        
      <entry key="USER*" value-ref="userFieldSetMapper" />         
      <entry key="LINE*" value-ref="lineFieldSetMapper" />       
    </map>    
</property>
</bean>

在这个示例中, "LINEA" 和 "LINEB" 使用独立的 LineTokenizer,但使用同一个 FieldSetMapper.

PatternMatchingCompositeLineMapper 使用 PatternMatcher 的 match 方法来为每一行选择正确的代理(delegate)。PatternMatcher 支持两个有特殊的意义通配符(wildcard): 问号(“ ? ”, question mark) 将匹配 1 个字符(注意不是0-1次), 而星号(“ * ”,asterisk)将匹配 0 到多个 字符。

请注意,在上面的配置中,所有以星号结尾的 pattern , 使他们变成了行的有效前缀。 PatternMatcher 总是匹配最具体的可能模式, 而不是按配置的顺序从上往下来。所以如果 " LINE* " 和 " LINEA* " 都配置为 pattern, 那么 " LINEA " 将会匹配到" LINEA* ", 而 " LINEB " 将匹配到 " LINE* "。此外,单个星号(“ * ”)可以作为默认匹配所有行的模式,如果该行不匹配其他任何模式的话。

<entry key="*" value-ref="defaultLineTokenizer" />

还有一个 PatternMatchingCompositeLineTokenizer 可用来单独解析。

Flat File 的异常处理

在解析一行时, 可能有很多情况会导致异常被抛出。很多平面文件不是很完整, 或者里面的某些记录格式不正确。许多用户会选择忽略这些错误的行, 只将这个问题记录到日志, 比如原始行,行号。稍后可以人工审查这些日志,也可以由另一个批处理作业来检查。出于这个原因,Spring Batch提供了一系列的异常类: FlatFileParseException ,和 FlatFileFormatException 。

FlatFileParseException 是由 FlatFileItemReader 在读取文件时解析错误而抛出的。 FlatFileFormatException 是由实现了LineTokenizer 接口的类抛出的, 表明在拆分字段时发生了一个更具体的错误。

IncorrectTokenCountException

DelimitedLineTokenizer 和 FixedLengthLineTokenizer 都可以指定列名(column name), 用来创建一个FieldSet。但如果column name 的数量和 拆分时找到的列数目, 则不会创建 FieldSet,只会抛出 IncorrectTokenCountException 异常, 里面包含了 字段的实际数量,还有预期的数量:

tokenizer.setNames(new String[] {"A", "B", "C", "D"});    try {       tokenizer.tokenize("a,b,c");     }catch(IncorrectTokenCountException e){       assertEquals(4, e.getExpectedCount());       assertEquals(3, e.getActualCount());}

因为 tokenizer 配置了4列的名称,但在这个文件中只找到 3 个字段, 所以会抛出 IncorrectTokenCountException 异常。

IncorrectLineLengthException

固定长度格式的文件在解析时有额外的要求, 因为每一列都必须严格遵守其预定义的宽度。如果一行的总长度不等于所有字段宽度之和, 就会抛出一个异常:

tokenizer.setColumns(new Range[] { new Range(1, 5),new Range(6, 10),new Range(11, 15) });try {tokenizer.tokenize("12345");fail("Expected IncorrectLineLengthException");}catch (IncorrectLineLengthException ex) {assertEquals(15, ex.getExpectedLength());assertEquals(5, ex.getActualLength());}

上面配置的范围是: 1-5 , 6-10 , 以及 11-15 , 因此预期的总长度是15。但在这里传入的行的长度是 5 ,所以会导致IncorrectLineLengthException 异常。之所以直接抛出异常, 而不是先去映射第一个字段的原因是为了更早发现处理失败, 而不再调用 FieldSetMapper 来读取第2列。但是呢,有些情况下, 行的长度并不总是固定的。 出于这个原因, 可以通过设置'strict' 属性的值,不验证行的宽度:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

上面示例和前一个几乎完全相同, 只是调用了 tokenizer.setStrict(false) 。这个设置告诉 tokenizer 在对一行进行解析(tokenizing)时不要去管(enforce)行的长度。然后就正确地创建了一个 FieldSet并返回。当然,剩下的值就只会包含空的token值。

1.6.3 FlatFileItemWriter

将数据写入到纯文本文件也必须解决和读取文件时一样的问题。 在事务中,一个 step 必须通过分隔符或采用固定长度的格式将数据写出去.

LineAggregator

与 LineTokenizer 接口的处理方式类似, 写入文件时也需要有某种方式将一条记录的多个字段组织拼接成单个 String,然后再将string写入文件. Spring Batch 对应的接口是 LineAggregator :

public interface LineAggregator<T> {        
  public String aggregate(T item);
 }

接口 LineAggregator 与 LineTokenizer 相互对应. LineTokenizer 接收 String ,处理后返回一个 FieldSet 对象, 而LineAggregator 则是接收一条记录,返回对应的 String.

PassThroughLineAggregator

LineAggregator 接口最基础的实现类是 PassThroughLineAggregator , 这个简单实现仅仅是将接收到的对象调用 toString() 方法的值返回:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {  
     public String aggregate(T item) {        
       return item.toString();     
    }
}

上面的实现对于需要直接转换为string的时候是很管用的,但是 FlatFileItemWriter 的一些优势也是很有必要的,比如 事务,以及支持重启特性等.

简单的文件写入示例

既然已经有了 LineAggregator 接口以及其最基础的实现, PassThroughLineAggregator, 那就可以解释基础的写出流程了:

  1. 将要写出的对象传递给 LineAggregator 以获取一个字符串(String).
  2. 将返回的 String 写入配置指定的文件中.

下面是 FlatFileItemWriter 中对应的代码:

public void write(T item) throws Exception {      
  write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

简单的配置如下所示:

<bean id="itemWriter" class="org.spr...FlatFileItemWriter">    
  <property name="resource" value="file:target/test-outputs/output.txt" />    
  <property name="lineAggregator">        
    <bean class="org.spr...PassThroughLineAggregator"/>    
  </property>
</bean>

属性提取器 FieldExtractor

上面的示例可以应对最基本的文件写入情景。但使用 FlatFileItemWriter 时可能更多地是需要将某个领域对象写到文件,因此必须转换到单行之中。 在读取文件时,有以下步骤:

  1. 从文件中读取一行.
  2. 将这一行字符串传递给 LineTokenizer#tokenize() 方法, 以获取 FieldSet 对象
  3. 将分词器返回的 FieldSet 传给一个 FieldSetMapper 映射器, 然后将 ItemReader#read() 方法得到的结果 return。

文件的写入也很类似, 但步骤正好相反:

  1. 将要写入的对象传递给 writer
  2. 将领域对象的属性域转换为数组
  3. 将结果数组合并(aggregate)为一行字符串

因为框架没办法知道需要将领域对象的哪些字段写入到文件中,所以就需要有一个 FieldExtractor 来将对象转换为数组:

public interface FieldExtractor<T> {    
  Object[] extract(T item);
}

FieldExtractor 的实现类应该根据传入对象的属性创建一个数组, 稍后使用分隔符将各个元素写入文件,或者作为 field-width line 的一部分.

PassThroughFieldExtractor

在很多时候需要将一个集合(如 array、Collection, FieldSet等)写出到文件。 从集合中“提取”一个数组那真的是非常简单: 直接进行简单转换即可。 因此在这种场合PassThroughFieldExtractor 就派上用场了。应该注意,如果传入的对象不是集合类型的,那么 PassThroughFieldExtractor 将返回一个数组, 其中只包含提取的单个对象。

BeanWrapperFieldExtractor

与文件读取一节中所描述的 BeanWrapperFieldSetMapper 一样, 通常使用配置来指定如何将领域对象转换为一个对象数组是比较好的办法, 而不用自己写个方法来进行转换。BeanWrapperFieldExtractor 就提供了这类功能:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });
String first = "Alan";
String last = "Turing";
int born = 1912;
Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);
assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

这个 extractor 实现只有一个必需的属性,就是 names , 里面用来存放要映射字段的名字。 就像 BeanWrapperFieldSetMapper 需要字段名称来将 FieldSet 中的 field 映射到对象的 setter 方法一样, BeanWrapperFieldExtractor 需要 names 映射 getter 方法来创建一个对象数组。值得注意的是, names的顺序决定了field在数组中的顺序。

分隔符文件(Delimited File)写入示例

最基础的平面文件格式是将所有字段用分隔符(delimiter)来进行分隔(separated)。这可以通过 DelimitedLineAggregator 来完成。下面的例子把一个表示客户信用额度的领域对象写出:

public class CustomerCredit {     
  private int id;     
  private String name;     
  private BigDecimal credit;     
  //getters and setters removed for clarity
}

因为使用到了领域对象,所以必须提供 FieldExtractor 接口的实现,当然也少不了要使用的分隔符:

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">     
  <property name="resource" ref="outputResource" />     
  <property name="lineAggregator">         
    <bean class="org.spr...DelimitedLineAggregator">              
      <property name="delimiter" value=","/>              
      <property name="fieldExtractor">                    
      <bean class="org.spr...BeanWrapperFieldExtractor">                    
        <property name="names" value="name,credit"/>                    
      </bean>              
      </property>          
    </bean>      
  </property>
</bean>

在这种情况下, 本章前面提到过的 BeanWrapperFieldExtractor 被用来将 CustomerCredit 中的 name 和 credit 字段转换为一个对象数组, 然后在各个字段之间用逗号分隔写入文件。

固定宽度的(Fixed Width)文件写入示例

平面文件的格式并不是只有采用分隔符这种类型。许多人喜欢对每个字段设置一定的宽度,这样就能区分各个字段了,这种做法通常被称为“固定宽度, fixed width”。 Spring Batch 通过 FormatterLineAggregator 支持这种文件的写入。使用上面描述的CustomerCredit 领域对象, 则可以对它进行如下配置:

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">      
    <property name="resource" ref="outputResource" />      
    <property name="lineAggregator">          
      <bean class="org.spr...FormatterLineAggregator">               
        <property name="fieldExtractor">                    
          <bean class="org.spr...BeanWrapperFieldExtractor">                        
        <property name="names" value="name,credit" />                    
      </bean>               
  </property>               
  <property name="format" value="%-9s%-2.0f" />         
  </bean>       
  </property>
</bean>

上面的示例大部分看起来是一样的, 只有 format 属性的值不同:

<property name="format" value="%-9s%-2.0f" />

底层实现采用 Java 5 提供的 Formatter 。Java的 Formatter (格式化) 基于C语言的 printf 函数功能。关于如何配置formatter 请参考 Formatter 的javadoc.

处理文件创建(Handling File Creation)

FlatFileItemReader 与文件资源的关系很简单。在初始化 reader 时,如果文件存在则打开, 如果文件不存在那就抛出一个异常
(exception)。

但是文件的写入就没那么简单了。乍一看可能会觉得跟 FlatFileItemWriter 一样简单直接粗暴: 如果文件存在则抛出异常, 如果
不存在则创建文件并开始写入。

但是, 作业的重启有可能会有BUG。 在正常的重启情景中, 约定与前面所想的恰恰相反: 如果文件存在, 则从已知的最后一个
正确位置开始写入, 如果不存在, 则抛出异常。

如果此作业(Job)的文件名每次都是一样的那怎么办? 这时候可能需要删除已存在的文件(重启则不删除)。 因为有这些可能性,
FlatFileItemWriter 有一个属性 shouldDeleteIfExists 。将这个属性设置为 true , 打开 writer 时会将已有的同名文件删除。

1.7 XML Item Readers and Writers

Spring Batch为读取XML映射为Java对象以及将Java对象写为XML记录提供了事务基础。

[注意]XML流的限制 StAX API 被用在其他XML解析引擎不适合批处理请求 I/O 时的情况(DOM方式把整个输入文件加载到内存中, 而SAX方式在解析过程中需要用户提供回调)。

让我们仔细看看在Spring Batch中 XML输入和输出是如何运行的。 首先,有一些不同于文件读取和写入的概念,但在Spring Batch XML处理中是很常见的。在处理XML时, 并不像读取文本文件(FieldSets)时采取分隔符标记逐行读取的方式, 而是假定XML资源是对应于单条记录的文档片段(' fragments ')的集合:

image

图 3.1: XML 输入文件

“ trade ”标签在上面的场景中是根元素“root element”。 在' <trade> '和' </trade> '之间的一切都被认为是一个 文档片段' fragment '。 Spring Batch使用 Object/XML映射(OXM)将 fragments 绑定到对象。 但 Spring Batch 并不依赖某个特定的XML绑定技术。 Spring OXM 委托是最典型的用途, 其为常见的OXM技术提供了统一的抽象。 Spring OXM 依赖是可选的, 如有必要,你也可以自己实现 Spring Batch 的某些接口。 OXM支持的技术间的关系如下图所示:

image

图 3.2: OXM Binding

上面介绍了OXM以及如何使用XML片段来表示记录, 接着让我们仔细了解下 readers 和 writers 。

1.7.1 StaxEventItemReader

StaxEventItemReader 提供了从XML输入流进行记录处理的典型设置。 首先,我们来看一下 StaxEventItemReader能处理的一组XML记录。

<?xml version="1.0" encoding="UTF-8"?>    
<records>       
 <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">            
<isin>XYZ0001</isin>            
<quantity>5</quantity>            
<price>11.39</price>            
<customer>Customer1</customer>        
</trade>        
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">            
<isin>XYZ0002</isin>           
 <quantity>2</quantity>            
<price>72.99</price>            
<customer>Customer2c</customer>        
</trade>        
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">            
<isin>XYZ0003</isin>            
<quantity>9</quantity>            
<price>99.99</price>            
<customer>Customer3</customer>        
</trade>
</records>

能被处理的XML记录需要满足下列条件:

  • Root Element Name 片段根元素的名称就是要映射的对象。上面的示例代表的是 trade 的值。
  • Resource Spring Resource 代表了需要读取的文件。
  • Unmarshaller Spring OXM提供的Unmarshalling 用于将 XML片段映射为对象.
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">     
<property name="fragmentRootElementName" value="trade" />     
<property name="resource" value="data/iosample/input/input.xml" />     
<property name="unmarshaller" ref="tradeMarshaller" />
</bean>

请注意,在上面的例子中,我们选用一个 XStreamMarshaller, 里面接受一个id为 aliases 的 map, 将首个entry的 key 值作为文档片段的name(即根元素), 将 value 作为绑定的对象类型。类似于FieldSet, 后面的其他元素映射为对象内部的字段名/值对。在配置文件中,我们可以像下面这样使用Spring配置工具来描述所需的alias:

<bean id="tradeMarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller">     
<property name="aliases">        
<util:map id="aliases">            
<entry key="trade" value="org.springframework.batch.sample.domain.Trade" />       <entry key="price" value="java.math.BigDecimal" />            
<entry key="name" value="java.lang.String" />       
</util:map>     
</property>
</bean>

当 reader 读取到XML资源的一个新片段时(匹配默认的标签名称)。reader 根据这个片段构建一个独立的XML(或至少看起来是这样),并将 document 传给反序列化器(通常是一个Spring OXM Unmarshaller 的包装类)将XML映射为一个Java对象。

总之,这个过程类似于下面的Java代码,其中配置了 Spring的注入功能:

StaxEventItemReader xmlStaxEventItemReader = new StaxEventItemReader()Resource resource = new ByteArrayResource(xmlResource.getBytes())Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(marshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());
boolean hasNext = trueCustomerCredit credit = null;  
while (hasNext) {   
    credit = xmlStaxEventItemReader.read();  
   if (credit == null) {   
      hasNext = false;
    }else {   
    System.out.println(credit);
  }
}

1.7.2 StaxEventItemWriter

输出与输入相对应. StaxEventItemWriter 需要 1个 Resource , 1个 marshaller 以及 1个 rootTagName . Java对象传递给marshaller(通常是标准的Spring OXM marshaller), marshaller 使用自定义的事件writer写入Resource, 并过滤由OXM工具为每条 fragment 产生的 StartDocument 和 EndDocument事件。我们用 MarshallingEventWriterSerializer 示例来显示这一点。Spring配置如下所示:

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">     
<property name="resource" ref="outputResource" />     
<property name="marshaller" ref="customerCreditMarshaller" />     
<property name="rootTagName" value="customers" />     
<property name="overwriteOutput" value="true" />
</bean>

上面配置了3个必需的属性,以及1个可选属性 overwriteOutput = true , (本章前面提到过) 用来指定一个已存在的文件是否可以被覆盖。应该注意的是, writer 使用的 marshaller 和前面讲的 reading 示例中是完全相同的:

<bean id="customerCreditMarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller">      
<property name="aliases">         
<util:map id="aliases">           
<entry key="customer" value="org.springframework.batch.sample.domain.CustomerCredit" />           
<entry key="credit" value="java.math.BigDecimal" />           
<entry key="name" value="java.lang.String" />         
</util:map>      
</property>
</bean>

我们用一段Java代码来总结所讨论的知识点, 并演示如何通过代码手动设置所需的属性:

StaxEventItemWriter staxItemWriter = new StaxEventItemWriter();
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")Map aliases = new HashMap();
aliases.put("customer","org.springframework.batch.sample.domain.CustomerCredit");
aliases.put("credit","java.math.BigDecimal");
aliases.put("name","java.lang.String");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
staxItemWriter.setResource(resource);
staxItemWriter.setMarshaller(marshaller);
staxItemWriter.setRootTagName("trades");
staxItemWriter.setOverwriteOutput(true);
ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
CustomerCredit Credit = new CustomerCredit();
trade.setPrice(11.39);
credit.setName("Customer1");
staxItemWriter.write(trade);

1.8 多个数据输入文件

在单个 Step 中处理多个输入文件是很常见的需求。如果这些文件都有相同的格式, 则可以使用 MultiResourceItemReader来进行处理(支持 XML/或 纯文本文件)。 假如某个目录下有如下3个文件:

file-1.txtfile-2.txtignored.txt

file-1.txt 和 file-2.txt 具有相同的格式, 根据业务需求需要一起处理. 可以通过 MuliResourceItemReader 使用 通配符的形式来读取这两个文件:

<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
<property name="resources" value="classpath:data/input/file-*.txt" />
<property name="delegate" ref="flatFileItemReader" />
</bean>

delegate 引用的是一个简单的 FlatFileItemReader。上面的配置将会从两个输入文件中读取数据,处理回滚以及重启场景。应该注意的是,所有 ItemReader 在添加额外的输入文件后(如本示例),如果重新启动则可能会导致某些潜在的问题。 官方建议是每个批作业处理独立的目录,一直到成功完成为止。

1.9 数据库(Database)

和大部分企业应用一样,数据库也是批处理系统存储数据的核心机制。 但批处理与其他应用的不同之处在于,批处理系统一般都运行于大规模数据集基础上。 如果一条SQL语句返回100万行, 则结果集可能全部存放在内存中m直到所有行全部读完。Spring Batch提供了两种类型的解决方案来处理这个问题: 游标(Cursor) 和 可分页的数据库ItemReaders.

1.9.1 基于Cursor的ItemReaders

使用游标(cursor)是大多数批处理开发人员默认采用的方法, 因为它是处理有关系的数据“流”在数据库级别的解决方案。Java的 ResultSet 类其本质就是用面向对象的游标处理机制。 ResultSet 维护着一个指向当前数据行的cursor。调用 ResultSet的 next 方法则将游标移到下一行。

Spring Batch 基于 cursor 的 ItemReaders 在初始化时打开游标, 每次调用 read 时则将游标向前移动一行, 返回一个可用于进行处理的映射对象。最好将会调用 close 方法, 以确保所有资源都被释放。

Spring 的 JdbcTemplate 的解决办法, 是通过回调模式将 ResultSet 中所有行映射之后,在返回调用方法前关闭结果集来处理的。

但是,在批处理的时候就不一样了, 必须得等 step 执行完成才能调用close。下图描绘了基于游标的ItemReader是如何处理的,使用的SQL语句非常简单, 而且都是类似的实现方式:

image

这个例子演示了基本的处理模式。 数据库中有一个 “ FOO ” 表,它有三个字段: ID , NAME , 以及 BAR , select 查询所有ID大于1但小于7的行。这样的话游标起始于 ID 为 2的行(第1行)。这一行的结果会被映射为一个Foo对象。再次调用read()则将光标移动到下一行, 也就是ID为3的Foo。 在所有行读取完毕之后这些结果将会被写出去, 然后这些对象就会被垃圾回收(假设没有其他引用指向他们)。

JdbcCursorItemReader

JdbcCursorItemReader 是基于 cursor 的Jdbc实现。它直接使用ResultSet,需要从数据库连接池中获取连接来执行SQL语句。我们的示例使用下面的数据库表:

CREATE TABLE CUSTOMER (   ID BIGINT IDENTITY PRIMARY KEY,   NAME VARCHAR(45),   CREDIT FLOAT);

我们一般使用领域对象来对应到每一行, 所以用 RowMapper 接口的实现来映射 CustomerCredit 对象:

public class CustomerCreditRowMapper implements RowMapper {      
public static final String ID_COLUMN = "id";      
public static final String NAME_COLUMN = "name";      
public static final String CREDIT_COLUMN = "credit";       
public Object mapRow(ResultSet rs, int rowNum) throws SQLException {          
  CustomerCredit customerCredit = new CustomerCredit();          
  customerCredit.setId(rs.getInt(ID_COLUMN));          
  customerCredit.setName(rs.getString(NAME_COLUMN));          
  customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));          
  return customerCredit;     
  }
}

一般来说Spring的用户对 JdbcTemplate 都不陌生,而 JdbcCursorItemReader 使用其作为关键API接口, 我们一起来学习如何通过 JdbcTemplate 读取这一数据, 看看它与 ItemReader 有何区别。 为了演示方便, 我们假设CUSTOMER表有1000行数据。第一个例子将使用 JdbcTemplate :

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",new CustomerCreditRowMapper());

当执行完上面的代码, customerCredits 这个 List 中将包含 1000 个 CustomerCredit 对象。 在 query 方法中, 先从DataSource 获取一个连接, 然后用来执行给定的SQL, 获取结果后对 ResultSet 中的每一行调用一次 mapRow 方法。 让我们来对比一下 JdbcCursorItemReader 的实现:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){    
customerCredit = itemReader.read(); 
   counter++;
}
itemReader.close(executionContext);

运行这段代码后 counter 的值将变成 1000。如果上面的代码将返回的 customerCredit 放入 List, 则结果将和使用JdbcTemplate 的例子完全一致。 但是呢, 使用 ItemReader 的强大优势在于, 它允许数据项变成 “流式(streamed)”。 调用一次 read 方法, 通过ItemWriter写出数据对象, 然后再通过 read 获取下一项。 这使得 item 读取和写出可以进行 “分块(chunks)”, 并且周期性地提交, 这才是高性能批处理的本质。此外,它可以很容易地通过配置注入到某个 Spring Batch Step中:

<bean id="itemReader" class="org.spr...JdbcCursorItemReader">     
<property name="dataSource" ref="dataSource"/>     
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>     <property name="rowMapper">          
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>     
</property>
</bean>

因为在Java中有很多种不同的方式来打开游标, 所以 JdbcCustorItemReader 有许多可以设置的属性 :

image

HibernateCursorItemReader

使用 Spring 的程序员需要作出一个重要的决策,即是否使用ORM解决方案,这决定了是否使用 JdbcTemplate 或
HibernateTemplate , Spring Batch开发者也面临同样的选择。HibernateCursorItemReader 是 Hibernate 的游标实现。 其实在批处理中使用 Hibernate 那是相当有争议。这很大程度上是因为 Hibernate 最初就是设计了用来开发在线程序的。

但也不是说Hibernate就不能用来进行批处理。最简单的解决办法就是使用一个 StatelessSession (无状态会话), 而不使用标准 session 。这样就去掉了在批处理场景中 Hibernate 那些恼人的缓存、脏检查等等。

更多无状态会话与正常hibernate会话之间的差异, 请参考你使用的 hibernate 版本对应的文档。HibernateCursorItemReader 允许您声明一个HQL语句, 并传入 SessionFactory , 然后每次调用 read 时就会返回一个对象, 和 JdbcCursorItemReader 一样。下面的示例配置也使用和 JDBC reader 相同的数据库表:

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){     
  customerCredit = itemReader.read();     counter++;
}
itemReader.close(executionContext);

这里配置的 ItemReader 将以完全相同的方式返回CustomerCredit对象,和 JdbcCursorItemReader 没有区别, 如果
hibernate 映射文件正确的话。 useStatelessSession 属性的默认值为 true , 这里明确设置的目的只是为了引起你的注意,我们可以通过他来进行切换。 还值得注意的是 可以通过 setFetchSize 设置底层 cursor 的 fetchSize 属性 。与JdbcCursorItemReader一样,配置很简单:

<bean id="itemReader" class="org.springframework.batch.item.database.HibernateCursorItemReader">     
<property name="sessionFactory" ref="sessionFactory" />     
<property name="queryString" value="from CustomerCredit" />
</bean>

StoredProcedureItemReader

有时候使用存储过程来获取游标数据是很有必要的。 StoredProcedureItemReader 和 JdbcCursorItemReader 其实差不多,只是不再执行一个查询来获取游标,而是执行一个存储过程, 由存储过程返回一个游标。 存储过程有三种返回游标的方式:

  1. 作为一个 ResultSet 返回(SQL Server, Sybase, DB2, Derby 以及 MySQL支持)
  2. 作为一个 out 参数返回 ref-cursor (Oracle和PostgreSQL使用这种方式)
  3. 作为存储函数(stored function)的返回值

下面是一个基本的配置示例, 还是使用上面 “客户信用” 的例子:

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">     
<property name="dataSource" ref="dataSource"/>    
 <property name="procedureName" value="sp_customer_credit"/>     
<property name="rowMapper">          
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>     
</property>
</bean>

这个例子依赖于存储过程提供一个 ResultSet 作为返回结果(方式1)。

如果存储过程返回一个ref-cursor(方式2),那么我们就需要提供返回的ref-cursor(out 参数)的位置。下面的示例中,第一个参数是返回的ref-cursor:

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">     <property name="dataSource" ref="dataSource"/>     
<property name="procedureName" value="sp_customer_credit"/>    
<property name="refCursorPosition" value="1"/>     
<property name="rowMapper">           
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>     
</property>
</bean>

如果存储函数的返回值是一个游标(方式 3), 则需要将 function 属性设置为 true , 默认为 false 。如下面所示:

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">      
<property name="dataSource" ref="dataSource"/>      
<property name="procedureName" value="sp_customer_credit"/>      
<property name="function" value="true"/>      
<property name="rowMapper">           
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>      
</property>
</bean>

在所有情况下,我们都需要定义 RowMapper 以及 DataSource, 还有存储过程的名字。

如果存储过程/函数需要传入参数, 那么必须声明并通过 parameters 属性来设置值。下面是一个关于 Oracle 的示例, 其中声明了三个参数。 第一个是 out 参数,用来返回 ref-cursor, 第二第三个参数是 in 型参数, 类型都是 INTEGER :

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">    
<property name="dataSource" ref="dataSource"/>    
<property name="procedureName" value="spring.cursor_func"/>    
<property name="parameters">    
<list>       
<bean class="org.springframework.jdbc.core.SqlOutParameter">       
<constructor-arg index="0" value="newid"/>       
<constructor-arg index="1">           
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>       
</constructor-arg>       
</bean>       
<bean class="org.springframework.jdbc.core.SqlParameter">       
<constructor-arg index="0" value="amount"/>       
<constructor-arg index="1">         
<util:constant static-field="java.sql.Types.INTEGER"/>       
</constructor-arg>       
</bean>      
 <bean class="org.springframework.jdbc.core.SqlParameter">       
<constructor-arg index="0" value="custid"/>       
<constructor-arg index="1">         
<util:constant static-field="java.sql.Types.INTEGER"/>       
</constructor-arg>       
</bean>    
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

除了参数声明, 我们还需要指定一个 PreparedStatementSetter 实现来设置参数值。这和上面的 JdbcCursorItemReader 一样。

1.9.2 可分页的 ItemReader

另一种是使用数据库游标执行多次查询,每次查询只返回一部分结果。 我们将这一部分称为一页(a page)。 分页时每次查询必须指定想要这一页的起始行号和想要返回的行数。

JdbcPagingItemReader

分页 ItemReader 的一个实现是 JdbcPagingItemReader 。 JdbcPagingItemReader 需要一个 PagingQueryProvider 来负责提供获取每一页所需的查询SQL。由于每个数据库都有不同的分页策略, 所以我们需要为各种数据库使用对应的PagingQueryProvider 。 也有自动检测所使用数据库类型的 SqlPagingQueryProviderFactoryBean ,会根据数据库类型选用适当的 PagingQueryProvider 实现。 这简化了配置,同时也是推荐的最佳实践。

SqlPagingQueryProviderFactoryBean 需要指定一个 select 子句以及一个 from 子句(clause). 当然还可以选择提供 where子句. 这些子句加上所需的排序列 sortKey 被组合成为一个 SQL 语句(statement).

在 reader 被打开以后, 每次调用 read 方法则返回一个 item,和其他的 ItemReader一样. 使用分页是因为可能需要额外的行.

下面是一个类似 'customer credit' 示例的例子,使用上面提到的基于 cursor的ItemReaders:

<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>

这里配置的ItemReader将返回CustomerCredit对象, 必须指定使用的RowMapper。 ' pageSize '属性决定了每次数据库查询返回的实体数量。

' parameterValues '属性可用来为查询指定参数映射map。如果在where子句中使用了命名参数,那么这些entry的key应该和命名参数一一对应。如果使用传统的 '?' 占位符, 则每个entry的key就应该是占位符的数字编号,和JDBC占位符一样索引都是从1开始。

JpaPagingItemReader

另一个分页ItemReader的实现是 JpaPagingItemReader 。JPA没有 Hibernate 中StatelessSession 之类的概念,所以我们必须使用JPA规范提供的其他功能。因为JPA支持分页,所以在使用JPA来处理分页时这是一种很自然的选择。读取每页后, 实体将会分离而且持久化上下文将会被清除,以允许在页面处理完成后实体会被垃圾回收。

JpaPagingItemReader 允许您声明一个JPQL语句,并传入一个 EntityManagerFactory 。然后就和其他的 ItemReader 一样,每次调用它的 read 方法都会返回一个 item. 当需要更多实体,则内部就会自动发生分页。下面是一个示例配置,和上面的JDBC reader一样,都是 'customer credit':

<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>

这里配置的ItemReader和前面所说的 JdbcPagingItemReader 返回一样的 CustomerCredit对象, 假设 Customer 对象有正确的JPA注解或者ORM映射文件。 ' pageSize ' 属性决定了每次查询时读取的实体数量。

IbatisPagingItemReader

如果使用 IBATIS/MyBatis, 则可以使用 IbatisPagingItemReader, 顾名思义, 也是一种实现分页的ItemReader。IBATIS不对分页提供直接支持, 但通过提供一些标准变量就可以为IBATIS查询提供分页支持。

下面是和上面的示例同样功能的配置,使用IbatisPagingItemReader来读取CustomerCredits:

<bean id="itemReader" class="org.spr...IbatisPagingItemReader">
<property name="sqlMapClient" ref="sqlMapClient"/>
<property name="queryId" value="getPagedCustomerCredits"/>
<property name="pageSize" value="1000"/>
</bean>

上述 IbatisPagingItemReader 配置引用了一个IBATIS查询,名为“getPagedCustomerCredits”。如果使用MySQL,那么查询XML应该类似于下面这样。

<select id="getPagedCustomerCredits" resultMap="customerCreditResult">select id, name, credit from customer order by id asc LIMIT #_skiprows#, #_pagesize#</select>

_skiprows 和 _pagesize 变量都是 IbatisPagingItemReader 提供的,还有一个 _page 变量,需要时也可以使用。分页查询的语法根据数据库不同使用。下面是使用Oracle的一个例子(但我们需要使用CDATA来包装某些特殊符号,因为是放在XML文档中嘛):

<select id="getPagedCustomerCredits" resultMap="customerCreditResult">select * from (select * from (select t.id, t.name, t.credit, ROWNUM ROWNUM_ from customer t order by id)) where ROWNUM_ <![CDATA[ > ]]> ( #_page# * #_pagesize# )) where ROWNUM <![CDATA[ <= ]]> #_pagesize#</select>

1.9.3 Database ItemWriters

虽然文本文件和XML都有自己特定的 ItemWriter, 但数据库和他们并不一样。这是因为事务提供了所需的全部功能。 对于文件来说 ItemWriters 是必要的, 因为如果需要事务特性,他们必须充当这种角色, 跟踪输出的 item,并在适当的时间flushing/clearing。使用数据库时不需要这个功能,因为写已经包含在事务之中。 用户可以自己创建实现ItemWriter接口的DAO, 或使用一个处理常见问题的自定义ItemWriter,无论哪种方式,都不会有任何问题。 需要注意的一件事是批量输出时的性能和错误处理能力。 在使用hibernate作为ItemWriter 时是最常见的, 但在使用Jdbc batch 模式时可能也会存在同样的问题。批处理数据库输出没有任何固有的缺陷,如果我们注意 flush 并且数据没有错误的话。 但是,在写出时如果发生了什么错误,就可能会引起混乱,因为没有办法知道是哪个item引起的异常, 甚至是否某个单独的 item 负有责任,如下图所示:

image

如果 items 在输出之前有缓冲, 则遇到任何错误将不会立刻抛出, 直到缓冲区刷新之后,提交之前才会抛出。例如, 我们假设每一块写出20个item, 第15个 item 会抛出 DataIntegrityViolationException 。如果与 Step 有关, 则20项数据都会写入成功, 因为没有办法知道会出现错误,直到全部写入完成。一旦调用 Session#flush(), 就会清空缓冲区buffer, 而异常也将被放出来。在这一点上, Step无能为力, 事务也必须回滚。 通常, 异常会导致 item 被跳过(取决于 skip/retry 策略), 然后该item就不会被输出。 然而,在批处理的情况下, 是没有办法知道到底是哪一项引起的问题, 在错误发生时整个缓冲区都将被写出。解决这个问题的唯一方法就是在每一个 item之后 flush一下:

image

这种用法是很常见的, 尤其是在使用Hibernate时,ItemWriter的简单实现建议, 在每次调用 write() 之后执行 flush。这样做可以让跳过 items 变得可靠, 而Spring Batch 在错误发生后会在内部关注适当粒度的ItemWriter调用。

1.10 重用已存在的 Service

批处理系统通常是与其他应用程序相结合的方式使用。最常见的是与一个在线应用系统结合, 但也支持与瘦客户端集成,通过移动每个程序所使用的批量数据。由于这些原因,所以很多用户想要在批处理作业中重用现有的DAO或其他服务。Spring容器通过注入一些必要的类就可以实现这些重用。但可能需要现有的服务作为 ItemReader 或者 ItemWriter, 也可以适配另一个Spring Batch类, 或其本身就是一个 step 主要的ItemReader。为每个需要包装的服务编写一个适配器类是很简单的, 而因为这是很普遍的需求,所以 Spring Batch 提供了实现: ItemReaderAdapter 和 ItemWriterAdapter 。两个类都实现了标准的Spring方法委托模式调用,设置也相当简单。下面是一个reader的示例:

<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">     
<property name="targetObject" ref="fooService" />    
 <property name="targetMethod" value="generateFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

特别需要注意的是, targetMethod 必须和 read 方法行为对等: 如果不存在则返回null, 否则返回一个 Object。 其他的值会使框架不知道何时该结束处理, 或者引起无限循环或不正确的失败,这取决于 ItemWriter 的实现。 ItemWriter 的实现同样简单:

<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">  
  <property name="targetObject" ref="fooService" />    
<property name="targetMethod" value="processFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

1.11 输入校验

在本章中, 已经讨论了很多种用来解析 input 的方法。 如果格式不对,那这些基本的实现都是抛出异常。 如果数据丢失一部分,FixedLengthTokenizer 也会抛出异常。 同样, 使用 FieldSetMapper 时,如果读取超出 RowMapper 索引范围的值,又或者返回值类型不匹配,都会抛出异常。 所有的异常都会在 read 返回之前抛出。 然而, 他们不能确定返回的item是否是合法的。 例如, 如果其中一个字段是 age , 很显然不能是负数。 解析为数字是没问题的, 因为确实存在这个数, 所以就不会抛出异常。 因为当下已经有大量的第三方验证框架, 所以 Spring Batch 并不提供另一个验证框架, 而是提供了一个非常简单的接口, 其他框架可以实现这个接口来提供兼容:

public interface Validator {    void validate(Object value) throws ValidationException;}

The contract is that the validate method will throw an exception if the object is invalid, and return normally if it is valid.Spring Batch provides an out of the box ItemProcessor:

约定是如果对象无效则 validate 方法抛出一个异常, 如果对象合法那就正常返回。 Spring Batch 提供了开箱即用的ItemProcessor:

<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">     
<property name="validator" ref="validator" />
</bean>
<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">   
 <property name="validator">        
<bean id="orderValidator" class="org.springmodules.validation.valang.ValangValidator">             
<property name="valang">                
<value>                   <![CDATA[                       { orderId : ? > 0 AND ? <= 9999999999 : 'Incorrect order ID' : 'error.order.id' }                       { totalLines : ? = size(lineItems) : 'Bad count of order lines'                         : 'error.order.lines.badcount'}                       { customer.registered : customer.businessCustomer = FALSE OR ? = TRUE                         : 'Business customer must be registered'                         : 'error.customer.registration'}                       { customer.companyName : customer.businessCustomer = FALSE OR ? HAS TEXT                         : 'Company name for business customer is mandatory'                         :'error.customer.companyname'}                   ]]>                
</value>             
 </property>           
</bean>        
 </property>
</bean>

这个示例展示了一个简单的 ValangValidator, 用来校验 order 对象。 这样写目的是为了尽可能多地演示如何使用 Valang 来添加校验程序。

1.12 不保存执行状态

默认情况下,所有 ItemReader 和 ItemWriter 在提交之前都会把当前状态信息保存到 ExecutionContext 中。 但有时我们又不希望保存这些信息。 例如,许多开发者使用处理指示器(process indicator)让数据库读取程序 '可重复运行(rerunnable)'。 在数据表中添加一个附加列来标识该记录是否已被处理。 当某条记录被读取/写入时,就将标志位从 false 变为 true , 然后只要在SQL语句的where子句中包含一个附加条件, 如 " where PROCESSED_IND = false ", 就可确保在任务重启后只查询到未处理过的记录。 这种情况下,就不需要保存任何状态信息, 比如当前 row number 什么的, 因为在重启后这些信息都没用了。 基于这种考虑, 所有的 readers 和 writers 都含有一个 saveState 属性:

<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">      <property name="dataSource" ref="dataSource" />      
<property name="rowMapper">         
<bean class="org.springframework.batch.sample.PlayerSummaryMapper" />      </property>      
<property name="saveState" value="false" />          
<property name="sql">            
 <value>                SELECT games.player_id, games.year_no, SUM(COMPLETES),                SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),                SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),                SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)                from games, players where players.player_id =                games.player_id group by games.player_id, games.year_no             
</value>           
</property>
</bean>

上面配置的这个 ItemReader 在任何情况下都不会将 entries(状态信息)存放到 ExecutionContext 中.

1.13 创建自定义 ItemReaders 与 ItemWriters

到目前为止,本章已将 Spring Batch 中基本的读取(reading)和写入(writing)概念讲完, 还对一些常用的实现进行了讨论。然而,这些都是相当普通的, 还有很多潜在的场景可能没有现成的实现。本节将通过一个简单的例子,来演示如何创建自定义的 ItemReader 和 ItemWriter ,并且如何正确地实现和使用。 ItemReader 同时也将 ItemStream , 以说明如何让reader(读取器)或writer(写入器)支持重启(restartable)。

1.13.1 自定义 ItemReader 示例

为了实现这个目的,我们实现一个简单的 ItemReader , 从给定的list中读取数据。 我们将实现最基本的 ItemReader 功能, read:

public class CustomItemReader<T> implements ItemReader<T>{      
  List<T> items;      
  public CustomItemReader(List<T> items) {         
    this.items = items;      
  }      
  public T read() throws Exception, UnexpectedInputException,NoWorkFoundException, ParseException {          
    if (!items.isEmpty()) {              
      return items.remove(0);        
   }        
   return null;      
 }
}

这是一个简单的类, 传入一个 items list, 每次读取时删除其中的一条并返回。 如果list里面没有内容,则将返回null, 从而满足ItemReader 的基本要求, 测试代码如下所示:

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<String>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使 ItemReader 支持重启

现在剩下的问题就是让 ItemReader 变为可重启的。到目前这一步,如果发生掉电之类的情况,那么必须重新启动ItemReader,而且是从头开始。在很多时候这是允许的,但有时侯更好的处理办法是让批处理作业在上次中断的地方重新开始。判断的关键是根据 reader 是有状态的还是无状态的。 无状态的 reader 不需要考虑重启的情况, 但有状态的则需要根据其最后一个已知的状态来重新启动。出于这些原因, 官方建议尽可能地让 reader 成为无状态的,使开发者不需要考虑重新启动的情况。

如果需要保存状态信息,那应该使用 ItemStream 接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {      
  List<T> items;      
  int currentIndex = 0;      
  private static final String CURRENT_INDEX = "current.index";      
  public CustomItemReader(List<T> items) {           
    this.items = items;      
  }      
  public T read() throws Exception, UnexpectedInputException,ParseException {           
    if (currentIndex < items.size()) {           
      return items.get(currentIndex++);       
    }       
    return null;       
  }       
  public void open(ExecutionContext executionContext) throws ItemStreamException {           
    if(executionContext.containsKey(CURRENT_INDEX)){               
      currentIndex = new      Long(executionContext.getLong(CURRENT_INDEX)).intValue();           
    }else{              
       currentIndex = 0;          
   }       
}       
public void update(ExecutionContext executionContext) throws ItemStreamException {            
    executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());      
  }       
 public void close() throws ItemStreamException {
 }
}

每次调用 ItemStream 的 update 方法时, ItemReader 的当前 index 都会被保存到给定的 ExecutionContext 中,key 为' current.index '。 当调用 ItemStream 的 open 方法时, ExecutionContext会检查是否包含该 key 对应的条目。 如果找到key, 那么当前索引 index 就好移动到该位置。这是一个相当简单的例子,但它仍然符合通用原则:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");items.add("3");
itemReader = new CustomItemReader<String>(items);((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数ItemReaders具有更加复杂的重启逻辑。 例如 JdbcCursorItemReader , 存储了游标(Cursor)中最后所处理的行的row id。

还值得注意的是 ExecutionContext 中使用的 key 不应该过于简单。这是因为 ExecutionContext 被一个 Step 中的所有ItemStreams 共用。在大多数情况下,使用类名加上 key 的方式应该就足以保证唯一性。然而,在极端情况下, 同一个类的多个ItemStream 被用在同一个Step中时( 如需要输出两个文件的情况),就需要更加具备唯一性的name标识。出于这个原因,SpringBatch 的许多 ItemReader 和 ItemWriter 实现都有一个 setName() 方法, 允许覆盖默认的 key name。

1.13.2 自定义 ItemWriter 示例

自定义实现 ItemWriter 和上一小节所讲的 ItemReader 有很多方面是类似, 但也有足够多的不同之处。 但增加可重启特性在本质上是一样的, 所以本节的示例就不再讨论这一点。和 ItemReader 示例一样, 为了简单我们使用的参数也是 List :

public class CustomItemWriter<T> implements ItemWriter<T> {      List<T> output = TransactionAwareProxyFactory.createTransactionalList();      public void write(List<? extends T> items) throws Exception {            output.addAll(items);      }      public List<T> getOutput() {            return output;      }}

让 ItemWriter 支持重新启动

要让 ItemWriter 支持重新启动,我们将会使用和 ItemReader 相同的过程, 实现并添加 ItemStream 接口来同步 execution context。 在示例子中我们可能要记录处理过的items数量,并添加为到 footer 记录。 我们可以在 ItemWriter 的实现类中同时实现 ItemStream , 以便在 stream 重新打开时从执行上下文中取回原来的数据重建计数器。

实际开发中, 如果自定义 ItemWriter restartable(支持重启),则会委托另一个 writer(例如, 在写入文件时), 否则会写入到关系型数据库(支持事务的资源)中, 此时 ItemWriter 不需要 restartable特性,因为自身是无状态的。 如果你的 writer 有状态, 则应该实现2个接口: ItemStream 和 ItemWriter 。 请记住, writer客户端需要知道 ItemStream 的存在, 所以需要在 xml 配置文件中将其注册为 stream.

调度流程

image.png

1、顺序流程(Sequential Flow)

<job  id="job">
    <step  id="stepA"  parent="s1"  next="stepB" />
    <step  id="stepB"  parent="s2"  next="stepC"/>
    <step  id="stepC"  parent="s3" />
</job>
image.png

2、条件流程(Conditional Flow)

image.png
<job id="job">
    <step id="stepA" parent="s1">
        <next  on="*"  to="stepB" />
        <next  on="FAILED"  to="stepC" />
    </step>
    <step id="stepB" parent="s2"  next="stepC">
        <fail on="FAILED"  exit-code="EARLY TERMINATION"/>
    </step>
    <step id="stepC" parent="s3">
        <stop  on="COMPLETED"/>
        <end  on="FAILED"/>
    </step>
</job>
image.png

3、自定义流程(Programmatic Flow Decisions)

首先,继承 JobExecutionDecider,自定义流程.

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (someCondition) {
            return "FAILED";
        }
        else {
            return "COMPLETED";
        }
    }
}

然后就可以使用这个流程了。

<job id="job">
    <step id="step1" parent="s1" next="decision" />

    <decision id="decision" decider="decider">
        <next on="FAILED" to="step2" />
        <next on="COMPLETED" to="step3" />
    </decision>

    <step id="step2" parent="s2" next="step3"/>
    <step id="step3" parent="s3" />
</job>

<beans:bean id="decider" class="com.MyDecider"/>

4、并行流程(Split Flow)

<split id="split1" next="step4">
    <flow>
        <step id="step1" parent="s1" next="step2"/>
        <step id="step2" parent="s2"/>
    </flow>
    <flow>
        <step id="step3" parent="s3"/>
    </flow>
</split>
<step id="step4" parent="s4"/>

作者:天生小包

原文链接:https://www.jianshu.com/p/894e313a7607

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐