Rx for the Ultimate Beginner – Part 4 (Kotlin)

This is the fourth and probably the most important part of our ongoing “Rx for the Ultimate Beginner” series, the one that explains flatMap operator. If you haven’t read the previous ones, you can find the outline at the beginning of the first post. This post has a twin sibling for iOS developers with code examples given in Swift.

Where we left

In the last post, we created our own observables from scratch. We either encapsulated an async operation with a

Observable.create()
Observable.create() call or we learned that there are creation operators that work with regular types. We also learned that many of the native platform types are already extended with Rx capabilities by the library.

Composition

What the use of Rx if we are not composing multiple observables? The promise of the paradigm is structured async code after all.

We humans already use natural language (i.e English) to tell stories. There is no notion of synchronization in our speech. Time is a matter of tense and the structure of a sentence hardly ever changes when we mean future or past.

The same logic applies to the observable statements as well. When done properly, a statement created with composing Rx operators, on the high level, looks like an English statement like ones in the below.

Mapping

  • Every X that happens
  • will be converted to Y
  • which can then be substituted as X in one of these four examples.

Filtering

  • Every X that happens
  • that suffices a condition
  • can then be substituted as X in one of these four examples.

Flat Mapping

  • Every X that happens
  • will cause Y
  • which can then be substituted as X in one of these four examples.

Doing Side Effects

  • Every X that happens
  • can change something in the environment. (communicate with the outside world, device sensors or UI elements)

We already used

map()
map() and
filter()
filter() in the previous examples. Despite not worrying about the internals, we actually created temporary Observables that is represented as X in the above English sentences every time we chained an operator. This is fine and suggested since observables are nothing but lightweight wrappers.

Today, we are going to investigate the magical operator

flatMap()
flatMap() which will become our most fundamental building block when we compose multiple observables.

Ultra Simple HTTP Library

Let’s introduce some utility functions for making HTTP calls. These will become useful in the examples. Put these inside your activity.

fun httpGetRx(url: String): Observable<String> {
val queue = Volley.newRequestQueue(this)
return Observable.create<String> { sub ->
val req = StringRequest(Request.Method.GET, url,
Response.Listener<String> { response ->
sub.onNext(response)
},
Response.ErrorListener { error ->
sub.onError(error)
})
queue.add(req)
}
}
fun httpPostRx(url: String, body: MutableMap<String, String>): Observable<String> {
val queue = Volley.newRequestQueue(this)
return Observable.create<String> { sub ->
val req = object : StringRequest(Request.Method.POST, url,
Response.Listener<String> { response ->
sub.onNext(response)
},
Response.ErrorListener { error ->
sub.onError(error)
}) {
override fun getParams(): MutableMap<String, String> {
return body
}
}
queue.add(req)
}
}
fun httpGetRx(url: String): Observable<String> { val queue = Volley.newRequestQueue(this) return Observable.create<String> { sub -> val req = StringRequest(Request.Method.GET, url, Response.Listener<String> { response -> sub.onNext(response) }, Response.ErrorListener { error -> sub.onError(error) }) queue.add(req) } } fun httpPostRx(url: String, body: MutableMap<String, String>): Observable<String> { val queue = Volley.newRequestQueue(this) return Observable.create<String> { sub -> val req = object : StringRequest(Request.Method.POST, url, Response.Listener<String> { response -> sub.onNext(response) }, Response.ErrorListener { error -> sub.onError(error) }) { override fun getParams(): MutableMap<String, String> { return body } } queue.add(req) } }
fun httpGetRx(url: String): Observable<String> {
    val queue = Volley.newRequestQueue(this)

    return Observable.create<String> { sub ->
        val req = StringRequest(Request.Method.GET, url,
                Response.Listener<String> { response ->
                    sub.onNext(response)
                },
                Response.ErrorListener { error ->
                    sub.onError(error)
                })

        queue.add(req)
    }
}


fun httpPostRx(url: String, body: MutableMap<String, String>): Observable<String> {
    val queue = Volley.newRequestQueue(this)

    return Observable.create<String> { sub ->

        val req = object : StringRequest(Request.Method.POST, url,
                Response.Listener<String> { response ->
                    sub.onNext(response)
                },
                Response.ErrorListener { error ->
                    sub.onError(error)
                }) {
            override fun getParams(): MutableMap<String, String> {
                return body
            }
        }

        queue.add(req)
    }
}

The Infamous flatMap

Curious readers would probably have already read the description of this operator but here, we are going full pragmatic and will study with examples.

A very common pattern that occurs in app development is: a user clicks a button, an HTTP call is made, the result of the call is checked for a condition, if the condition satisfies, another HTTP call is made or shown error.

i.e

  • Attempt login. If successful, fetch user profile, else show error.
  • Get upload token. If successful, upload file, else show error.

Now, back to the call chaining. If we didn’t have access to

flatMap()
flatMap(), we would probably write something like the code below which MUST BE AVOIDED AT ALL COSTS.

DON’T COPY THIS, it is just a demonstration of the most common error.

// DON'T COPY THE CODE BELOW
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass"))
.subscribe(
{ _ ->
httpGetRx("https://httpbin.org/get?call=getProfile")
.subscribe(
{ profile ->
// Show profile
},
{ error ->
Log.e("Rx", error.message, error)
},
{},
{ _ -> }
)
},
{ error ->
Log.e("Rx", error.message, error)
},
{},
{ _ -> }
)
// DON'T COPY THE CODE BELOW httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass")) .subscribe( { _ -> httpGetRx("https://httpbin.org/get?call=getProfile") .subscribe( { profile -> // Show profile }, { error -> Log.e("Rx", error.message, error) }, {}, { _ -> } ) }, { error -> Log.e("Rx", error.message, error) }, {}, { _ -> } )
// DON'T COPY THE CODE BELOW
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass"))
        .subscribe(
                { _ ->
                    httpGetRx("https://httpbin.org/get?call=getProfile")
                            .subscribe(
                                    { profile ->
                                        // Show profile
                                    },
                                    { error ->
                                        Log.e("Rx", error.message, error)
                                    },
                                    {},
                                    { _ -> }
                            )
                },
                { error ->
                    Log.e("Rx", error.message, error)
                },
                {},
                { _ -> }
        )

Just by the look of it, you should be skeptical. The more calls you have, the longer the indentation will go to the right. If you are subscribing to an observable inside a subscription callback, you are looking at a case where

flatMap()
flatMap() is the tool you need. Quickly forget what you have seen and instead check out the code piece below.

httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass"))
.flatMap { _ ->
httpGetRx("https://httpbin.org/get?call=getProfile")
}
.subscribe(
{ profile ->
// show profile
},
{ error ->
Log.e("Rx", error.message, error)
},
{},
{ _ -> }
)
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass")) .flatMap { _ -> httpGetRx("https://httpbin.org/get?call=getProfile") } .subscribe( { profile -> // show profile }, { error -> Log.e("Rx", error.message, error) }, {}, { _ -> } )
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass"))
        .flatMap { _ ->
            httpGetRx("https://httpbin.org/get?call=getProfile")
        }
        .subscribe(
                { profile ->
                    // show profile
                },
                { error ->
                    Log.e("Rx", error.message, error)
                },
                {},
                { _ -> }
        )

A bunch of things happens here under the hood. To understand better, we need to look at the return value of the lambda we provided to the operator. Its expected type is

Observable<T>
Observable<T>. This is different than the
map()
map() operator.

When we provide a lambda to the

map()
map() operator, the lambda is used to convert emissions into other types.

In

flatMap()
flatMap(), we are instead expected to convert emissions to other observables which then can all be subscribed simultaneously and merged into a single observable. If we chain multiple calls on the same level as the first
flatMap()
flatMap() call, each lambda will receive the previous block’s HTTP response as input. All the subscription will stop if one of the calls causes an error. Finally, regardless of the source, all errors are handled in a single lambda. Neat, right?

Here we chain 4 HTTP calls. Please notice that the indentation stays the same during the chain no matter how many calls we append and there are no if statements to check whether the calls are successful.

httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass"))
.flatMap { _ ->
httpGetRx("https://httpbin.org/get?call=getProfile")
}
.flatMap { _ ->
httpGetRx("https://httpbin.org/get?call=getNotifications")
}
.flatMap { _ ->
httpGetRx("https://httpbin.org/get?call=getSettings")
}
.subscribe(
{ settings ->
// show settings (we lost the profile and notifications, read more below)
},
{ error ->
Log.e("Rx", error.message, error)
},
{},
{ _ -> }
)
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass")) .flatMap { _ -> httpGetRx("https://httpbin.org/get?call=getProfile") } .flatMap { _ -> httpGetRx("https://httpbin.org/get?call=getNotifications") } .flatMap { _ -> httpGetRx("https://httpbin.org/get?call=getSettings") } .subscribe( { settings -> // show settings (we lost the profile and notifications, read more below) }, { error -> Log.e("Rx", error.message, error) }, {}, { _ -> } )
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass"))
        .flatMap { _ ->
            httpGetRx("https://httpbin.org/get?call=getProfile")
        }
        .flatMap { _ ->
            httpGetRx("https://httpbin.org/get?call=getNotifications")
        }
        .flatMap { _ ->
            httpGetRx("https://httpbin.org/get?call=getSettings")
        }
        .subscribe(
                { settings ->
                    // show settings (we lost the profile and notifications, read more below)
                },
                { error ->
                    Log.e("Rx", error.message, error)
                },
                {},
                { _ -> }
        )

You may ask, “How are we going to propagate each response until the end of the chain?”. There are actually multiple ways for that. We can carry a growing list each time we chain a

flatMap()
flatMap(). Or we can build a tuple to safely label our call responses. It’s your call. Below is how you use
flatMap()
flatMap() to create a growing list on each step.

httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass"))
.flatMap { _ ->
httpGetRx("https://httpbin.org/get?call=getProfile")
.map { profile ->
listOf(profile)
}
}
.flatMap { responses ->
httpGetRx("https://httpbin.org/get?call=getNotifications")
.map { notifications ->
responses + listOf(notifications)
}
}
.flatMap { responses ->
httpGetRx("https://httpbin.org/get?call=getSettings")
.map { settings ->
responses + listOf(settings)
}
}
.subscribe(
{ responses ->
// show everything
},
{ error ->
Log.e("Rx", error.message, error)
},
{},
{ _ -> }
)
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass")) .flatMap { _ -> httpGetRx("https://httpbin.org/get?call=getProfile") .map { profile -> listOf(profile) } } .flatMap { responses -> httpGetRx("https://httpbin.org/get?call=getNotifications") .map { notifications -> responses + listOf(notifications) } } .flatMap { responses -> httpGetRx("https://httpbin.org/get?call=getSettings") .map { settings -> responses + listOf(settings) } } .subscribe( { responses -> // show everything }, { error -> Log.e("Rx", error.message, error) }, {}, { _ -> } )
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf( "example" to "user", "123456" to "pass"))
        .flatMap { _ ->
            httpGetRx("https://httpbin.org/get?call=getProfile")
                    .map { profile ->
                        listOf(profile)
                    }
        }
        .flatMap { responses ->
            httpGetRx("https://httpbin.org/get?call=getNotifications")
                    .map { notifications ->
                        responses + listOf(notifications)
                    }
        }
        .flatMap { responses ->
            httpGetRx("https://httpbin.org/get?call=getSettings")
                    .map { settings ->
                        responses + listOf(settings)
                    }
        }
        .subscribe(
                { responses ->
                    // show everything
                },
                { error ->
                    Log.e("Rx", error.message, error)
                },
                {},
                { _ -> }
        )

Finally, let’s plug this into a

Button
Button click and some text fields.

val loginButton = Button(this)
val usernameText = EditText(this)
val passText = EditText(this)
///////////////////////////////////
loginButton.clicks()
.flatMap {
httpPostRx("https://httpbin.org/get?call=login", mutableMapOf(usernameText.text.toString() to "user", passText.text.toString() to "pass"))
}
.flatMap { _ ->
httpGetRx("https://httpbin.org/get?call=getProfile")
.map { profile ->
listOf(profile)
}
}
.flatMap { responses ->
httpGetRx("https://httpbin.org/get?call=getNotifications")
.map { notifications ->
responses + listOf(notifications)
}
}
.flatMap { responses ->
httpGetRx("https://httpbin.org/get?call=getSettings")
.map { settings ->
responses + listOf(settings)
}
}
.subscribe(
{ responses ->
// show everything
},
{ error ->
Log.e("Rx", error.message, error)
},
{},
{ _ -> }
)
val loginButton = Button(this) val usernameText = EditText(this) val passText = EditText(this) /////////////////////////////////// loginButton.clicks() .flatMap { httpPostRx("https://httpbin.org/get?call=login", mutableMapOf(usernameText.text.toString() to "user", passText.text.toString() to "pass")) } .flatMap { _ -> httpGetRx("https://httpbin.org/get?call=getProfile") .map { profile -> listOf(profile) } } .flatMap { responses -> httpGetRx("https://httpbin.org/get?call=getNotifications") .map { notifications -> responses + listOf(notifications) } } .flatMap { responses -> httpGetRx("https://httpbin.org/get?call=getSettings") .map { settings -> responses + listOf(settings) } } .subscribe( { responses -> // show everything }, { error -> Log.e("Rx", error.message, error) }, {}, { _ -> } )
val loginButton = Button(this)
val usernameText = EditText(this)
val passText = EditText(this)

///////////////////////////////////


loginButton.clicks()
        .flatMap {
            httpPostRx("https://httpbin.org/get?call=login", mutableMapOf(usernameText.text.toString() to "user", passText.text.toString() to "pass"))
        }
        .flatMap { _ ->
            httpGetRx("https://httpbin.org/get?call=getProfile")
                    .map { profile ->
                        listOf(profile)
                    }
        }
        .flatMap { responses ->
            httpGetRx("https://httpbin.org/get?call=getNotifications")
                    .map { notifications ->
                        responses + listOf(notifications)
                    }
        }
        .flatMap { responses ->
            httpGetRx("https://httpbin.org/get?call=getSettings")
                    .map { settings ->
                        responses + listOf(settings)
                    }
        }
        .subscribe(
                { responses ->
                    // show everything
                },
                { error ->
                    Log.e("Rx", error.message, error)
                },
                {},
                { _ -> }
        )

Conclusion

Using

flatMap()
flatMap() comfortably takes practice. You will be making mistakes along the way. Your skepticism towards your approach should always be there. Kotlin being a strongly typed language really helps here. When you are in doubt, look at the input parameter of your currently worked operator lambda. If it is an
Observable
Observable, then maybe your previously chained operator should have missed the chance of being a
flatMap()
flatMap().

You are now a fellow Rx hipster. Enjoy your new tools! Let us know if you have any questions during the path.

Code

References