동시에 읽고 쓰이는 공유 객체에서 스레드 동기화를 적절하게 사용하는 방법에 대해 알아보자.
- Atomic
- Synchronized block
- 스레드 한정
- Mutex
- Actor
CAS(Compare And Swap) 알고리즘을 사용한다.
원리를 간단하게 말하자면, 변경하려는 값과 현재 저장된 값을 비교해서 같으면 변경하고 다르면 값을 변경하지 못하게 하는 것이다. 변경하려는 값과 현재 저장된 값이 다른 경우는 중간에 다른 스레드에 의해 값이 변경된 경우로 볼 수 있기에 변경을 허용하지 않는다.
사용법은 아래와 같다.
suspend fun exampleFun(action: suspend () -> Unit) {
val n = 100
val k = 1000
val write = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$write ms동안 ${n*k}개의 일을 했습니다.")
}
var counter = AtomicInteger(0)
fun main() = runBlocking {
withContext(Dispatchers.Default) {
exampleFun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
// 13 ms동안 100000개의 일을 했습니다.
// Counter = 100000
Synchronized는 객체가 가지는 LOCK의 속성을 이용해 단일 스레드만 객체에 접근하게 하는 방식이다. 아래의 예시를 보면 Counter 인스턴스(this
)를 Lock 파라미터로 설정해주어서 간단하게 사용한다. 하지만 이방식을 사용할 때 Lock을 걸었다 풀었다 하는 과정에서 더 많은 비용을 가진다고 한다. 또한 설계를 잘못하면 Deadlock에 빠질 수 있다.
class Counter {
fun plusCount() {
synchronized(this) {
counter++
}
}
}
suspend fun exampleFun(action: suspend () -> Unit) {
val n = 100
val k = 1000
val write = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$write ms동안 ${n * k}개의 일을 했습니다.")
}
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
val c = Counter()
exampleFun {
c.plusCount()
}
}
println("Counter = $counter")
}
// 19 ms동안 100000개의 일을 했습니다.
// Counter = 100000
newSingleThreadContext
는 새로운 스레드를 만들고 그 스레드에서만 작업이 수행되도록 보장해주는 것이다.
suspend fun exampleFun(action: suspend () -> Unit) {
val n = 100
val k = 1000
val write = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$write ms동안 ${n*k}개의 일을 했습니다.")
}
var counter = 0
val newContext = newSingleThreadContext("newContext")
fun main() = runBlocking {
withContext(Dispatchers.Default) {
exampleFun {
withContext(newContext){
counter++
}
}
}
println("Counter = $counter")
}
// 673 ms동안 100000개의 일을 했습니다.
// Counter = 100000
공유 상태를 수정할 때 임계 영역을 이용하게 하여, 임계영역을 동시에 접근하는 것을 허용하지 않는다. 뮤텍스는 앞서 보았던 synchronized block과 비슷하게 객체의 Lock을 이용하여 스레드 간 동기화를 처리한다. Mutex
는 스레드를 suspend 하고 synchronized block은 스레드를 block 하기 때문에 실제 성능은 더 좋다고 한다.
...
val mutex = Mutex()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
exampleFun {
mutex.withLock {
count++
}
}
}
println("Counter = $count")
}
// 336 ms동안 100000개의 일을 했습니다.
// Counter = 100000
액터가 독점적으로 자료를 가지며 그 자료를 다른 코루틴과 공유하지 않고 액터를 통해서만 접근하게 만든다.
액터는 내부적으로 ReceivedChannel
을 가지고 있다. 이를 통해 단일 스레드만 액터에 접근하게 하고 다른 스레드는 채널을 통해 상태를 수정하게 하여 스레드/코루틴 간의 동기화를 이루어낸다.
이 방법을 사용하려면 우선 sealed class를 만들어야 한다. sealed class는 외부에서 확장이 불가능하고 sealed class를 상속받은 클래스, 객체는 스레드에서 채널에게 상태를 수정해달라고 요청하는 일종의 신호로 사용된다. 아래는 그 예시이다.
suspend fun exampleFun(action: suspend () -> Unit) {
val n = 100
val k = 1000
val write = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$write ms동안 ${n * k}개의 일을 했습니다.")
}
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0
for (msg in channel) {
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking<Unit> {
val counter = counterActor()
withContext(Dispatchers.Default) {
exampleFun {
counter.send(IncCounter)
}
}
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close()
}