中文|English
GoBatch是一款Go语言下的批处理框架,类似于Java语言的Spring Batch。如果你用过Spring Batch,你会发现GoBatch很容易上手。
在GoBatch里面,任务(Job)被划分为多个按先后顺序依次执行的步骤(Step)。在执行任务时,框架会将任务和各个步骤的运行时相关数据记录到数据库中。
步骤包括3种类型:
- 简单步骤:接受一个Handler对象,并在单线程中执行Handler包含的业务逻辑。Handler是接口类型,由用户自行实现。
- 分块步骤:用于处理大批量步骤,将全部数据分成若干小块依次进行处理,分块大小由用户指定。每个分块的处理流程是先使用Reader读取一个分块大小数量的数据,接着通过Processor逐条处理读取的数据,最后将结果通过Writer写入存储。这个流程会一直重复执行,直到所有数据读取完毕(Reader.Read()返回nil)。其中Reader、Processor、Writer是接口类型,由用户实现。
- 分区步骤:用于将一个大任务分成多个子任务,每个子任务可以由独立的线程来执行。在运行时,分区步骤被分为多个并行执行的子步骤,所有子步骤执行完毕,将结果进行合并。分区步骤的业务逻辑可以通过Handler来实现,也可以通过Reader/Processor/Writer来实现,此外,必须通过Partitioner指定分区逻辑,如果需要合并结果,则还要指定Aggregator。分区步骤与分块步骤的区别是:前者是多线程执行,后者是单线程执行。
- 以模块化方式构建批处理应用程序。
- 管理多个批处理任务的运行。
- 任务被分为多个串行执行的步骤,一个步骤可以通过分区由多线程并行执行。
- 自动记录任务执行状态,支持任务失败后断点续跑。
- 内置文件读写组件,支持tsv、csv、json等格式的文件读写及校验。
- 提供多种Listener,便于对任务和步骤进行扩展。
go get -u github.com/chararch/gobatch
- 创建或使用已有的数据库,库名如: gobatch
- 在前述数据库中,使用文件sql/schema_mysql.sql 的内容创建表。
- 使用gobatch框架编写批处理代码并运行。
import (
"chararch/gobatch"
"context"
"database/sql"
"fmt"
)
// simple task
func mytask() {
fmt.Println("mytask executed")
}
//reader
type myReader struct {
}
func (r *myReader) Read(chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
curr, _ := chunkCtx.StepExecution.StepContext.GetInt("read.num", 0)
if curr < 100 {
chunkCtx.StepExecution.StepContext.Put("read.num", curr+1)
return fmt.Sprintf("value-%v", curr), nil
}
return nil, nil
}
//processor
type myProcessor struct {
}
func (r *myProcessor) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
return fmt.Sprintf("processed-%v", item), nil
}
//writer
type myWriter struct {
}
func (r *myWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
fmt.Printf("write: %v\n", items)
return nil
}
func main() {
//set db for gobatch to store job&step execution context
db, err := sql.Open("mysql", "gobatch:gobatch123@tcp(127.0.0.1:3306)/gobatch?charset=utf8&parseTime=true")
if err != nil {
panic(err)
}
gobatch.SetDB(db)
//build steps
step1 := gobatch.NewStep("mytask").Handler(mytask).Build()
//step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build()
step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build()
//build job
job := gobatch.NewJob("my_job").Step(step1, step2).Build()
//register job to gobatch
gobatch.Register(job)
//run
//gobatch.StartAsync(context.Background(), job.Name(), "")
gobatch.Start(context.Background(), job.Name(), "")
}
该示例代码位于 test/example.go
有多种方法编写简单步骤的逻辑,如下:
// 1. write a function with one of the following signature
func(execution *StepExecution) BatchError
func(execution *StepExecution)
func() error
func()
// 2. implement the Handler interface
type Handler interface {
Handle(execution *StepExecution) BatchError
}
当你使用以上函数定义或接口定义编写好了业务逻辑,则可以通过以下方式构造Step对象:
step1 := gobatch.NewStep("step1").Handler(myfunction).Build()
step2 := gobatch.NewStep("step2").Handler(myHandler).Build()
//or
step1 := gobatch.NewStep("step1", myfunction).Build()
step2 := gobatch.NewStep("step2", myHandler).Build()
分块步骤需要实现以下3个接口(其中,只有Reader是必须实现的):
type Reader interface {
//Read each call of Read() will return a data item, if there is no more data, a nil item will be returned.
Read(chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Processor interface {
//Process process an item from reader and return a result item
Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Writer interface {
//Write write items generated by processor in a chunk
Write(items []interface{}, chunkCtx *ChunkContext) BatchError
}
框架还包含一个ItemReader接口,在某些情况下,可以用于代替Reader,其定义如下:
type ItemReader interface {
//ReadKeys read all keys of some kind of data
ReadKeys() ([]interface{}, error)
//ReadItem read value by one key from ReadKeys result
ReadItem(key interface{}) (interface{}, error)
}
为了方便起见,可以通过实现以下接口,在Reader或Writer中执行一些初始化或清理的动作:
type OpenCloser interface {
Open(execution *StepExecution) BatchError
Close(execution *StepExecution) BatchError
}
示例代码可以参考 test/example2
分区步骤必须要实现Partitioner接口,该接口用于将整个步骤要处理的数据分成多个分区,每个分区对应一个子步骤,框架会启动多个线程来并行执行多个子步骤。如果需要对子步骤的执行结果进行合并,还需要实现Aggregator接口。这两个接口定义如下:
type Partitioner interface {
//Partition generate sub step executions from specified step execution and partitions count
Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
//GetPartitionNames generate sub step names from specified step execution and partitions count
GetPartitionNames(execution *StepExecution, partitions uint) []string
}
type Aggregator interface {
//Aggregate aggregate result from all sub step executions
Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}
对于分区步骤的子步骤来说,既可以是一个简单步骤(由Handler定义),也可以是一个分块步骤(通过Reader/Processor/Writer定义)。 如果已有了一个包含ItemReader的分块步骤,则可以通过指定分区数量就可以构造分区步骤,如下:
step := gobatch.NewStep("partition_step").Handler(&ChunkHandler{db}).Partitions(10).Build()
这种方式是由GoBatch框架内部基于ItemReader实现了Partitioner。
我们假定有一个文件的内容如下(其中每行是一条记录,每个字段用'\t'分隔):
trade_1 account_1 cash 1000 normal 2022-02-27 12:12:12
trade_2 account_2 cash 1000 normal 2022-02-27 12:12:12
trade_3 account_3 cash 1000 normal 2022-02-27 12:12:12
……
如果想读取该文件的内容,并将文件中每条记录插入到数据库中的 t_trade 表中,则可以通过以下方式来实现:
type Trade struct {
TradeNo string `order:"0"`
AccountNo string `order:"1"`
Type string `order:"2"`
Amount float64 `order:"3"`
TradeTime time.Time `order:"5"`
Status string `order:"4"`
}
var tradeFile = file.FileObjectModel{
FileStore: &file.LocalFileSystem{},
FileName: "/data/{date,yyyy-MM-dd}/trade.data",
Type: file.TSV,
Encoding: "utf-8",
ItemPrototype: &Trade{},
}
type TradeWriter struct {
db *gorm.DB
}
func (p *TradeWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
models := make([]*Trade, len(items))
for i, item := range items {
models[i] = item.(*Trade)
}
e := p.db.Table("t_trade").Create(models).Error
if e != nil {
return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "save trade into db err", e)
}
return nil
}
func buildAndRunJob() {
//...
step := gobatch.NewStep("trade_import").ReadFile(tradeFile).Writer(&TradeWriter{db}).Partitions(10).Build()
//...
job := gobatch.NewJob("my_job").Step(...,step,...).Build()
gobatch.Register(job)
gobatch.Start(context.Background(), job.Name(), "{\"date\":\"20220202\"}")
}
再假定我们需要将 t_trade 表中的数据导出为一个csv文件,可以按照以下方式来实现:
type Trade struct {
TradeNo string `order:"0" header:"trade_no"`
AccountNo string `order:"1" header:"account_no"`
Type string `order:"2" header:"type"`
Amount float64 `order:"3" header:"amount"`
TradeTime time.Time `order:"5" header:"trade_time" format:"2006-01-02_15:04:05"`
Status string `order:"4" header:"trade_no"`
}
var tradeFileCsv = file.FileObjectModel{
FileStore: &file.LocalFileSystem{},
FileName: "/data/{date,yyyy-MM-dd}/trade_export.csv",
Type: file.CSV,
Encoding: "utf-8",
ItemPrototype: &Trade{},
}
type TradeReader struct {
db *gorm.DB
}
func (h *TradeReader) ReadKeys() ([]interface{}, error) {
var ids []int64
h.db.Table("t_trade").Select("id").Find(&ids)
var result []interface{}
for _, id := range ids {
result = append(result, id)
}
return result, nil
}
func (h *TradeReader) ReadItem(key interface{}) (interface{}, error) {
id := int64(0)
switch r := key.(type) {
case int64:
id = r
case float64:
id = int64(r)
default:
return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key)
}
trade := &Trade{}
result := h.db.Table("t_trade").Find(trade, "id = ?", id)
if result.Error != nil {
return nil, result.Error
}
return trade, nil
}
func buildAndRunJob() {
//...
step := gobatch.NewStep("trade_export").Reader(&TradeReader{db}).WriteFile(tradeFileCsv).Partitions(10).Build()
//...
}
框架提供了多种监听器,用于处理整个批处理任务和步骤执行过程中的各种事件,如下:
type JobListener interface {
BeforeJob(execution *JobExecution) BatchError
AfterJob(execution *JobExecution) BatchError
}
type StepListener interface {
BeforeStep(execution *StepExecution) BatchError
AfterStep(execution *StepExecution) BatchError
}
type ChunkListener interface {
BeforeChunk(context *ChunkContext) BatchError
AfterChunk(context *ChunkContext) BatchError
OnError(context *ChunkContext, err BatchError)
}
type PartitionListener interface {
BeforePartition(execution *StepExecution) BatchError
AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
OnError(execution *StepExecution, err BatchError)
}
可以构建任务时指定监听器,示例如下:
func buildAndRunJob() {
//...
step := gobatch.NewStep("my_step").Handler(handler,...).Listener(listener,...).Build()
//...
job := gobatch.NewJob("my_job").Step(step,...).Listener(listener,...).Build()
}
GoBatch框架需要使用数据库来存储任务和步骤执行过程中的上下文信息,因此在启动任务之前,必须注册一个 *sql.DB 实例到GoBatch中,如下:
gobatch.SetDB(sqlDb)
如果需要使用分块步骤,则必须设置一个事务管理器(TransactionManager)到GoBatch,事务管理器接口定义如下:
type TransactionManager interface {
BeginTx() (tx interface{}, err BatchError)
Commit(tx interface{}) BatchError
Rollback(tx interface{}) BatchError
}
GoBatch框架包含一个默认的事务管理器,类名DefaultTxManager,如果已经设置了DB实例且尚未设置TransactionManager,则 GoBatch 会自动创建一个 DefaultTxManager 实例。当然,用户也可以指定自己的事务管理器来代替默认实现:
gobatch.SetTransactionManager(&CustomTransactionManager{})
GoBatch 内部使用池化技术来运行任务和步骤。默认最大并发任务数和最大并发步骤数分别是10、1000,如果需要修改默认值,则设置如下:
gobatch.SetMaxRunningJobs(100)
gobatch.SetMaxRunningSteps(5000)