Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to specify the number of permits to acquire and release #1553

Closed
wants to merge 10 commits into from

Conversation

khovanskiy
Copy link

Bonus: This feature will help to emulate fair ReadWriteLock in coroutines:

val semaphore = Semaphore(Int.MAX_VALUE)
// read "lock" and "unlock"
semaphore.acquire(1)
try {
    // action
} finally {
    semaphore.release(1)
}
// write "lock" and "unlock"
semaphore.acquire(Int.MAX_VALUE)
try {
    // action
} finally {
    semaphore.release(Int.MAX_VALUE)
}

@elizarov
Copy link
Contributor

This implementation will not work properly. To see why, add tests that have multiple coroutines waiting a acquire a semaphore and one release that releases multiple permits. It should resume all of them, but it will not do it in this implementation.

@khovanskiy
Copy link
Author

The bug is fixed and the new one test is added.

@elizarov
Copy link
Contributor

This code will not work either. Add a test where there are zero remaining permits, then one coroutine is trying to acquire two (and so goes to the queue). Another one releases one permit. The first one should not be resumed (because it needs two permits), but will be resumed in this implementation.

@khovanskiy
Copy link
Author

I had to rewrite the logic of the main loop, but I totally documented its stages. Looks like it is still lock free and fast. The new test is added.

}

private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
private suspend fun tryToAddToQueue(permits: Int) = suspendAtomicCancellableCoroutine<Unit> sc@{ cont ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how permits is used by this method. I'm very surprised that tests pass. Seems like some tests are missing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous commit didn't contain significant changes, sorry. Please, recheck.

@elizarov
Copy link
Contributor

This code would not work properly when multiple acquires are waiting, each one trying to acquire > 1 permit and we release permits one by one. There's no logic to check which ones should be resumed.

@khovanskiy
Copy link
Author

khovanskiy commented Sep 20, 2019

Sorry, I didn't commit all the code, and this led to a misunderstanding. Now all changes are pushed.

@elizarov
Copy link
Contributor

I have yet closely studied the correctness of this algorithm, but one thing I can tell in advance -- this implementation significantly increases overhead of a simple one-permit acquire/release case by allocating additional Slot object, which would make it a no-go, since it defeats the purposes of having an efficient semaphore implementation in the library in the first place. It has to be either adapted to work as efficiently as the previous implementation on its fast-path (with one permit) or moved into a separate data-structure. A performance test (a benchmark) would also be needed.

(cont as CancellableContinuation<Unit>).resume(Unit)
internal fun tryToResumeFromQueue(permits: Int) {
accumulator.getAndAdd(permits) // add thread permits to common accumulator
var remain = accumulator.getAndSet(0) // try to take possession of all the accumulated permits at the moment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ The pair of getAndAdd and getAndAdd is not efficient. Should be replaced with one atomic CAS-loop.

@khovanskiy
Copy link
Author

khovanskiy commented Sep 21, 2019

I replaced the Slot class with atomic integers, so the overhead was minimised.

The benchmark is executed with existed stress tests. Each test was run (N = 1 million) at the following configuration (Intel Q9550, 8Gb DDR2 800).

Test Original semaphore New semaphore
stressTest 16 s 16 s
stressCancellation 69 s 71 s
stressTestAsMutex 19 s 19 s
stressReleaseCancelRace 56 s 57 s

Also I noticed that teamcity build is failed due to changes in methods' signatures in the Semaphore interface (PublicApiTest). What do I need to do to fix this? Add different methods with and without parameter instead of the default parameter or just update kotlinx-coroutines-core.txt file?

…s-number

# Conflicts:
#	kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@khovanskiy khovanskiy requested a review from elizarov September 28, 2019 17:26
@qwwdfsad qwwdfsad force-pushed the develop branch 3 times, most recently from 4a49830 to aff8202 Compare March 10, 2020 17:27
@khovanskiy
Copy link
Author

khovanskiy commented Mar 21, 2020

I fixed some merge conflicts and update public API via apiDump. Is there any chance to push this merge request into future releases? If no, I will not continue to support this branch compatibility with the actual "develop" branch.

@elizarov
Copy link
Contributor

The feature seems useful, but so far I'm skeptical that this implementation can be made truly correct (linearizable). I am unable to prove its correctness by looking at the code and we still miss tests that would be checking Semaphore's correctness under a wide range of scenarios. As our experience shows, simple tests like the ones added here, are not enough for concurrent algorithms of such complexity. I'm waiting for #1737 to be delivered first.

@ndkoval
Copy link
Member

ndkoval commented Apr 29, 2020

@khovanskiy we have added a lincheck test (see #1898), found a couple of bugs, and fixed them. Thus, I am almost sure that your current solution is non-linearizable since it is based on bugged implementation. Could you please rebase onto develop and modify the lincheck test by adding the number of permits to be acquired/released as operation parameters if you are still interested in this PR?

@qwwdfsad
Copy link
Collaborator

We are currently in a big rush in rewriting all the synchronization primitives (#2045) and are not ready to either guide new contributors through the whole new codebase (and also invest a lot of our effort to code style and proper review iterations. It's typically not a big deal for the rest of kotlinx.coroutines, but highly-polished and complex lock-free and linearizable code is a completely different story in terms of time and effort) or to merge it as is.

Thanks for your effort and sorry for the inconvenience

@qwwdfsad qwwdfsad closed this Feb 15, 2021
@qwwdfsad qwwdfsad mentioned this pull request Sep 13, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants