(use <space>/<s-space> to navigate)
// Syntax
// Notation: every function name is prefixed with its color, `color^name()`.
blue^foo()
red^bar()
// Blue code used inside a blue function is allowed.
blue^foo() {
blue^doom // OK
}
// Calling a red function from inside a blue function is not allowed.
blue^foo() {
...
red^bar() // NOT OK
...
}
When you write a function you should choose its color
// Quiz example: the body invokes one red-prefixed and one blue-prefixed function.
fun doStuff() {
val bitmap = red^doRedStuff()
blue^doBlueStuff()
}
 A: It must be red because it calls another red function
Q: Which color is this function?
// You should write loadBitmapInto function
// ..and it should return a result
// Callback-style version: each step is nested inside the previous step's callback.
// NOTE(review): `callback` is declared as `() -> Unit` but is invoked with a String
// argument below — presumably the intended type is `(String) -> Unit`; confirm.
fun foo(callback: () -> Unit) {
// NOTE(review): `name: String` is declaration syntax at a call site — slide shorthand, not valid Kotlin.
loadBitmap(name: String) { bitmap ->
// The inner lambda parameter shadows the outer `bitmap`.
processBitmap { bitmap ->
callback("it's done!")
}
}
}
// CompletableFuture version of the pipeline: load, then process, then notify the callback.
// NOTE(review): `thenAccept` hands the stage's result to its consumer, while `callback`
// is declared as `() -> Unit` — presumably the signature should accept the result; confirm.
fun foo(callback: () -> Unit) {
// `cf` is assigned but not used afterwards.
val cf = CompletableFuture.supplyAsync { loadBitmap() }
.thenCompose { bitmap -> processBitmap(bitmap) }
.thenAccept(callback)
}
// Demo: report() is called before, inside and after the async block.
report("before async block")
async<Unit> {
report("before await")
// The loading call is wrapped in a lambda that is handed to await.
val bitmap = await { loadBitmap("bitmap.bmp") }
report("after await")
// do smth. with bitmap
}
report("after async block")
// logcat
Reporting [before async block] from Thread[main,5,main]
Reporting [before await] from Thread[main,5,main]
Reporting [after async block] from Thread[main,5,main]
Reporting [inside slow call] from Thread[ForkJoinPool.commonPool-worker-1,5,main]
Reporting [after await] from Thread[ForkJoinPool.commonPool-worker-1,5,main]
// Sketch of the slow call used by the demo; `...` marks elided code and an elided return type.
fun loadBitmap(name: String): ... {
...
// Emits the "inside slow call" marker.
report("inside slow call")
...
val bmp: Bitmap = loadBitmapFromNetwork()
bmp // return
}
// Demo variant: the result of loadBitmap(...) is passed to await directly,
// and the awaited bitmap is then set on imageView.
report("before async block")
async<Unit> {
report("before await")
val bitmap = await(loadBitmap("bitmap.bmp"))
report("after await")
imageView.setBitmap(bitmap)
}
report("after async block")
suspension point
coroutine
Coroutines in Kotlin are implemented through finite state machines
Coroutine is a function that can be paused and resumed
// Example coroutine body with two suspension points (the two await calls).
val a = a()
val y = await(foo(a)) // suspension point
b()
val z = await(bar(a, y)) // suspension point
c(z)
Kotlin (1.1) compiles coroutines to FSMs (finite state machines)
// Straight-line coroutine body: a() runs first, b() between the two awaits, c(z) after the second.
val a = a()
val y = await(foo(a)) // suspension point
b()
val z = await(bar(a, y)) // suspension point
c(z)
// Pseudo-code sketch of the state machine for the coroutine body
//     val a = a(); val y = await(foo(a)); b(); val z = await(bar(a, y)); c(z)
// Each suspension point ends one state; resume() jumps to the state stored in `label`.
class <anonymous_for_state_machine> implements Continuation<...> {
    // The current state of the state machine
    int label = 0

    // local variables of the coroutine
    A a = null
    Y y = null

    // Runs the next step of the coroutine; `data` is the result of the await that suspended it.
    void resume(Object data) {
        if (label == 0) goto L0
        if (label == 1) goto L1
        if (label == 2) goto L2
        else throw IllegalStateException()

        L0:
        // data is expected to be `null` at this invocation
        a = a()
        label = 1
        await(foo(a), this) // 'this' is passed as a continuation
        return
        L1:
        // external code has resumed this coroutine passing the result of await() as data
        y = (Y) data
        b()
        label = 2
        await(bar(a, y), this) // 'this' is passed as a continuation
        return
        L2: // was mislabelled L3, leaving `goto L2` without a target
        // external code has resumed this coroutine passing the result of await() as data
        Z z = (Z) data
        c(z)
        label = -1 // No more steps are allowed
        return
    }
}
/**
 * Starts the coroutine [c]: builds it against a new [FunController]
 * and resumes the resulting continuation once with `Unit`.
 */
fun asyncFun(
    coroutine c: FunController.() -> Continuation<Unit>
): Unit {
    val machine = c(FunController())
    machine.resume(Unit)
}
// Controller type used as the receiver of the coroutine passed to asyncFun.
@AllowSuspendExtensions
class FunController() {
// f is the work to await; machine is the continuation of the calling coroutine.
// The implementation is elided in this sketch.
suspend fun <V> awaitFun(f: () -> V, machine: Continuation<V>) {
// ....
}
}
Â
/**
 * Callback through which a suspended coroutine is continued.
 *
 * @param P type of the value the coroutine is resumed with
 */
interface Continuation<in P> {
/** Continues the coroutine, passing [data] as the result of the suspended call. */
fun resume(data: P)
/** Continues the coroutine by reporting [exception] instead of a result. */
fun resumeWithException(exception: Throwable)
}
/**
 * Controller that runs awaited work on a fixed pool of five threads and resumes
 * the waiting coroutine from the worker thread that ran the work.
 */
@AllowSuspendExtensions
class FunController() {
    val executorService: ExecutorService = Executors.newFixedThreadPool(5)

    /**
     * Runs [f] on [executorService] and hands its outcome to [machine].
     *
     * @param f blocking computation to execute off the calling thread
     * @param machine continuation resumed with the value of [f], or with its failure
     */
    suspend fun <V> awaitFun(f: () -> V, machine: Continuation<V>) {
        executorService.execute {
            try {
                val v = f() // blocking
                machine.resume(v)
            } catch (e: ExecutionException) {
                // Report the underlying cause rather than the wrapper.
                machine.resumeWithException(Exception(e.cause))
            } catch (e: Exception) {
                // f is a plain lambda, so it can fail with any exception, not only
                // ExecutionException. Without this branch the continuation would never
                // be resumed and the coroutine would stay suspended.
                machine.resumeWithException(e)
            }
        }
    }
}
// Demo: awaits slowFun inside asyncFun and reports before and after each step.
report("before async")
asyncFun {
report("before await")
val result = awaitFun(::slowFun)
report("after await; result is $result")
}
report("after async")
D/REPORT: before async [on Thread[main,5,main]
D/REPORT: before await [on Thread[main,5,main]
D/REPORT: after async [on Thread[main,5,main]
D/REPORT: inside slow function [on Thread[pool-1-thread-1,5,main]
D/REPORT: after await; result is 42 [on Thread[pool-1-thread-1,5,main]
// Example: request the repositories of several GitHub organisations inside one asyncFun block.
val githubApi = GitHubRetrofit().builder
val companies = listOf("yalantis", "google")
asyncFun {
// One awaitFun call per company, each wrapping the execute() call.
// NOTE(review): `repos` is not used afterwards, and body() is presumably nullable — confirm.
val repos = companies.flatMap { company ->
awaitFun({
githubApi.getOrgRepos(company).execute().body()
})
}
}
/** Receives a block of work and decides how to run it. */
typealias Wrapper = (() -> Unit) -> Unit

/**
 * Starts the coroutine [c] against a [FunController] configured with the
 * optional [wrapper], then resumes it once with `Unit`.
 */
fun asyncFun(
    wrapper: Wrapper? = null,
    coroutine c: FunController.() -> Continuation<Unit>
): Unit {
    val machine = c(FunController(wrapper))
    machine.resume(Unit)
}
/**
 * Controller that runs awaited work on a fixed pool of five threads and resumes
 * the coroutine through the optional [wrapper].
 *
 * @property wrapper when non-null, every resume step is handed to it instead of
 * being run directly on the worker thread
 */
@AllowSuspendExtensions
class FunController(val wrapper: Wrapper?) {
    val executorService: ExecutorService = Executors.newFixedThreadPool(5)

    /**
     * Runs [f] on [executorService] and resumes [machine] with its value, or with
     * an [Exception] describing the failure.
     */
    suspend fun <V> awaitFun(f: () -> V, machine: Continuation<V>) {
        executorService.execute {
            try {
                val v = f()
                wrapIfNeeded { machine.resume(v) }
            } catch (e: Exception) {
                // Most exceptions have no cause, so wrapping `e.cause` alone would drop
                // the real failure; fall back to the exception itself.
                wrapIfNeeded { machine.resumeWithException(Exception(e.cause ?: e)) }
            }
        }
    }

    /** Runs [function] through [wrapper] when one is configured, directly otherwise. */
    private fun wrapIfNeeded(function: () -> Unit) {
        val wrap = wrapper
        if (wrap != null) wrap(function) else function()
    }
}
// Wrapper that posts each block it receives to a Handler built on mainLooper.
val handler = Handler(mainLooper)
val wrapper: Wrapper = {
handler.post(it)
}
// Demo run with the wrapper passed to asyncFun.
report("before async")
asyncFun(wrapper) {
report("before await")
val result = awaitFun(::slowFun)
report("after await; result is $result")
}
report("after async")
D/REPORT: before async [on Thread[main,5,main]
D/REPORT: before await [on Thread[main,5,main]
D/REPORT: after async [on Thread[main,5,main]
D/REPORT: inside slow function [on Thread[pool-1-thread-1,5,main]
D/REPORT: after await; result is 42 [on Thread[main,5,main]
// Demo: await two functions at once and destructure the resulting pair; no wrapper is passed.
report("before async")
asyncFun(null) {
report("before await")
val (photo, profile) = awaitFunPair(::loadPhoto, ::loadProfile)
report("after await")
}
report("after async")
D/REPORT: before async [on Thread[main,5,main]
D/REPORT: before await [on Thread[main,5,main]
D/REPORT: after async [on Thread[main,5,main]
D/REPORT: job B started [on Thread[pool-1-thread-1,5,main]
D/REPORT: job A started [on Thread[pool-1-thread-2,5,main]
D/REPORT: job A ended [on Thread[pool-1-thread-2,5,main]
D/REPORT: job B ended [on Thread[pool-1-thread-1,5,main]
D/REPORT: after await
/**
 * Runs [first] and [second] concurrently and resumes [machine] with both results.
 *
 * [second] is submitted to the executor on its own; [first] runs inside the
 * task that then waits for both results. A failure of either task is
 * reported as an [Exception] wrapping the original cause.
 */
suspend fun <F, S> awaitFunPair(
    first: () -> F,
    second: () -> S,
    machine: Continuation<Pair<F, S>>
) {
    val taskA = FutureTask(first)
    val taskB = FutureTask(second)
    // Start the second job before the task that runs the first one.
    executorService.execute(taskB)
    executorService.execute {
        try {
            taskA.run() // runs `first` on this worker thread
            val both = Pair(taskA.get(), taskB.get()) // waits for `second` if still running
            wrapIfNeeded { machine.resume(both) }
        } catch (failure: ExecutionException) {
            wrapIfNeeded { machine.resumeWithException(Exception(failure.cause)) }
        }
    }
}
typical quiz app
// Quiz loop: each round races a timer task against the user's answer task.
asyncFun(wrapper) {
while(true) {
val timer = timer(2000)
val answer = userAnswer()
// awaitFunAny hands back one of the tasks passed in.
val result: FutureTask<*> = awaitFunAny(timer, answer)
when(result) {
// Timer came back: show the message and start the next round.
timer -> showMsg("You're too slow!")
answer -> {
val a: Answer = answer.get()
// A wrong answer ends the loop.
if (isRightAnswer(a)) showNextQuestion()
else { showMsg("You lost"); break }
}
// result is expected to be one of the two tasks passed in.
else -> throw IllegalStateException()
}
}
}
/**
 * Runs all tasks in [fs] concurrently and resumes [machine] with the one that
 * completes first; the remaining tasks are cancelled.
 *
 * @param fs tasks to race against each other
 * @param machine continuation resumed with the winning task
 */
suspend fun awaitFunAny(vararg fs: FutureTask<*>, machine: Continuation<FutureTask<*>>) {
    val ecs: ExecutorCompletionService<FutureTask<*>> =
        ExecutorCompletionService(executorService)
    fs.forEach { f ->
        ecs.submit { f.run(); f } // blocks each thread and returns task itself
    }
    executorService.execute {
        try {
            // take() returns the completion service's own Future wrapper, not an
            // element of fs; unwrap it to get the task that actually finished.
            val winner = ecs.take().get() // blocks
            // Compare against the unwrapped task so only the losers are cancelled.
            fs.forEach { if (it !== winner) it.cancel(true) }
            wrapIfNeeded { machine.resume(winner) }
        } catch (e: ExecutionException) {
            // Report failures the same way as the other await functions.
            wrapIfNeeded { machine.resumeWithException(Exception(e.cause)) }
        }
    }
}
@defhlt