Skip to content

Latest commit

 

History

History
285 lines (248 loc) · 11.8 KB

Coroutine Delay.md

File metadata and controls

285 lines (248 loc) · 11.8 KB

아래와 같은 timeout(혹은 delay)가 내부적으로 어떻게 동작하는지 알아보자.

GlobalScope.launch {
    withTimeout(10) {
        ...
    }
}

일단 withTimeout의 코드를 살펴보자면 아래와 같다.

가장 아랫부분을 보면 time이 실제로 track되는 것은 CoroutineDispatcher의 구현이라고 적혀있다.

Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].

/**
 * Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws
 * a [TimeoutCancellationException] if the timeout was exceeded.
 *
 * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
 * the cancellable suspending function inside the block throws a [TimeoutCancellationException].
 *
 * The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
 * Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
 *
 * **The timeout event is asynchronous with respect to the code running in the block** and may happen at any time,
 * even right before the return from inside of the timeout [block]. Keep this in mind if you open or acquire some
 * resource inside the [block] that needs closing or release outside of the block.
 * See the
 * [Asynchronous timeout and resources][https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources]
 * section of the coroutines guide for details.
 *
 * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
 *
 * @param timeMillis timeout time in milliseconds.
 */
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately")
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        setupTimeout(TimeoutCoroutine(timeMillis, uCont), block)
    }
}
TimeoutCoroutine
private class TimeoutCoroutine<U, in T: U>(
    @JvmField val time: Long,
    uCont: Continuation<U> // unintercepted continuation
) : ScopeCoroutine<T>(uCont.context, uCont), Runnable {

    // 이 run은 runnable에서 상속받은 메서드이고, cancel하기 위해 사용된다.
    override fun run() {
        cancelCoroutine(TimeoutCancellationException(time, this))
    }

    override fun nameString(): String =
        "${super.nameString()}(timeMillis=$time)"
}

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
    final override val key: CoroutineContext.Key<*> get() = Job

    ...
    public fun cancelCoroutine(cause: Throwable?): Boolean = cancelImpl(cause)

    // TimeoutCoroutine에서 run을 호출하면, 여기로 와서 적절히 cancel된다.
    // cause is Throwable or ParentJob when cancelChild was invoked
    // returns true is exception was handled, false otherwise
    internal fun cancelImpl(cause: Any?): Boolean {
        var finalState: Any? = COMPLETING_ALREADY
        if (onCancelComplete) {
            // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
            // completing and had recorded exception
            finalState = cancelMakeCompleting(cause)
            if (finalState === COMPLETING_WAITING_CHILDREN) return true
        }
        if (finalState === COMPLETING_ALREADY) {
            finalState = makeCancelling(cause)
        }
        return when {
            finalState === COMPLETING_ALREADY -> true
            finalState === COMPLETING_WAITING_CHILDREN -> true
            finalState === TOO_LATE_TO_CANCEL -> false
            else -> {
                afterCompletion(finalState)
                true
            }
        }
    }
    ...
}
private fun <U, T: U> setupTimeout(
    coroutine: TimeoutCoroutine<U, T>,
    block: suspend CoroutineScope.() -> T
): Any? {
    
    // schedule cancellation of this coroutine on time
    val cont = coroutine.uCont
    val context = cont.context
    coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context)) // <--

    // restart the block using a new coroutine with a new job,
    // however, start it undispatched, because we already are in the proper context
    return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)
}

context.delay를 가져와서 invokeOnTimeout를 호출한다. 그리고 block을 restart해서 새로운 job을 가지도록 만든다. (delay 후 다시 시작되어야하기 떄문)

/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

internal actual val DefaultDelay: Delay = initializeDefaultDelay()

private fun initializeDefaultDelay(): Delay {
    // Opt-out flag
    if (!defaultMainDelayOptIn) return DefaultExecutor
    val main = Dispatchers.Main
    /*
     * When we already are working with UI and Main threads, it makes
     * no sense to create a separate thread with timer that cannot be controller
     * by the UI runtime.
     */
    return if (main.isMissing() || main !is Delay) DefaultExecutor else main // <--
}

CoroutineContext.delay는 기본적으로 DefaultExecutor라는 object를 가져온다.

DefaultExecuter

internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {

    override fun run() {
        // TreadLocalEventLoop에 등록되어 동작한다.
        // TreadLocalEventLoop는 @ThreadLocal이 달려있고, object로 선언되어있어 모든 스레드마다 각 1개씩 생성된다.
        // eventLoop로 등록되므로 일정 주기마다 실행된다. (@InternalCoroutinesApi)
        ThreadLocalEventLoop.setEventLoop(this) 
        ...
    }

    // timeout일때 block을 run 시켜준다.
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        scheduleInvokeOnTimeout(timeMillis, block)

    protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val timeNanos = delayToNanos(timeMillis)
        return if (timeNanos < MAX_DELAY_NS) {
            val now = nanoTime()
            DelayedRunnableTask(now + timeNanos, block).also { task ->
                schedule(now, task)
            }
        } else {
            NonDisposableHandle
        }
    }

    // 이 부분은 delayedQueue가 있는지 확인하고, task를 받았을때 스케줄링해주는 함수이다.
    // 위를 보면 알겠지만 DelayedRunnableTask에 람다로 들어간다.
    public fun schedule(now: Long, delayedTask: DelayedTask) {
        when (scheduleImpl(now, delayedTask)) {
            SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
            SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
            SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
            else -> error("unexpected result")
        }
    }
    private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
        if (isCompleted) return SCHEDULE_COMPLETED
        val delayedQueue = _delayed.value ?: run {
            _delayed.compareAndSet(null, DelayedTaskQueue(now))
            _delayed.value!!
        }
        return delayedTask.scheduleTask(now, delayedQueue, this)
    }
    ...
}

DelayedRunnableTask와 DelayedTask

private class DelayedRunnableTask(
    nanoTime: Long,
    private val block: Runnable
) : DelayedTask(nanoTime) { // <--
    override fun run() { block.run() }
    override fun toString(): String = super.toString() + block.toString()
}
    internal abstract class DelayedTask(
        /**
         * This field can be only modified in [scheduleTask] before putting this DelayedTask
         * into heap to avoid overflow and corruption of heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
        @Volatile
        private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

        override var heap: ThreadSafeHeap<*>?
            get() = _heap as? ThreadSafeHeap<*>
            set(value) {
                require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
                _heap = value
            }

        override var index: Int = -1

        // EventLoop#DefaultExecutor()에서 DelayedTask(DisposableHandle)를 반환하면 이 메서드를 통해 비교되고 실행된다.
        override fun compareTo(other: DelayedTask): Int {
            val dTime = nanoTime - other.nanoTime
            return when {
                dTime > 0 -> 1
                dTime < 0 -> -1
                else -> 0
            }
        }

        fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L

        // 실제로 스케줄링 하는 부분
        @Synchronized
        fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
            if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
            delayed.addLastIf(this) { firstTask ->
                if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
                /**
                 * We are about to add new task and we have to make sure that [DelayedTaskQueue]
                 * invariant is maintained. The code in this lambda is additionally executed under
                 * the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
                 */
                if (firstTask == null) {
                    /**
                     * When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
                     * the current now time even if that means "going backwards in time". This makes the structure
                     * self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
                     * are removed from the delayed queue for execution.
                     */
                    delayed.timeNow = now
                } else {
                    /**
                     * Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
                     * and only goes forward in time. We cannot let it go backwards in time or invariant can be
                     * violated for tasks that were already scheduled.
                     */
                    val firstTime = firstTask.nanoTime
                    // compute min(now, firstTime) using a wrap-safe check
                    val minTime = if (firstTime - now >= 0) now else firstTime
                    // update timeNow only when going forward in time
                    if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
                }
                /**
                 * Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
                 * task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
                 * function can be called to reschedule from one queue to another and this might be another reason
                 * where new task's time might now violate invariant.
                 * We correct invariant violation (if any) by simply changing this task's time to now.
                 */
                if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
                true
            }
            return SCHEDULE_OK
        }
    }