Going

Async

with Kotlin 

(use <space>/<s-space> to navigate)

  • It has two types of functions
  • There are red functions
  • There are blue functions
  • There are no colorless functions

Imagine you are learning

a new programming language...

// Syntax

blue^foo()

red^bar()

Rules:

  • You can call a blue function from red one
  • You can call a blue function from blue one
  • You can call a red function from red one
  • You can't call a red function from blue one
blue^foo() {
    blue^doom // OK
}


blue^foo() {
    ...
    red^bar() // NOT OK
    ...
}

When you write a function you should choose its color

fun doStuff() {
    val bitmap = red^doRedStuff()

    blue^doBlueStuff()
}

 A: It must be red because it calls another red function

Q: Which color is this function?

One more rule:

  • Red functions are more painful to work with

More and more  functions become red in your code

That's are problem because red functions are more painful

Q: Would like to code in a language like that?

What if I told you...

you program

on a language

like that

every day?

Red are asynchronous

Blue are synchronous

// You should write loadBitmapInto function
// ..and it should return a result

fun foo(callback: () -> Unit) {
    loadBitmap(name: String) { bitmap ->
        processBitmap { bitmap ->
            callback("it's done!")
        }
    }
}


  • loadBitmap is async
  • setBitmap is sync
  • ..so if you want to return a value loadBitmapInto should be async

Example - Callbacks

Callback hell

fun foo(callback: () -> Unit) {
    val cf = CompletableFuture.supplyAsync { loadBitmap() }
        .thenCompose { bitmap -> processBitmap(bitmap) }
        .thenAccept(callback)
}


Example - Futures

  • Viral
  • Not composed well with other language features
  • Exceptions (stack)
  • Pull / Push

What should we do?

report("before async block")
async<Unit> {
    report("before await")
    val bitmap = await { loadBitmap("bitmap.bmp") }
    report("after await")
    // do smth. with bitmap
}
report("after async block")
// logcat
Reporting [before async block] from Thread[main,5,main]
Reporting [before await] from Thread[main,5,main]
Reporting [after async block] from Thread[main,5,main]
Reporting [inside slow call] from Thread[ForkJoinPool.commonPool-worker-1,5,main]
Reporting [after await] from Thread[ForkJoinPool.commonPool-worker-1,5,main]

Example - Kotlin 1.1+

fun loadBitmap(name: String): ... {
    ...
    report("inside slow call")
    ...
    val bmp: Bitmap = loadBitmapFromNetwork()
    bmp // return
}
report("before async block")
async<Unit> {
    report("before await")
    val bitmap = await(loadBitmap("bitmap.bmp"))
    report("after await")
    imageView.setBitmap(bitmap)
}
report("after async block")

suspension point

coroutine

  • async/await is just a library
  • only language future required is coroutines

Coroutines

  • Ruby -- 2010  (fibers)
  • Lua -- 2003
  • assembly -- 1958
  • Many other languages have them including Java (bytecode manipulation)

Coroutines in Kotlin are implemented through finate state machines 

Coroutine is a function that can be paused and resumed

Finite

State

Machines

val a = a()
val y = await(foo(a)) // suspension point
b()
val z = await(bar(a, y)) // suspension point
c(z)

Kotlin (1.1) compiles coroutines to FTMs

val a = a()
val y = await(foo(a)) // suspension point
b()
val z = await(bar(a, y)) // suspension point
c(z)
class <anonymous_for_state_machine> implements Continuation<...> {
    // The current state of the state machine
    int label = 0

    // local variables of the coroutine
    A a = null
    Y y = null

    void resume(Object data) {
        if (label == 0) goto L0
        if (label == 1) goto L1
        if (label == 2) goto L2
        else throw IllegalStateException()

      L0:
        // data is expected to be `null` at this invocation
        a = a()
        label = 1
        await(foo(a), this) // 'this' is passed as a continuation 
        return
      L1:
        // external code has resumed this coroutine passing the result of await() as data 
        y = (Y) data
        b()
        label = 2
        await(bar(a, y), this) // 'this' is passed as a continuation
        return
      L3:
        // external code has resumed this coroutine passing the result of await() as data 
        Z z = (Z) data
        c(z)
        label = -1 // No more steps are allowed
        return
    }          
}    

Implement your own async/await

fun asyncFun(
        coroutine c: FunController.() -> Continuation<Unit>
): Unit {
    val controller = FunController()
    c(controller).resume(Unit)
}
  • special keyword coroutine
  • coroutine accepts controller as this
  • coroutine implements Continuation

async

@AllowSuspendExtensions
class FunController() {

    suspend fun <V> awaitFun(f: () -> V, machine: Continuation<V>) {
        // ....
    }

}
  • function marked with suspend receives Continuation object as last argument
  • controller determines which suspending function are available

 

interface Continuation<in P> {

    fun resume(data: P)


    fun resumeWithException(exception: Throwable)
}

await

@AllowSuspendExtensions
class FunController() {

    val executorService: ExecutorService = Executors.newFixedThreadPool(5)

    suspend fun <V> awaitFun(f: () -> V, machine: Continuation<V>) {
        executorService.execute {
            try {
                val v = f() // blocking
                machine.resume(v)
            } catch (e: ExecutionException) {
                machine.resumeWithException(Exception(e.cause))
            }
        }
    }

}

Background job - impl

report("before async")
asyncFun {
    report("before await")
    val result = awaitFun(::slowFun)
    report("after await; result is $result")
}
report("after async")
D/REPORT: before async [on Thread[main,5,main]
D/REPORT: before await [on Thread[main,5,main]
D/REPORT: after async [on Thread[main,5,main]
D/REPORT: inside slow function [on Thread[pool-1-thread-1,5,main]
D/REPORT: after await; result is 42 [on Thread[pool-1-thread-1,5,main]

Background job - usage

val githubApi = GitHubRetrofit().builder
val companies = listOf("yalantis", "google")

asyncFun {
    
    val repos = companies.flatMap { company ->
        awaitFun({
            githubApi.getOrgRepos(company).execute().body()
        })
    }

}

Background job - usage 2

typealias Wrapper = (() -> Unit) -> Unit

fun asyncFun(
        wrapper: Wrapper? = null,
        coroutine c: FunController.() -> Continuation<Unit>
): Unit {
    val controller = FunController(wrapper)
    c(controller).resume(Unit)
}

@AllowSuspendExtensions
class FunController(val wrapper: Wrapper?) {

    val executorService: ExecutorService = Executors.newFixedThreadPool(5)

    suspend fun <V> awaitFun(f: () -> V, machine: Continuation<V>) {
        executorService.execute {
            try {
                val v = f()
                wrapIfNeeded { machine.resume(v) }
            } catch (e: Exception) {
                wrapIfNeeded { machine.resumeWithException(Exception(e.cause)) }
            }
        }
    }

    private fun  wrapIfNeeded(function: () -> Unit) {
        wrapper?.invoke(function) ?: function()
    }

}

Continue on main thread - impl

        val handler = Handler(mainLooper)
        val wrapper: Wrapper = {
            handler.post(it)
        }

        report("before async")
        asyncFun(wrapper) {
            report("before await")
            val result = awaitFun(::slowFun)
            report("after await; result is $result")
        }
        report("after async")

Continue on main thread - usage

D/REPORT: before async [on Thread[main,5,main]
D/REPORT: before await [on Thread[main,5,main]
D/REPORT: after async [on Thread[main,5,main]
D/REPORT: inside slow function [on Thread[pool-1-thread-1,5,main]
D/REPORT: after await; result is 42 [on Thread[main,5,main]
        report("before async")
        asyncFun(null) {
            report("before await")
            val (photo, profile) = awaitFunPair(::loadPhoto, ::loadProfile)
            report("after await")
        }
        report("after async")

 Several jobs parallelly - usage

D/REPORT: before async [on Thread[main,5,main]
D/REPORT: before await [on Thread[main,5,main]
D/REPORT: after async [on Thread[main,5,main]
D/REPORT: job B started [on Thread[pool-1-thread-1,5,main]
D/REPORT: job A started [on Thread[pool-1-thread-2,5,main]
D/REPORT: job A ended [on Thread[pool-1-thread-2,5,main]
D/REPORT: job B ended [on Thread[pool-1-thread-1,5,main]
D/REPORT: after await
    suspend fun <F, S> awaitFunPair(first: () -> F,
                                    second: () -> S,
                                    machine: Continuation<Pair<F, S>>) {

        val firstTask = FutureTask(first)
        val secondTask = FutureTask(second)

        executorService.execute(secondTask)

        executorService.execute {
            try {
                firstTask.run() // blocking
                val result = Pair(firstTask.get(), secondTask.get()) // blocking
                wrapIfNeeded { machine.resume(result) }
            } catch (e: ExecutionException) {
                wrapIfNeeded { machine.resumeWithException(Exception(e.cause)) }
            }
        }
    }

Several jobs parallelly - impl

typical quiz app

asyncFun(wrapper) {
    while(true) {
        val timer = timer(2000)
        val answer = userAnswer()

        val result: FutureTask<*> = awaitFunAny(timer, answer)

        when(result) {
            timer -> showMsg("You're too slow!")
            answer -> {
                val a: Answer = answer.get()
                if (isRightAnswer(a)) showNextQuestion()
                else { showMsg("You lost"); break }
            }
            else -> throw IllegalStateException()
        }
    }
}

Wait for the first job - usage

suspend fun awaitFunAny(vararg fs: FutureTask<*>, machine: Continuation<FutureTask<*>>) {

    val ecs: ExecutorCompletionService<FutureTask<*>> =
        ExecutorCompletionService(executorService)

    fs.forEach { f ->
        ecs.submit { f.run(); f } // blocks each thread and returns task itself
    }

    executorService.execute {
        val f = ecs.take() // blocks

        fs.forEach { if (it != f) it.cancel(true) }

        wrapIfNeeded { machine.resume(f.get()) }
    }

}

Wait for the first job - impl

What's not covered

  • async blocks can return values (handleResult, handleException)
  • it's not only about async/wait
    • generators
    • channel-based concurrency
    • scoped continuations

Links

@defhlt