Saturday, August 13, 2016

Async programming patterns in different languages

During the 10 years I've been into professional programming, the largest paradigm shift I've witnessed is the one from sequential to asynchronous programming. Not that it is a new concept - asynchronousity has been has been present in programming since the dawn of time, but what's new is the tight integration in programming languages and the prevalence of it. From events and callbacks in C# to async-await. From callbacks in Javascript to Promises. In Java we got Futures in Java 7, CompletableFuture, lambdas and streams in Java 8. On top of all these language features you have a vast array of libraries that help you implement common async patterns - ReactiveX for instance exists for all the three languages mentioned, and a lot more. I've been programming quite a bit in all the three languages mentioned: C#, Java and Javascript, and I realized I had to re-learn all the same patterns over and over again. On top of the languages features there were libraries as well. Multiple for each language. In this blog post I will iterate what I believe are the 3 most common patterns, list and compare them in these languages. I welcome contributions for other languages or patterns to this github repo - let's together make this more a reference than a blog post! The patterns I most often use are the following: serial execution, parallel execution where you must wait for all to complete, and parallel execution where you just want the first result to return. I'm also most often using these pattern in relation to network transfers (HTTP). So first I will illustrate the three patterns with different use cases:

Use case 1: Serial execution of HTTP calls

Let's say you're building a user interface which depends on two REST services. One provide a list of news article items, while the other is an image hosting service which return image binaries from URL's. The first service returns URL's which are to be used as input for the second service. This makes second operation dependent on the first - they have to be executed serially: Serial execution

Use case 2: Parallel execution, wait for all

This time the user interface is composed of data from two independent services. You want to display a blank screen with a progress bar only while loading. Only when all data is fetched, you start rendering. This is what I dub the form-merge parallel execution: Fork-merge parallel

Use case 3: Parallel execution, take first

In this case you're firing multiple HTTP queries to identical services, but only care for the response from the first to reply. It might be that you're building a performance-critical application, or you have multiple 3rd-party providers delivering the same service but with varying degree of reliability. Take first parallel

Implementation

Below is listed short examples illustrating the three patterns in the different languages, with support functions in the first chapter. For Java I've chosen to use the RxJava framework simply because that's what I know best. If you know how to do the same with plain Java (8), please let me know or contribute with a pull request to the repo!

Support functions

// Resolves the input value after 300ms
function getSlowPromise(returnValue) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(returnValue);
        }, 300);
    });
}
// Returns input string after 250ms
public static async Task<string> GetSlowStringTask(String toReturn)
{
    await Task.Delay(250);
    return toReturn;
}

// Returns input int after 250ms
public static async Task<int> GetSlowIntTask(int toReturn)
{
    await Task.Delay(250);
    return toReturn;
}
suspend fun getSlowStringJob(s: String) : String {
    delay(250)
    return s
}

suspend fun getSlowIntJob(i: Int) : Int {
    delay(250)
    return i
}

// Returns the deferred job which completes first
suspend fun <T> awaitFirst(jobs : Array<Deferred<T>>) : Deferred<T> {
    var doneJob : Deferred<T>? = null
    while (doneJob == null) {
        for (job in jobs) {
            if (job.isCompleted) {
                doneJob = job
                break;
            }
            yield()
        }
    }
    return doneJob
}

Pipeline / Serial execution

// createPromise is a method returning a promise which resolves to the input value
getSlowPromise('job1')
    .then((result) => {
        return getSlowPromise(result + '-job2');
    }).then((result) => {
        return getSlowPromise(result + '-job3');
    }).then((result) => {
        // Result from third job
    });
Observable<String> obs1 = Observable.just("Hello world");
obs1.flatMap(s -> {
    Observable<Integer> obs2 =  Observable.just(s.length());
    return obs2;
}).flatMap(i -> {
    Observable<String> obs3 = Observable.just("Length: " + i);
    return obs3;
}).subscribe(s ->{
    System.out.println(s);
    // Will output "Length: 11"
});

var str = await GetSlowStringTask("Hello world");
var len = await GetSlowIntTask(str.Length);
var res = await GetSlowStringTask("Len: " + len);
Console.Out.WriteLine(res);
launch(CommonPool) {
    val res1 = getSlowStringJob("Hello")
    val res2 = getSlowIntJob(res1.length)
    val res3 = getSlowStringJob("First string length: " + res2)
    System.out.println(res3)
}

Parallel fork-join

// Assuming promise1, 2 and 3 are created
// before this code executing
Promise.all([
    promise1,
    promise2,
    promise3
]).then((results) => {
    // 'results' is array of results from each promise
})
// obs1, 2 and 3 are Observable<String>'s
// Assuming each observable only emits 1 value
obs1.mergeWith(obs2)
    .mergeWith(obs3)
    .buffer(3)
    .subscribe(resultArray -> {
        // resultArray is String[3]
    });
var tasks = new Task<int>[3];
tasks[0] = GetSlowIntTask(1);
tasks[1] = GetSlowIntTask(2);
tasks[2] = GetSlowIntTask(3);
Task.WaitAll(tasks);
for (int i = 0; i < 3; i++)
{
    Console.Out.WriteLine("Res " + i + ": " + tasks[i].Result);
}
val job1 = async(CommonPool) { getSlowIntJob(1) }
val job2 = async(CommonPool) { getSlowIntJob(2) }
val job3 = async(CommonPool) { getSlowIntJob(3) }
launch(CommonPool) {
    val sum = job1.await() + job2.await() + job3.await()
    System.out.println("Sum: " + sum)
}

Parallel take first

Promise.race([
    promise1,
    promise2,
    promise3
]).then((result) => {
    // result is first to return
})
// obs1, 2 and 3 are Observable<String>'s
// Assuming each observable only emits 1 value
obs1.mergeWith(obs2)
    .mergeWith(obs3)
    .first()
    .subscribe(result -> {
        // result is String
    });
var tasks = new Task<int>[3];
tasks[0] = GetSlowIntTask(1);
tasks[1] = GetSlowIntTask(2);
tasks[2] = GetSlowIntTask(3);
int firstResult = Task.WaitAny(tasks);
Console.Out.WriteLine("Res " + firstResult);
val job1 = async(CommonPool) { getSlowIntJob(1) }
val job2 = async(CommonPool) { getSlowIntJob(2) }
val job3 = async(CommonPool) { getSlowIntJob(3) }
launch(CommonPool) {
    val jobs = arrayOf(job1, job2, job3)
    val first = awaitFirst(jobs)
    System.out.println("Finished first: " + first.getCompleted())
}

Thanks to Espen Volden for contributing with Javascript examples

Article updated 18 Feb 2017 with Kotlin 1.1 Coroutines examples

15 comments:

  1. Ideas! i think no need of any ideas just get to gather and start having fun, chit chat and other activities.


    garage door repair Calgary

    ReplyDelete
  2. Lovely pictures, awesome these are looking so funny interesting but professional and artistic pics.


    garage door repair

    ReplyDelete
  3. Thanks for sharing nice information with us. i like your post and all you share with us is up to date and quite informative, i would like to bookmark the page so i can come here again to read you, as you have done a wonderful job.lice removal service

    ReplyDelete
  4. Thanks for sharing nice information with us. i like your post and all you share with us is up to date and quite informative, i would like to bookmark the page so i can come here again to read you, as you have done a wonderful job.We buy houses Pittsburgh

    ReplyDelete
  5. what a great style, it not easy job well don.


    garage door repair

    ReplyDelete
  6. Can you please provide more information on this subject? BTW your blog is great. Cheers.


    garage door repair Pickering

    ReplyDelete
  7. אני לא מסכים עם כל מה שכתוב, אבל מאמר מעניין מאוד

    חברת ניקיון בצפון

    ReplyDelete
  8. You got a really useful blog I have been here reading for about half an hour. I am a newbie and your post is valuable for me.
    garage door cable repair

    ReplyDelete
  9. מרתק כמה שלא הבנתי כלום עד שקראתי את המאמר הנפלא הזה
    פיתוח אפליקציות Gapps

    ReplyDelete