5 Star 23 Fork 7

chararch / gobatch

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

GoBatch

GoDoc Reference Go Report Card MIT license

中文|English

GoBatch是一款Go语言下的批处理框架,类似于Java语言的Spring Batch。如果你用过Spring Batch,你会发现GoBatch很容易上手。

架构

在GoBatch里面,任务(Job)被划分为多个按先后顺序依次执行的步骤(Step)。在执行任务时,框架会将任务和各个步骤的运行时相关数据记录到数据库中。

步骤包括3种类型:

  • 简单步骤:接受一个Handler对象,并在单线程中执行Handler包含的业务逻辑。Handler是接口类型,由用户自行实现。
  • 分块步骤:用于处理大批量步骤,将全部数据分成若干小块依次进行处理,分块大小由用户指定。每个分块的处理流程是先使用Reader读取一个分块大小数量的数据,接着通过Processor逐条处理读取的数据,最后将结果通过Writer写入存储。这个流程会一直重复执行,直到所有数据读取完毕(Reader.Read()返回nil)。其中Reader、Processor、Writer是接口类型,由用户实现。
  • 分区步骤:用于将一个大任务分成多个子任务,每个子任务可以由独立的线程来执行。在运行时,分区步骤被分为多个并行执行的子步骤,所有子步骤执行完毕,将结果进行合并。分区步骤的业务逻辑可以通过Handler来实现,也可以通过Reader/Processor/Writer来实现,此外,必须通过Partitioner指定分区逻辑,如果需要合并结果,则还要指定Aggregator。分区步骤与分块步骤的区别是:前者是多线程执行,后者是单线程执行

功能

  1. 以模块化方式构建批处理应用程序。
  2. 管理多个批处理任务的运行。
  3. 任务被分为多个串行执行的步骤,一个步骤可以通过分区由多线程并行执行。
  4. 自动记录任务执行状态,支持任务失败后断点续跑。
  5. 内置文件读写组件,支持tsv、csv、json等格式的文件读写及校验。
  6. 提供多种Listener,便于对任务和步骤进行扩展。

安装

go get -u github.com/chararch/gobatch

使用步骤

  1. 创建或使用已有的数据库,库名如: gobatch
  2. 在前述数据库中,使用文件sql/schema_mysql.sql 的内容创建表。
  3. 使用gobatch框架编写批处理代码并运行。

代码

一个例子

import (
	"chararch/gobatch"
	"context"
	"database/sql"
	"fmt"
)

// simple task
func mytask() {
	fmt.Println("mytask executed")
}

//reader
type myReader struct {
}
func (r *myReader) Read(chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
	curr, _ := chunkCtx.StepExecution.StepContext.GetInt("read.num", 0)
	if curr < 100 {
		chunkCtx.StepExecution.StepContext.Put("read.num", curr+1)
		return fmt.Sprintf("value-%v", curr), nil
	}
	return nil, nil
}

//processor
type myProcessor struct {
}
func (r *myProcessor) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
	return fmt.Sprintf("processed-%v", item), nil
}

//writer
type myWriter struct {
}
func (r *myWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
	fmt.Printf("write: %v\n", items)
	return nil
}

func main()  {
	//set db for gobatch to store job&step execution context
	db, err := sql.Open("mysql", "gobatch:gobatch123@tcp(127.0.0.1:3306)/gobatch?charset=utf8&parseTime=true")
	if err != nil {
		panic(err)
	}
	gobatch.SetDB(db)

	//build steps
	step1 := gobatch.NewStep("mytask").Handler(mytask).Build()
	//step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build()
	step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build()

	//build job
	job := gobatch.NewJob("my_job").Step(step1, step2).Build()

	//register job to gobatch
	gobatch.Register(job)

	//run
	//gobatch.StartAsync(context.Background(), job.Name(), "")
	gobatch.Start(context.Background(), job.Name(), "")
}

该示例代码位于 test/example.go

编写简单步骤

有多种方法编写简单步骤的逻辑,如下:

// 1. write a function with one of the following signature
func(execution *StepExecution) BatchError
func(execution *StepExecution)
func() error
func()

// 2. implement the Handler interface
type Handler interface {
	Handle(execution *StepExecution) BatchError
}

当你使用以上函数定义或接口定义编写好了业务逻辑,则可以通过以下方式构造Step对象:

step1 := gobatch.NewStep("step1").Handler(myfunction).Build()
step2 := gobatch.NewStep("step2").Handler(myHandler).Build()
//or
step1 := gobatch.NewStep("step1", myfunction).Build()
step2 := gobatch.NewStep("step2", myHandler).Build()

编写分块步骤

分块步骤需要实现以下3个接口(其中,只有Reader是必须实现的):

type Reader interface {
    //Read each call of Read() will return a data item, if there is no more data, a nil item will be returned.
    Read(chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Processor interface {
    //Process process an item from reader and return a result item
    Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Writer interface {
    //Write write items generated by processor in a chunk
    Write(items []interface{}, chunkCtx *ChunkContext) BatchError
}

框架还包含一个ItemReader接口,在某些情况下,可以用于代替Reader,其定义如下:

type ItemReader interface {
    //ReadKeys read all keys of some kind of data
    ReadKeys() ([]interface{}, error)
    //ReadItem read value by one key from ReadKeys result
    ReadItem(key interface{}) (interface{}, error)
}

为了方便起见,可以通过实现以下接口,在Reader或Writer中执行一些初始化或清理的动作:

type OpenCloser interface {
	Open(execution *StepExecution) BatchError
	Close(execution *StepExecution) BatchError
}

示例代码可以参考 test/example2

编写分区步骤

分区步骤必须要实现Partitioner接口,该接口用于将整个步骤要处理的数据分成多个分区,每个分区对应一个子步骤,框架会启动多个线程来并行执行多个子步骤。如果需要对子步骤的执行结果进行合并,还需要实现Aggregator接口。这两个接口定义如下:

type Partitioner interface {
	//Partition generate sub step executions from specified step execution and partitions count
	Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
	//GetPartitionNames generate sub step names from specified step execution and partitions count
	GetPartitionNames(execution *StepExecution, partitions uint) []string
}

type Aggregator interface {
    //Aggregate aggregate result from all sub step executions
    Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}

对于分区步骤的子步骤来说,既可以是一个简单步骤(由Handler定义),也可以是一个分块步骤(通过Reader/Processor/Writer定义)。 如果已有了一个包含ItemReader的分块步骤,则可以通过指定分区数量就可以构造分区步骤,如下:

    step := gobatch.NewStep("partition_step").Handler(&ChunkHandler{db}).Partitions(10).Build()

这种方式是由GoBatch框架内部基于ItemReader实现了Partitioner。

读写文件

我们假定有一个文件的内容如下(其中每行是一条记录,每个字段用'\t'分隔):

trade_1	account_1	cash	1000	normal	2022-02-27 12:12:12
trade_2	account_2	cash	1000	normal	2022-02-27 12:12:12
trade_3	account_3	cash	1000	normal	2022-02-27 12:12:12
……

如果想读取该文件的内容,并将文件中每条记录插入到数据库中的 t_trade 表中,则可以通过以下方式来实现:

type Trade struct {
    TradeNo   string    `order:"0"`
    AccountNo string    `order:"1"`
    Type      string    `order:"2"`
    Amount    float64   `order:"3"`
    TradeTime time.Time `order:"5"`
    Status    string    `order:"4"`
}

var tradeFile = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade.data",
    Type:          file.TSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}

type TradeWriter struct {
    db *gorm.DB
}

func (p *TradeWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
    models := make([]*Trade, len(items))
    for i, item := range items {
        models[i] = item.(*Trade)
    }
    e := p.db.Table("t_trade").Create(models).Error
    if e != nil {
        return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "save trade into db err", e)
    }
    return nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_import").ReadFile(tradeFile).Writer(&TradeWriter{db}).Partitions(10).Build()
    //...
    job := gobatch.NewJob("my_job").Step(...,step,...).Build()
    gobatch.Register(job)
    gobatch.Start(context.Background(), job.Name(), "{\"date\":\"20220202\"}")
}

再假定我们需要将 t_trade 表中的数据导出为一个csv文件,可以按照以下方式来实现:

type Trade struct {
    TradeNo   string    `order:"0" header:"trade_no"`
    AccountNo string    `order:"1" header:"account_no"`
    Type      string    `order:"2" header:"type"`
    Amount    float64   `order:"3" header:"amount"`
    TradeTime time.Time `order:"5" header:"trade_time" format:"2006-01-02_15:04:05"`
    Status    string    `order:"4" header:"trade_no"`
}

var tradeFileCsv = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade_export.csv",
    Type:          file.CSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}


type TradeReader struct {
    db *gorm.DB
}

func (h *TradeReader) ReadKeys() ([]interface{}, error) {
    var ids []int64
    h.db.Table("t_trade").Select("id").Find(&ids)
    var result []interface{}
    for _, id := range ids {
        result = append(result, id)
    }
    return result, nil
}

func (h *TradeReader) ReadItem(key interface{}) (interface{}, error) {
    id := int64(0)
    switch r := key.(type) {
    case int64:
        id = r
    case float64:
        id = int64(r)
    default:
        return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key)
    }
    trade := &Trade{}
    result := h.db.Table("t_trade").Find(trade, "id = ?", id)
    if result.Error != nil {
        return nil, result.Error
    }
    return trade, nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_export").Reader(&TradeReader{db}).WriteFile(tradeFileCsv).Partitions(10).Build()
    //...
}

监听器

框架提供了多种监听器,用于处理整个批处理任务和步骤执行过程中的各种事件,如下:

type JobListener interface {
	BeforeJob(execution *JobExecution) BatchError
	AfterJob(execution *JobExecution) BatchError
}

type StepListener interface {
	BeforeStep(execution *StepExecution) BatchError
	AfterStep(execution *StepExecution) BatchError
}

type ChunkListener interface {
	BeforeChunk(context *ChunkContext) BatchError
	AfterChunk(context *ChunkContext) BatchError
	OnError(context *ChunkContext, err BatchError)
}

type PartitionListener interface {
	BeforePartition(execution *StepExecution) BatchError
	AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
	OnError(execution *StepExecution, err BatchError)
}

可以构建任务时指定监听器,示例如下:

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("my_step").Handler(handler,...).Listener(listener,...).Build()
    //...
    job := gobatch.NewJob("my_job").Step(step,...).Listener(listener,...).Build()
}

全局设置

指定DB实例

GoBatch框架需要使用数据库来存储任务和步骤执行过程中的上下文信息,因此在启动任务之前,必须注册一个 *sql.DB 实例到GoBatch中,如下:

    gobatch.SetDB(sqlDb)

指定事务管理器

如果需要使用分块步骤,则必须设置一个事务管理器(TransactionManager)到GoBatch,事务管理器接口定义如下:

type TransactionManager interface {
	BeginTx() (tx interface{}, err BatchError)
	Commit(tx interface{}) BatchError
	Rollback(tx interface{}) BatchError
}

GoBatch框架包含一个默认的事务管理器,类名DefaultTxManager,如果已经设置了DB实例且尚未设置TransactionManager,则 GoBatch 会自动创建一个 DefaultTxManager 实例。当然,用户也可以指定自己的事务管理器来代替默认实现:

  gobatch.SetTransactionManager(&CustomTransactionManager{})

设置最大并发任务数和最大并发步骤数

GoBatch 内部使用池化技术来运行任务和步骤。默认最大并发任务数和最大并发步骤数分别是10、1000,如果需要修改默认值,则设置如下:

    gobatch.SetMaxRunningJobs(100)
    gobatch.SetMaxRunningSteps(5000)
MIT License Copyright (c) 2021 chararch Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

GoBatch is a batch processing framework in Go like Spring Batch in Java 展开 收起
Go
MIT
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Go
1
https://gitee.com/chararch/gobatch.git
git@gitee.com:chararch/gobatch.git
chararch
gobatch
gobatch
master

搜索帮助