Skip to content

wushilin/threads

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

27 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

I have re-written this with Generics, Yay!

Thanks to go generics, the code does not use interface{} now!

However, due to Golang generics limitation (methods can't introduce new type parameter), so instead of

pool.Submit(func() interface{})

We now have to use

threads.SubmitTask(pool, func() T)

I personally don't think this is a big deal

Threads

Starting go routine in golang is far too easy, and it is cheap! However, generally it is still a a bad idea to allow go routine to grow uncontrolled. Hence sometimes you prefer a thread pool concept just like java. Here you have it

Install

go get github.com/wushilin/threads

Documentation

$ godoc -http=":16666"

Browse http://localhost:16666

Usage

Thread Pool API

Import

import "github.com/wushilin/threads"

Creating a thread pool, with 30 executors and max of 1 million pending jobs

var thread_pool *threads.ThreadPool = threads.NewPool(30, 1000000)
// Note that if pending job is more than 1000000, the new submission (call to Submit) will be blocked
// until the job queue has some space.

// thread_pool.Start() must be called. Without this, threads won't start processing jobs
// You now can't submit jobs before pool is started because it may cause dead lock if the buffer is not enough.
thread_pool.Start()

// After thread_pool is started, there is 30 go routines in background, processing jobs

Submiting a job and gets a Future

var fut future.Future = thread_pool.SubmitTask(thread_poo, func() int {
  return  1 + 6
})

// Here, submited func that returns a value. The func will be executed by a backend processor
// where there is free go routine. The submission returns a *threads.Future, which can be used
// to retrieve the returned value from the func. 
// e.g. 
// resultInt := fut.GetWait() // <= resultInt will be 7, and type is int. Thanks to genercs in go

Wait until the future is ready to be retrieve

result := fut.GetWait() // <= result will be 7
fmt.Printf("Result of 1 + 6 is %d", result)
// Wait until it is run and result is ready

// or if you prefer no blocking, call returns immediately, but may contain no result
ok, result := fut.GetNow()
if ok {
  // result is ready
  fmt.Println("Result of 1 + 6 is", result) // <= result will be 7
} else {
  fmt.Println("Result is not ready yet")
}

// or if you want to wait for max 3 seconds
ok, result := fut.GetTimeout(3*time.Second)
if ok {
  // result is ready
  fmt.Println("Result of 1 + 6 is", result) // <= result will be 7
} else {
  fmt.Println("Result is not ready yet") // <= timed out after 3 seconds
}

Stop accepting new jobs

// once shutdown, you can't re-start it back
thread_pool.Shutdown()
// Now thread_pool can't submit new jobs. All existing submited jobs will be still processed
// The future previous returned will still materialize

// Wait until all jobs to complete. Calling Wait() on non-shutdown thread pool will be blocked forever
thread_pool.Wait() 
// You can't call Wait() before you call Shutdown because it may cause dead lock
// after this call, all futures should be able to be retrieved without delay
// You can safely disregard this thread_pool after this call. It is useless anyway

Getting stats of this pool

thread_pool.ActiveCount() // active jobs - being executed right now
thread_pool.PendingCount() // pending count - not started yet
thread_pool.CompletedCount() //jobs done - result populated already

Convenient wrapper to do multiple tasks in parallel

jobs := make([]func() int, 60)
//... populate the jobs with actual jobs
// This will start as many threads as possible to run things in parallel
var fg *threads.FutureGroup = threads.ParallelDo(jobs)

// This will start at most 10 threads for parallel processing
var fg *threads.FutureGroup = threads.ParallelDoWithLimit(jobs, 10)

// retrieve futures, wait for all and get result!
var results []int = fg.WaitAll()

// If you prefer more flexible handling... - you get a copy of the array
var []future.Future futures = fg.Futures()

Interesting future concepts

see github.com/wushilin/future

fut.Then(print) => print function is called with argument of Future's value, when value become available

fut.Then(print).Then(save) => multiple then functions can be called

fut := SubmitTask(thread_pool, func() int {
	return 5
})
fut2 := future.Chain(fut, func(i int) string {
	return fmt.Sprintf("Student #%d", i)
}

//fut2 is a future of "string" instead of "int" now. 

fut2.Then(print)
// print fut2 when it is available

fut3 := future.DelayedFutureOf("hello how are you", 3 * time.Second) => fut3 is available after 3 seconds

Future Group now supports

FutureGroup.Count() // Count the number of futures in the group
FutureGroup.Futures() // Get a copy of futures (not the underlying future directly)
FutureGroup.ReadyCount() // Check how many of futures are ready
FutureGroup.IsAllReady() // Test if all results are present (non-blocking)
FutureGroup.ThreadPool() // returns original thread pool that produced the future group. You may want to call its Wait() methods (but usually not necessary)