阅读 79

Automi一个GO流处理API


【译文】原文地址
在流处理当中,数据被分解成序列元素可以后续对大数据集进行处理。本文是关于Automi这个项目是一个Go包,支持开发者创建项目使用流处理方法来处理数据。
Automi Github地址https://github.com/vladimirvivien/automi

总体上,Automi提供如下特性:

  • 使用Go语义来组合多个阶段数据流处理器
  • 支持函数编程风格的操作
  • 使用Go原生包作为数据source和sink(比如channel、slice等)。
  • 自带数据转换和过滤操作
  • 支持批处理操作(Sort、GroupBy、Sum等)
    在开始之前,我们思考一个例子。数据源会产生字符到数据流中。随着时间推移,每个字符会随着流转移并被处理。首先用一个过滤器将不需要的字符移除掉。然后将字符映射成字符串,最后收集每个字符串。下图用一个工厂流水线来说明流及其相关操作。


    流处理说明

    接下来,我们看看如何使用Automi来实现上面的例子,用代码将字符导入到流中并处理每个字符。下面代码从切片中创建了一个数据流,然后将过滤和转换操作函数添加到流处理中。

func main() {
   // 使用切片来创建一个流到数据源
   strm := stream.New(
      []rune("B世!ぽ[@opqDQRS](http://twitter.com/opqDQRS)#$%^...O6PTnVWXѬYZbcef7ghijCklrAstvw"),
   ) // 过滤非大写字母字符
   strm.Filter(func(item rune) bool {
      return item >= 65 && item < (65+26)
   }) // 字符转换为字符串
   strm.Map(func(item rune) string {
      return string(item)
   }) //使用collectors将每个字符打印到输出台
   strm.Into(collectors.Writer(os.Stdout)) // Open the stream
  if err := <-strm.Open(); err != nil {
    log.Fatal(err)
  }
}

我们详细分析上面代码每一步都做了什么。

  • str流对象实例是根据切片[]rune创建的。每个元素在流当中按顺序流出。
  • str.Filter方法使用用户定义的匿名函数func(item rune) bool处理流中的各字符。当接收到大写字母就返回true,非大写字母字符就返回fasle,过滤掉不需要的字符。
  • str.Map方法接收上一个过滤操作处理后的字符,并应用用户自定义的fun(item rune) string函数将字符转换成字符串,并将结果推送到下一步。
  • 最后,stri.Into方法指定一个搜集器,对上一步操作的字符进行搜集。在这个例子中,字符被收集并写入到标准输出当中。
    注意到目前这一步,只是将数据流申明好了,直到调用open函数才会启动流处理过程。执行流时,必须使用s t r.Open()方法来触发流启动并将定义的操作执行起来。
> go run .
BDQRSUEFGHIJKLMNOPTVWXYZCA

Automi概念

Automi使用的流水线管道,是通过Go channel来实现的,作为数据流的传递通道。如下所描述的,Automi使用了四个组件包括:stream流本身,数据发射器(emitter)产生数据到流当中,流操作器(operator)用于处理流中的每个对象,收集器(collectors)搜集流中的对象。


Automi组件

流代表可传输数据对象的管道。Automi内部使用Go channel作为管道来传送流中的数据对象。这意味着,流支持缓存、自动同步和并发安全等特性。

一个Automi流可以使用New函数来创建,需要传入一个发射器作为参数,或者一个可以封装成发射器等值。

strm := stream.New(<emitter or value>)

New函数返回的Stream类型,使用API来配置流,可不断地添加方法到这个流当中,如下所示:

func main() {
  strm := stream.New([]rune(...))
  strm.Filter(func(item rune) bool {
     return item >= 65 && item < (65+26)
  }).Map(func(item rune) string {
    return string(item) 
  }).Into(collectors.Writer(os.Stdout))
  ...
}

发射器(emitter)

流是从发射器开始的,发射器可以从内存、网络或者文件中获取数据并读取当流。emitters包实现了如下发射器:

emitters.Channel
emitters.CSV
emitters.Reader
emitters.Scanner
emitters.Slice

如果上面的无法满足你的需求,你还可以创建自定义emitter,通过实现如下接口即可:

type Emitter interface { 
   GetOutput() <-chan interface{}
}

流操作器

一个流操作器是将流中的每个对象应用到对应的操作中。Automi使用Go支持的函数式编程特性允许开发者自定义操作器。

stream.Filter(func(row []string) bool {
   count, err := strconv.Atoi(row[1])
   if err != nil {
      count = 0
   }
   return (count > 0)
})

Automi包含很多操作器函数,如下所示:

Stream.Filter
Stream.Map
Stream.FlatMap
Stream.Reduce
Stream.Process
Stream.GroupByKey
Stream.GroupByName
Stream.GroupByPos
Stream.Sort
Stream.SortByKey
Stream.SortByName
Stream.SortByPos
Stream.SortWith
Stream.Sum
Stream.SumByKey
Stream.SumByName
Stream.SumByPos
Stream.SumAllKeys

上面列出的很多操作器都支持用户自定义函数,其他的应用于配置、转换等功能到流中。

收集器(collector)

收集器可以是内存、网络、或文件资源,用于存放流中数据。Automi可以收集数据到如下定义的收集器中:

colelctors.CSV
collectors.Func
collectors.Null
collectors.Slice
collectors.Writer

如果你需要使用自定义的收集器,可以实现如下接口:

type Collector interface { 
   SetInput(<-chan interface{})
}

更多流处理例子

下面我们来探索更多的例子来展示Automi是如何使用的。

从channels中创建流

下面的例子展示了流数据是如何从channel中获得。如下代码所示,函数emitterFunc返回类型为<-chan time.Time对象,作为流的发射器参数传入流。然后将一个实现time.Time对象转换为字符串的流操作器应用到流中,最收集字符串到标准输出当中。


从channel读数据到流

以下代码实现了上面的流处理:

func main() {
   // emitterFunc returns a chan used for data
   emitterFunc := func() <-chan time.Time {
      times := make(chan time.Time)
      go func() {
         times <- time.Unix(100000, 0)
         times <- time.Unix(2*100000, 0)
         times <- time.Unix(4*100000, 0)
         times <- time.Unix(8*100000, 0)
         close(times)
      }()
      return times
   }
   strm := stream.New(emitterFunc())
   strm.Map(func(item time.Time) string {
      return item.String()
   }).Into(collectors.Writer(os.Stdout))
   if err := <-strm.Open(); err != nil {
      fmt.Println(err)
      return
   }
}

源代码示例2
在前面的示例中,当打开流时,仅仅将收集的数据打印到标准输出。

HTTP请求流

Automi流可以从实现了io.reader接口到值当中读取数据,或使用任何实现了io.writer接口到值来收集数据。以下通过描述一个服务器通过http.Request.Body中读取字节切片到流中来说明,并使用base64转换收到的每个切片,然后将转换后的数据写入到http.Response中。


HTTP服务器从request.Body读取流数据

以下代码就是上面的实现:

func main() {
   http.HandleFunc(
      "/",
      func(resp http.ResponseWriter, req *http.Request) {
         resp.Header().Add("Content-Type", "text/html")
         resp.WriteHeader(http.StatusOK)
         strm := stream.New(req.Body)
         strm.Process(func(data []byte) string {
            return base64.StdEncoding.EncodeToString(data)
         }).Into(resp)
         if err := <-strm.Open(); err != nil {
            resp.WriteHeader(http.StatusInternalServerError)
         }
      },
   )
   http.ListenAndServe(":4040", nil)
}

当服务器接收到“/”路径请求,会将req.Body内容注入到流当中,并应用用户定义操作方法来计算数据到base64编码,然后返回计算及过到resp中。

源代码示例3

使用CSVs创建流

下面的例子将展示流数据来源于CSV文件,并使用两个用户定义的操作器处理,最后将结果收集到另外一个CSV文件中。



以下代码展示上面的流功能。使用emitter.CSV来创建CSV发射器csvEmitter,将序列化每行内容到[]string并注入到流当中。Map函数应用一个操作器返回每行到前6个对象,Filter方法删除每行第二列(row[1])为0到数。

func main() {
   csvEmitter := emitters.CSV("./stats.txt").
      CommentChar('#').
      DelimChar(',').
      HasHeaders()
   stream := stream.New(csvEmitter)
   
   // select first 6 cols per row:
   stream.Map(func(row []string) []string {
      return row[:6]
   })
 
   // filter out rows with col[1] = 0
   stream.Filter(func(row []string) bool {
      count, err := strconv.Atoi(row[1])
      if err != nil {
         count = 0
      }
      return (count > 0)
   })
   stream.Into(collectors.CSV("./out.txt"))
   if err := <-stream.Open(); err != nil {
      fmt.Println(err)
      return
   }
}

当流被打开,会使用collects.CSV来收集流中的对象到另一个CSV文件中。

更多示例

在Github上还有很多用例:

作者:Go语言由浅入深

原文链接:https://www.jianshu.com/p/4d89afb50baf

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐