Skip to content

Commit

Permalink
simplify mapreduce code
Browse files Browse the repository at this point in the history
  • Loading branch information
kevwan committed Sep 16, 2020
1 parent 68335ad commit df37597
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 16 deletions.
21 changes: 5 additions & 16 deletions core/mr/mapreduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{}
collector := make(chan interface{}, options.workers)
done := syncx.NewDoneChan()

go mapDispatcher(mapper, source, collector, done.Done(), options.workers)
go executeMappers(mapper, source, collector, done.Done(), options.workers)

return collector
}
Expand Down Expand Up @@ -126,7 +126,10 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
reducer(collector, writer, cancel)
drain(collector)
}()
go mapperDispatcher(mapper, source, collector, done.Done(), cancel, options.workers)

go executeMappers(func(item interface{}, writer Writer) {
mapper(item, writer, cancel)
}, source, collector, done.Done(), options.workers)

value, ok := <-output
if err := retErr.Load(); err != nil {
Expand Down Expand Up @@ -226,20 +229,6 @@ func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- i
}
}

func mapDispatcher(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},
done <-chan lang.PlaceholderType, workers int) {
executeMappers(func(item interface{}, writer Writer) {
mapper(item, writer)
}, input, collector, done, workers)
}

func mapperDispatcher(mapper MapperFunc, input <-chan interface{}, collector chan<- interface{},
done <-chan lang.PlaceholderType, cancel func(error), workers int) {
executeMappers(func(item interface{}, writer Writer) {
mapper(item, writer, cancel)
}, input, collector, done, workers)
}

func newOptions() *mapReduceOptions {
return &mapReduceOptions{
workers: defaultWorkers,
Expand Down
1 change: 1 addition & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ go get -u github.com/tal-tech/go-zero
* [通过MapReduce降低服务响应时间](doc/mapreduce.md)
* [关键字替换和敏感词过滤工具](doc/keywords.md)
* [进程内缓存使用方法](doc/collection.md)
* [防止缓存击穿之进程内共享调用](doc/sharedcalls.md)
* [基于prometheus的微服务指标监控](doc/metric.md)
* [文本序列化和反序列化](doc/mapping.md)

Expand Down

0 comments on commit df37597

Please sign in to comment.