Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run suspending calls on Dispatchers.Default #3693

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package retrofit2

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -353,6 +354,32 @@ class KotlinSuspendTest {
}
}

@Test fun usesCoroutineContextForCallFactory() {
val okHttpClient = OkHttpClient()
var callFactoryThread: Thread? = null
val outerContextThread: Thread
val retrofit = Retrofit.Builder()
.baseUrl(server.url("/"))
.callFactory {
callFactoryThread = Thread.currentThread()
okHttpClient.newCall(it)
}
.addConverterFactory(ToStringConverterFactory())
.build()
val example = retrofit.create(Service::class.java)

server.enqueue(MockResponse().setBody("Hi"))

runBlocking {
outerContextThread = Thread.currentThread()
example.body()
}

assertThat(callFactoryThread).isNotNull
assertThat(outerContextThread).isNotEqualTo(callFactoryThread)
}


@Suppress("EXPERIMENTAL_OVERRIDE")
private object DirectUnconfinedDispatcher : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
Expand Down
105 changes: 57 additions & 48 deletions retrofit/src/main/java/retrofit2/KotlinExtensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package retrofit2

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.suspendCancellableCoroutine
import java.lang.reflect.ParameterizedType
import kotlinx.coroutines.withContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
Expand All @@ -30,57 +30,63 @@ import kotlin.coroutines.resumeWithException
inline fun <reified T: Any> Retrofit.create(): T = create(T::class.java)

suspend fun <T : Any> Call<T>.await(): T {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use these would be less git changes, and needs to rebase.

suspend fun <T : Any> Call<T>.await(): T = withContext(Dispatchers.Default) {
suspend fun <T : Any> Call<T?>.await(): T? = withContext(Dispatchers.Default) {
suspend fun <T> Call<T>.awaitResponse(): Response<T> = withContext(Dispatchers.Default) {

return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
val invocation = call.request().tag(Invocation::class.java)!!
val method = invocation.method()
val e = KotlinNullPointerException("Response from " +
// TODO: a better solution for off-main-thread call factories than this.
return withContext(Dispatchers.Default) {
suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
val invocation = call.request().tag(Invocation::class.java)!!
val method = invocation.method()
val e = KotlinNullPointerException("Response from " +
method.declaringClass.name +
'.' +
method.name +
" was null but response body type was declared as non-null")
continuation.resumeWithException(e)
continuation.resumeWithException(e)
} else {
continuation.resume(body)
}
} else {
continuation.resume(body)
continuation.resumeWithException(HttpException(response))
}
} else {
continuation.resumeWithException(HttpException(response))
}
}

override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
}

@JvmName("awaitNullable")
suspend fun <T : Any> Call<T?>.await(): T? {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T?> {
override fun onResponse(call: Call<T?>, response: Response<T?>) {
if (response.isSuccessful) {
continuation.resume(response.body())
} else {
continuation.resumeWithException(HttpException(response))
}
// TODO: a better solution for off-main-thread call factories than this.
return withContext(Dispatchers.Default) {
suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T?> {
override fun onResponse(call: Call<T?>, response: Response<T?>) {
if (response.isSuccessful) {
continuation.resume(response.body())
} else {
continuation.resumeWithException(HttpException(response))
}
}

override fun onFailure(call: Call<T?>, t: Throwable) {
continuation.resumeWithException(t)
}
})
override fun onFailure(call: Call<T?>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
}

Expand All @@ -91,19 +97,22 @@ suspend fun Call<Unit>.await() {
}

suspend fun <T> Call<T>.awaitResponse(): Response<T> {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
continuation.resume(response)
// TODO: a better solution for off-main-thread call factories than this.
return withContext(Dispatchers.Default) {
suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
continuation.resume(response)
}

override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
}

Expand Down