Rx for the Ultimate Beginner – Part 3 (Kotlin)

This is the third part of our ongoing “Rx for the Ultimate Beginner” series. If you haven’t read the previous ones, you can find the outline at the beginning of the first post. This post has a twin for iOS developers with code examples given in Swift.

A Small Recap

In the last post, we talked about different kinds of observables, namely hot and cold observables. If the source is capable of producing values even if no subscriber is connected and a subscription caused no side effects on the source, we named the observable as hot. If the subscriber’s existence determined the production of the emission and subscriptions can cause side effects on the source, we called them cold.

We also explored the Observable Contract. The contract specified that during the lifetime of a subscription, a subscriber can observe many consecutive onNext calls, and a potential onComplete or onError depending on the result. A subscription which observes an onComplete or an onError is considered finalized and cannot observe any other emissions.

These guidelines will become useful when we implement our own observables.

Add Rx Bindings

You need to add the line below to your app’s gradle script to be able to access or view specific Rx extensions via RxBinding.

implementation 'com.jakewharton.rxbinding3:rxbinding:3.0.0-alpha1'

Our First Observables

Observables are created via creation operators. There are many specialized implementations for creating timers, sequences, and ranges. In our case, we are going to start with the least magical one, namely Observable.create().

In order to keep the example concrete, we are going to wrap an HTTP call made with Volley inside of an observable. Here is the call as documented in the Volley documentation, without the wrapping:

val req = StringRequest(Request.Method.GET, "https://httpbin.org/get",
        Response.Listener<String> { response ->
            Log.d("Response", response)
        },
        Response.ErrorListener { error ->
            Log.e("Error", error)
        })

queue.add(req)

This piece of code makes an async call that has some kind of chaining structure built in for better usability. This could also be an async call to a background thread or some other library that accepts callbacks to notify its results. In our case, the wrapping process would be the same.

Let’s wrap it:

val queue = Volley.newRequestQueue(this)

val httpCallObservable: Observable<String> = Observable.create<String> { sub ->
    val req = StringRequest(Request.Method.GET, "https://httpbin.org/get",
            Response.Listener<String> { response ->
                sub.onNext(response)
            },
            Response.ErrorListener { error ->
                sub.onError(error)
            })

    queue.add(req)
}

What we did here is actually quite easy. We called Observable.create<String>() which accepts a lambda to define what to do when a subscriber subscribes. We used Observable<String>because Volley’s reponse has the type String. If you plan to emit other kinds of values inside your onNext calls, specify the type accordingly.

The lambda we provide is not called unless a subscriber subscribes. In other words, every time a subscriber subscribes, the call will be invoked again. We now have a reusable HTTP service. The subscriber is passed as a single parameter to the lambda. It has the contract methods we discussed earlier.

Inside the Response.Listener, we called sub.onNext with the response data and sub.onCompleted since this HTTP call has done its purpose. If an error occurs, Response.ErrorListener will trigger and without any emission, only sub.onError will happen. The contract is obeyed.

In order to make the call happen, we need to subscribe.

httpCallObservable.subscribe(
        { data ->
            Log.d("Rx", data)
        },
        { error ->
            Log.e("Rx", error.message, error)
        },
        {},
        { _ -> }
)

Great! We have successfully created our first observable. If we investigate whether we have  created a side effect on the source (which is https://httpbin.org/get), it is arguable that we have. But we at least know that this observable can only emit values if someone subscribes, so it is clear by the first guideline that this observable is cold.

Now, let’s create a hot one.

val clicks = textview.clicks()

clicks.subscribe(
        {
            Log.d("Rx", "click")
        },
        { error ->
            Log.e("Rx", error.message, error)
        },
        {},
        { _ -> }
)

RxBinding comes with useful extension helpers to observe existing UI widgets. In this example, we used View extensions to observe screenshot notifications.

This observable is hot because our subscription has no effect on user’s behavior, they can click the text view even if we don’t listen for them. Moreover, no matter how many times we subscribe, all subscriptions will receive the same click event for each click the user makes. We are merely observers of the outside world, thus it is called hot.

How to know what to use?

From now on, as an Rx user, the main question you are going to ask is “Is this an async event that can be implemented with Rx?”. If the answer is yes, chances are it is already implemented in the library. RxKotlin comes with almost all the operators available in the ReactiveX website. It also comes with RxAndroid, which has extension methods for Android operating systems. If you want to subscribe to common widgets, RxBinding is your friend. Many libraries that have nothing to do with Rx also have extension dependencies to add support for Rx. Even Volley has one which we ignored here for the purposes of this post.

In the next post, we are going to learn how to chain multiple operators and also the backbone of Rx, the flatMap operator.

Until then, peace!

Code

References