Saturday, August 13, 2016

Async programming patterns in different languages

During the 10 years I've been into professional programming, the largest paradigm shift I've witnessed is the one from sequential to asynchronous programming. Not that it is a new concept - asynchronousity has been has been present in programming since the dawn of time, but what's new is the tight integration in programming languages and the prevalence of it. From events and callbacks in C# to async-await. From callbacks in Javascript to Promises. In Java we got Futures in Java 7, CompletableFuture, lambdas and streams in Java 8. On top of all these language features you have a vast array of libraries that help you implement common async patterns - ReactiveX for instance exists for all the three languages mentioned, and a lot more. I've been programming quite a bit in all the three languages mentioned: C#, Java and Javascript, and I realized I had to re-learn all the same patterns over and over again. On top of the languages features there were libraries as well. Multiple for each language. In this blog post I will iterate what I believe are the 3 most common patterns, list and compare them in these languages. I welcome contributions for other languages or patterns to this github repo - let's together make this more a reference than a blog post! The patterns I most often use are the following: serial execution, parallel execution where you must wait for all to complete, and parallel execution where you just want the first result to return. I'm also most often using these pattern in relation to network transfers (HTTP). So first I will illustrate the three patterns with different use cases:

Use case 1: Serial execution of HTTP calls

Let's say you're building a user interface which depends on two REST services. One provide a list of news article items, while the other is an image hosting service which return image binaries from URL's. The first service returns URL's which are to be used as input for the second service. This makes second operation dependent on the first - they have to be executed serially: Serial execution

Use case 2: Parallel execution, wait for all

This time the user interface is composed of data from two independent services. You want to display a blank screen with a progress bar only while loading. Only when all data is fetched, you start rendering. This is what I dub the form-merge parallel execution: Fork-merge parallel

Use case 3: Parallel execution, take first

In this case you're firing multiple HTTP queries to identical services, but only care for the response from the first to reply. It might be that you're building a performance-critical application, or you have multiple 3rd-party providers delivering the same service but with varying degree of reliability. Take first parallel

Implementation

Below is listed short examples illustrating the three patterns in the different languages, with support functions in the first chapter. For Java I've chosen to use the RxJava framework simply because that's what I know best. If you know how to do the same with plain Java (8), please let me know or contribute with a pull request to the repo!

Support functions

// Resolves the input value after 300ms
function getSlowPromise(returnValue) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(returnValue);
        }, 300);
    });
}
// Returns input string after 250ms
public static async Task GetSlowStringTask(String toReturn)
{
    await Task.Delay(250);
    return toReturn;
}

// Returns input int after 250ms
public static async Task GetSlowIntTask(int toReturn)
{
    await Task.Delay(250);
    return toReturn;
}

Pipeline / Serial execution

// createPromise is a method returning a promise which resolves to the input value
getSlowPromise('job1')
    .then((result) => {
        return getSlowPromise(result + '-job2');
    }).then((result) => {
        return getSlowPromise(result + '-job3');
    }).then((result) => {
        // Result from third job
    });
Observable obs1 = Observable.just("Hello world");
obs1.flatMap(s -> {
    Observable obs2 =  Observable.just(s.length());
    return obs2;
}).flatMap(i -> {
    Observable obs3 = Observable.just("Length: " + i);
    return obs3;
}).subscribe(s ->{
    System.out.println(s);
    // Will output "Length: 11"
});

var str = await GetSlowStringTask("Hello world");
var len = await GetSlowIntTask(str.Length);
var res = await GetSlowStringTask("Len: " + len);
Console.Out.WriteLine(res);

Parallel fork-join

// Assuming promise1, 2 and 3 are created
// before this code executing
Promise.all([
    promise1,
    promise2,
    promise3
]).then((results) => {
    // 'results' is array of results from each promise
})
// obs1, 2 and 3 are Observable's
// Assuming each observable only emits 1 value
obs1.mergeWith(obs2)
    .mergeWith(obs3)
    .buffer(3)
    .subscribe(resultArray -> {
        // resultArray is String[3]
    });
var tasks = new Task[3];
tasks[0] = GetSlowIntTask(1);
tasks[1] = GetSlowIntTask(2);
tasks[2] = GetSlowIntTask(3);
Task.WaitAll(tasks);
for (int i = 0; i < 3; i++)
{
    Console.Out.WriteLine("Res " + i + ": " + tasks[i].Result);
}

Parallel take first

Promise.race([
    promise1,
    promise2,
    promise3
]).then((result) => {
    // result is first to return
})
// obs1, 2 and 3 are Observable's
// Assuming each observable only emits 1 value
obs1.mergeWith(obs2)
    .mergeWith(obs3)
    .first()
    .subscribe(result -> {
        // result is String
    });
var tasks = new Task[3];
tasks[0] = GetSlowIntTask(1);
tasks[1] = GetSlowIntTask(2);
tasks[2] = GetSlowIntTask(3);
int firstResult = Task.WaitAny(tasks);
Console.Out.WriteLine("Res " + firstResult);

Thanks to Espen Volden for contributing with Javascript examples

This article has also been posted to VG's techblog