Concurrent mirror services #17
base: master
Conversation
end
threads.collect { |thread| thread.join }
(Deleted my previous comment because I realized it was wrong after I posted it.)
Eliminate the local variable:
services.collect do |service|
  Thread.new { service.public_send method, *args }
end.each(&:join)
Thanks! I don’t see a need to test that the service is threaded. It’s enough for its existing tests to pass.
I'll do it later because my flight is about to take off. 😀 Sent from my LGE Nexus 5X using FastHub
Just pushed the edited version.
Note that we can do concurrent uploads, too. Check out https://github.com/lloeki/ruby-tee/blob/master/lib/tee.rb for a nice approach.
Generally speaking, passing around IOs forces us into a "pull" execution model: read from this stream indiscriminately. Makes it awkward to push—or "pump"—a chunked input stream out through multiple channels concurrently, like one to calculate a streaming digest and two more to upload to storage services.
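To make that concrete, here is a minimal sketch of the push model with hypothetical names (pump_chunks and the lambda consumers are illustrative, not Active Storage API): a single reader thread fans each chunk out to per-consumer queues, so a streaming digest and several uploads can all feed off one pass through the source.

require "digest"
require "stringio"

# Illustrative sketch only, not Active Storage API: read the source once and
# push each chunk to every consumer through its own queue.
def pump_chunks(io, consumers, chunk_size: 64 * 1024)
  queues = consumers.map { Queue.new }

  threads = consumers.zip(queues).map do |consumer, queue|
    Thread.new do
      while (chunk = queue.pop)   # nil signals end of stream
        consumer.call(chunk)
      end
    end
  end

  while (chunk = io.read(chunk_size))
    queues.each { |queue| queue << chunk }
  end
  queues.each { |queue| queue << nil }
  threads.each(&:join)
end

# Example: compute a digest and fill a buffer concurrently from one read pass.
digest = Digest::MD5.new
buffer = StringIO.new
pump_chunks StringIO.new("some file data"),
            [->(chunk) { digest << chunk }, ->(chunk) { buffer.write(chunk) }]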
service.public_send method, *args
end
Thread.new { service.public_send method, *args }
end.map(&:join)
Can use concurrent-ruby's promises (or a lower level thread pool executor). Check out its usage in Action Cable.
I've changed the code to use a thread pool executor, and now it looks like this:

def perform_across_services(method, *args)
  pool = Concurrent::ThreadPoolExecutor.new(max_threads: @services.count)
  # pool is local only for example purposes

  services.each do |service|
    pool.post { service.public_send method, *args }
  end
end
But this causes trouble with the tests because the threads are never joined.
I can make them pass by sleeping for a while, but that doesn't seem like a good solution to the problem. Can you advise me on how to handle this?
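As an aside, one possible way to make the pool-based version observable in tests, assuming concurrent-ruby's executor API (shutdown plus wait_for_termination), would be to drain the pool before returning; the discussion below moves to promises instead.

# Sketch only: block until every posted task has run.
def perform_across_services(method, *args)
  pool = Concurrent::ThreadPoolExecutor.new(max_threads: services.count)

  services.each do |service|
    pool.post { service.public_send method, *args }
  end

  pool.shutdown               # stop accepting new tasks
  pool.wait_for_termination   # wait for the posted tasks to finish
end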
As @jeremy suggested, consider using promises:
def perform_across_services(method, *args)
  promises = services.collect do |service|
    Concurrent::Promise.execute { service.public_send method, *args }
  end

  Concurrent::Promise.zip(promises).value
end
89b8e94 to a5e4715
Thank you for the advice. I just pushed a version with promises, but I'm using map to get all of the values because there seems to be a problem with the zip method: it produces a promise that is rejected even if all of the given promises are fulfilled.
Sorry, my example was incorrect. You’ll need to splat the promises you pass to Concurrent::Promise.zip:

def perform_across_services(method, *args)
  promises = services.collect do |service|
    Concurrent::Promise.execute { service.public_send method, *args }
  end

  Concurrent::Promise.zip(*promises).value
end
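For reference, a small standalone illustration (not code from the PR) of what the splatted zip produces: a promise whose value is the array of the individual results, while value! would raise instead of returning nil if any of the zipped promises was rejected.

require "concurrent"

# Standalone illustration, not from the PR.
promises = [1, 2, 3].map { |n| Concurrent::Promise.execute { n * 10 } }

Concurrent::Promise.zip(*promises).value   # => [10, 20, 30]
Concurrent::Promise.zip(*promises).value!  # raises if any promise was rejected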
Looking good! Are you up for implementing concurrent uploads, too? That’d add the most value, since deletion will usually be fast.
Yes, I can start working on it.
@@ -1,4 +1,5 @@
require "active_support/core_ext/module/delegation"
require "concurrent"
Can tighten to require "concurrent/promise"?
Let's add concurrent-ruby to the gemspec, too, rather than rely on transitive dep via Active Support.
Ok, I'll do it.
end
Concurrent::Promise.zip(*promises).value
Makes one wish for a simple services.pmap do |service| …
/cc @jdantonio 😊

@jeremy I'm thinking of adding queues for service operations, because concurrency causes some problems. E.g. if we have the following code, the delete and exists? calls may run while the upload is still in progress:

service.upload(key, ...)
service.delete(key)
service.exists?(key)
Active Storage won’t, by itself, check the existence of or delete a key while uploading data for it, so we can ignore those issues.
Ok, but how do we test this behaviour?
No need to, IMO. It’s still enough for the service’s existing tests to pass.
This code works, but a few tests do not pass with the current mirror configuration. I don't know why, but the tests fail when Disk services are used. When I configure the tests with GCS, everything passes.
each_service.collect do |service|
  service.upload key, io.tap(&:rewind), checksum: checksum
end
perform_across_services :upload, key, io.tap(&:rewind), checksum: checksum
This isn’t going to work. We can’t share a single IO between concurrent uploads. Jeremy pointed you in the right direction above.
each_service.collect do |service|
  service.upload key, io.tap(&:rewind), checksum: checksum
perform_async_across_services do |service|
  service.upload key, StringIO.new(io.tap(&:rewind).read), checksum: checksum
I'm almost sure that this is not the best way to create a new IO instance, but clone or dup don't work: when any method is called inside upload, it mutates all IO instances.
Right. I said we can’t just share a single IO instance between multiple threads, so cloning the given IO instance for each thread was a sensible thing to try. But we can’t do that, either: each clone will share the same underlying file descriptor.
So what can we do? Jeremy pointed you to ruby-tee. We probably can’t use it as-is, but its implementation is a good example of a way to do exactly what we want to do: safely share a single IO between multiple threads.
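To sketch that idea (illustrative only, not the ruby-tee API; upload_to_all is a hypothetical helper): give every uploader its own reader end of a pipe and copy the source into all of the pipes from a single thread, so no file descriptor is shared between threads.

# Hypothetical helper, not the ruby-tee API.
def upload_to_all(services, key, io, checksum: nil, chunk_size: 64 * 1024)
  pipes = services.map { IO.pipe }   # one [reader, writer] pair per service

  uploaders = services.zip(pipes).map do |service, (reader, _writer)|
    Thread.new { service.upload key, reader, checksum: checksum }
  end

  while (chunk = io.read(chunk_size))        # single pass over the source
    pipes.each { |_reader, writer| writer.write(chunk) }
  end
  pipes.each { |_reader, writer| writer.close }   # EOF for the uploaders

  uploaders.each(&:join)
end

Writes block once a pipe buffer fills, so the copy proceeds at the pace of the slowest uploader; that backpressure is usually acceptable for mirrors, but it is worth keeping in mind.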
lib/active_storage/async_uploader.rb
Outdated
attr_reader :promise

def initialize service, key, checksum: nil
  @data = ""
I'm not quite sure if using a string as a buffer, instead of an enumerator, is a problem. I'm also open to ideas for a new class name.
end
io.rewind
while chunk = io.read(1024)
This is the chunk size, but I don't know where to place the constant: in MirrorService or in AsyncUploader?
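For illustration only, one option would be to keep the constant next to the loop that uses it, inside the hypothetical AsyncUploader (read_chunks is an invented name for the method shown in the diff above):

class AsyncUploader
  # Illustrative placement; the constant could equally live in MirrorService.
  CHUNK_SIZE = 1024

  private

  def read_chunks(io)
    io.rewind
    while chunk = io.read(CHUNK_SIZE)
      @data << chunk
    end
  end
end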
@georgeclaghorn can you review the PR, please?
The tests pass when the mirror service is defined in the config file, but I'm not quite sure how to test the threading itself.