Fibers not running in parallel #34
@ZelphirKaltstahl Sorry, long story made short, I made all my repositories private on GitHub. The code you are referring to is:

```scheme
;; Pool of workers that can be used to execute blocking operations in a
;; fibers application.
(define-module (babelia pool))

(import (ice-9 match))
(import (ice-9 q))
(import (ice-9 threads))
(import (srfi srfi-9))
(import (srfi srfi-1))
(import (fibers))
(import (fibers channels))
(import (fibers operations))
(import (babelia thread))
(import (babelia okvs ulid))
(import (babelia log))

(define %channel #f)

(define worker-count (- (current-processor-count) 1))

(define (worker channel)
  (parameterize ((thread-index (random-bytes 2)))
    (let ((worker (make-channel)))
      (let loop ()
        (put-message channel (cons 'worker worker))
        (let* ((work (get-message worker))
               (thunk (car work))
               (return (cdr work))
               ;; Execute thunk and send the returned value. XXX: To be able
               ;; to keep track of jobs, the channel called `return` is put
               ;; in itself. See procedure pool-for-each-par-map.
               ;; TODO: add a call-with-values
               (out (thunk)))
          (put-message return (cons return out)))
        (loop)))))

(define (arbiter channel)
  (let ((works (make-q))
        (workers (make-q)))
    (let loop ((message (get-message channel)))
      (match message
        (('worker . worker)
         (if (q-empty? works)
             (enq! workers worker)
             (let ((work (deq! works)))
               (put-message worker work))))
        (('work . work)
         (if (q-empty? workers)
             (enq! works work)
             (let ((worker (deq! workers)))
               (put-message worker work))))
        (_ (raise (cons 'fuu message))))
      (loop (get-message channel)))))

(define-public (pool-init)
  (if %channel
      (error 'babelia "pool can not be initialized more than once")
      (let ((channel (make-channel)))
        (log-debug "pool init")
        (set! %channel channel)
        (let loop ((index worker-count))
          (unless (zero? index)
            (call-with-new-thread (lambda () (worker channel)))
            (loop (- index 1))))
        (arbiter channel))))

(define (publish thunk)
  (let ((return (make-channel)))
    (put-message %channel (cons 'work (cons thunk return)))
    return))

(define-public (pool-apply thunk)
  "Execute THUNK in a worker thread.

Pause the calling fiber until the result is available."
  (cdr (get-message (publish thunk))))

(define (select channels)
  (perform-operation
   (apply choice-operation (map get-operation channels))))

;; TODO: Maybe add a timeout argument, in order to be able to display
;; a nicer error.
(define-public (pool-for-each-par-map sproc pproc lst)
  "For each item of LST execute (PPROC item) in a worker thread, and
gather returned values with SPROC. SPROC is executed in the calling
fiber.

This is a POSIX-thread-pool-based n-for-each-par-map for fibers. It is
somewhat equivalent to:

  (for-each SPROC (map PPROC LST))

But applications of PPROC happen in parallel, and waiting for new
values does not block the main thread."
  (let loop ((channels (map (lambda (item) (publish (lambda () (pproc item))))
                            lst)))
    (unless (null? channels)
      (match (select channels)
        ((channel . value)
         (sproc value)
         (loop (remove (lambda (x) (eq? x channel)) channels)))
        (else (raise 'fuuubar))))))
```
In particular, `(pool-for-each-par-map sproc pproc lst)` will only work if / when the count of jobs …

Anyway, in your case, and based on your messages on the mailing list (correct me if I am wrong): you want to compute in parallel a graph of procedures, where some procedures…

```scheme
(define (pool-apply thunk)
  "Execute THUNK in a worker thread.

Pause the calling fiber until the result is available."
  (cdr (get-message (publish thunk))))
```

You can not use that as-is, because it will pause the calling fiber without giving you the chance to execute a parallel computation. So what you need to do is:

```scheme
(define thunka-channel-result (publish THUNKA))
(define thunkb-channel-result (publish THUNKB))

(define result-thunka (get-message thunka-channel-result))
(define result-thunkb (get-message thunkb-channel-result))
```

And then possibly call:

```scheme
(pool-apply (lambda () (PROCR result-thunka result-thunkb)))
```

Maybe adding a …

@ZelphirKaltstahl do you have a repository I can look at? Let me know what you think.
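The publish-then-collect pattern above is the familiar future/promise idiom: submit both tasks first, and only then block for the results, so the two computations can overlap. A rough analogy in Python's `concurrent.futures` (an analogy only, not the fibers API; `thunk_a`, `thunk_b`, and the final combination standing in for THUNKA, THUNKB, and PROCR are invented for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def thunk_a():  # stand-in for THUNKA
    return 21

def thunk_b():  # stand-in for THUNKB
    return 2

with ThreadPoolExecutor(max_workers=2) as pool:
    # Submit both before waiting on either -- the analogue of calling
    # `publish` twice and only then calling `get-message`.
    fut_a = pool.submit(thunk_a)
    fut_b = pool.submit(thunk_b)
    # Only now block for the results (the analogue of `get-message`).
    result = fut_a.result() * fut_b.result()  # combine, like PROCR
```

Calling `fut_a.result()` immediately after the first `submit` would be the analogue of `pool-apply`: correct, but serializing the two tasks.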
Hi @amirouche! I do have a copy of your code in a file here on my machine :) Thanks nevertheless. The code I posted is the code I derived from yours, by using fibers instead of threads for the workers. I want to create a repo for the pool. I guess it will be AGPL then, as your original code was AGPL as well, iirc. I will create a repo and link it here.

The question I have now is why the fibers I am using as workers are not running in parallel. I did create the scheduler with `#:parallelism 2`, but it does not work in parallel for some reason. At the very bottom of the code I posted you can see that I am already publishing some busy work twice, before I call the `get-message` procedure on the channels, to make the pool start working before requesting the answer. `get-message` is blocking, just like `put-message`. That much I understand.

Once I can get this running in parallel, I can make use of it in my decision-tree parallelization attempt, avoid the apparently buggy parallel forms, and finally have my decision tree algorithm run in parallel.
Yes, I think that is what I want to do. Furthermore, in those THUNKA and THUNKB new thunks can be started, as there may be more layers in the tree. For example:
Each of those thunks on the lowest level can again spawn some thunk. Spawning a thunk here means publishing work for the fibers to finish. This recursive character is important.

Repo added: https://notabug.org/ZelphirKaltstahl/guile-fibers-pool
I realized that my idea of running things recursively in parallel will not work with the code I wrote above. The case with 2 separate workers should still work in parallel, according to my understanding of fibers.

In the end I will probably end up spawning a new fiber for each task to be run in parallel, and do away with the concept of a worker that gets assigned new work and is not discarded. It seems impossible to use workers recursively, as they would block for child executions and not be available to receive and work on more work while waiting for a child execution to finish. I could not yet think of a way to avoid spawning an arbitrary number of execution units (threads, processes, fibers, whatever) and still perform the arbitrarily recursive parallel execution. That is, without making use of some kind of global state somewhere.

It would still be great to learn why the code I gave does not run in parallel, as it uses 2 fibers for 2 separate non-recursive tasks. There might be a bug, or my understanding of how the fibers library works is simply wrong, or I overlooked something that makes my code block even when only one of the workers is assigned work and there are 2 workers available.
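The failure mode described here is the classic fixed-pool deadlock: every worker can end up blocked waiting on a child job that no remaining worker is free to run. A minimal sketch of the spawn-per-task alternative, in Python threads purely as an analogy (the tree shape and `parallel_tree_sum` are invented for illustration):

```python
import threading

def parallel_tree_sum(tree):
    """Sum a nested list by spawning one thread per branch.

    Spawning a new execution unit per task side-steps the fixed-pool
    deadlock: a parent waiting on its children never occupies a
    worker slot that a child would need to make progress.
    """
    if not isinstance(tree, list):
        return tree          # leaf: just a number
    results = [None] * len(tree)
    threads = []
    for i, branch in enumerate(tree):
        def run(i=i, branch=branch):   # bind loop variables per thread
            results[i] = parallel_tree_sum(branch)
        t = threading.Thread(target=run)
        t.start()
        threads.append(t)
    for t in threads:        # parent blocks, but holds no pool slot
        t.join()
    return sum(results)
```

The cost, as noted above, is an unbounded number of execution units; bounding them again without re-introducing the deadlock is exactly the open problem of this thread.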
I republished https://github.com/amirouche/guile-babelia

@ZelphirKaltstahl did you make any progress?
@amirouche I have not worked on this any further. I completely forgot about this issue. I did have a nagging feeling every time I thought about fibers, that there was something I needed to learn about them. I had a few small projects where I used them.

However, I still have not found a way (without introducing a global state wrapped in some mutex) to have recursive calls, each potentially spawning a fiber, but only as many as some maximum allows. Well, except of course limiting the whole Guile process which runs the program, so that the program itself does not need to impose the limits. But then I might spawn many futures, creating overhead, when I should only be spawning some workers which are not discarded, and which receive work and deliver results, in order to reduce overhead. I have also not yet found a way to build any worker pool of fibers.

I seem to remember that you had some worker pool thing in guile-babelia going? Is that correct?
It depends on the algorithm and its side effects:

```scheme
(thread (run-fiber thunk))
```

The channels are bidirectional pipelines; on both sides there must be an operation that may run inside a flow of execution after a …

Regarding the nesting of POSIX threads and fibers: given one-posix-thread-per-core, and three POSIX threads with three fibers in each POSIX thread, it looks like: [to be continued].
Have you tried a queue implementing the producer-consumer pattern? See this python code, the …

```scheme
(define queue (make-channel))

(run-fibers
 (lambda ()
   ;; consumer fiber
   (spawn-fiber
    (lambda ()
      (let ((msg (channel-receive queue)))
        (do-something-concurrently msg))))
   ;; producer: push messages onto the channel
   (for-each (lambda (msg) (channel-send queue msg))
             (iota 999999999))))
```

Fibers will pile up messages; there were no overflow checks last time I checked, so you need to monitor the app's memory behavior. Channels are implemented with a mutex and condition variable, and one of their advantages is that they avoid the need for you to use mutexes and condvars directly.
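For comparison, a bounded producer-consumer queue in Python's standard library looks like the following (a sketch; the doubling "work" and the sizes are invented). The bound addresses the pile-up concern mentioned above: when the queue is full, the producer blocks instead of exhausting memory:

```python
import queue
import threading

q = queue.Queue(maxsize=100)   # bounded: producer blocks when full
results = []

def consumer():
    while True:
        msg = q.get()
        if msg is None:          # sentinel: no more work
            break
        results.append(msg * 2)  # stand-in for do-something-concurrently

t = threading.Thread(target=consumer)
t.start()
for msg in range(10):            # producer side
    q.put(msg)
q.put(None)                      # tell the consumer to stop
t.join()
```

`queue.Queue` hides the mutex and condition variable in the same way a channel does: the user only calls `put` and `get`.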
Hey, sorry, I don't know how long it will take until I will look at this project again. Might be soon, but it also might be a year or two. So hopefully no one is waiting for anything, and if anyone is, please don't =)
I have some code in which fibers do not perform their work in parallel for some reason.

This procedure initializes a pool of fibers. The number of fibers depends on the `parallelism` keyword argument. Later on these fibers receive some work on a channel and return their result on a channel.

My understanding is that fibers of the same scheduler should be able to run in parallel, if the scheduler's `parallelism` keyword argument is `> 1`. For some reason, however, this does not happen, and the fibers run sequentially, one after the other, when I give them work. Why do they not run in parallel?

Is it because the scheduler is run inside a `call-with-new-thread`? Does this limit the parallelism to `1`? I need to run it in `call-with-new-thread`, because I need to return the `channel-receive`, which will be input to the procedure which gives work to a thing I called `work-distributor`. If I cannot run a scheduler in `call-with-new-thread`, how can I run a scheduler without blocking until all the fibers have completed their work?

So far I have only guesses why my code does not do computation in parallel.

For a complete example of sequentially running fibers, I will paste my whole code below. It is actually based on @amirouche's thread pool for his babelia project (I currently cannot find it on GitHub any longer and do not know where to look for it), only that I am trying to use fibers instead of threads to perform work, hoping to take advantage of work stealing and work sharing, as well as running more lightweight than threads.

The code:

On my machine, this runs in sequence, rather than in parallel.