RxJava: Consume result of Observable on another thread

137
February 15, 2018, at 11:23 PM

I need to perform some work on background thread and then deliver result on main thread. I do the following:

Observable.just(new Object()).subscribeOn(Schedulers.newThread()).subscribeWith(new DisposableObserver<Object>() {
            @Override
            public void onNext(Object s) {
                try {
                    doSomething()
                    Observable.just(new Object())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(new Observer<Object>() {
                                @Override
                                public void onSubscribe(Disposable d) {
                                }
                                @Override
                                public void onNext(Object o) {
                                    completion.deliverResult()
                                    onComplete();
                                }
                                @Override
                                public void onError(Throwable e) {
                                }
                                @Override
                                public void onComplete() {
                                }
                            });
                } catch (DriverException e) {
                    badThingsHappened()
                    onError(e);
                }
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

But I don't like this code, it seems complex and many things are not used.

Is there a way to do it more elegant?

Answer 1

Your code can be converted to something similar to this one:


    Observable
            .create(emitter -> {
                Result result;
                try {
                    result = doSomething();
                } catch (Exception error) {
                    emitter.onError(error);
                    return;
                } finally {
                    if (result == null) {
                        emitter.onError(new IllegalStateException("Result cannot be null"));
                        return;
                    }
                }
                emitter.onNext(result);
                emitter.onComplete();
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(result -> completion.deliverResult(result),
                    throwable -> badThingsHappened(throwable));

But with this approach you are mixing traditional callbacks with reactive programming: completion.deliverResult() is a standard callback. Instead of doing that, return the observable stream itself and let the interested client to observe the stream itself.

Answer 2

you can do what you want in few lines of codes.

In this snippet I'm creating a Observable object from a Callable. The computation will be done on IO thread, but the result (or error eventually) will be observed on Android main thread

void execute() {
    // here you get your observable (api, db, other repositories). I'll create a simple String observable
    Observable<String> observable = Observable.fromCallable(
            () -> "This method will be executed on IO Thread"
    );
    // here you create your observer. I'll observe a string, so I need a Observer<String> obj
    Observer<String> stringObserver = new StringObserver();
    //now the observable will do his job on IO thread, but the result is emitted on mainThread
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(stringObserver);
}
class StringObserver extends DisposableObserver<String> {
    @Override
    public void onNext(String s) {
        //will be executed on main thread
    }
    @Override
    public void onError(Throwable e) {
        //will be executed on main thread
    }
    @Override
    public void onComplete() {
        //will be executed on main thread
    }
}
READ ALSO
Unable to use WHERE with Azure Mobile Offline Sync

Unable to use WHERE with Azure Mobile Offline Sync

I am using Azure Mobile Services to store data on the device for Offline sync purposes, which works fine but I am having an issue with using WHERE when running the sync

118
UrlConnection with Timer convert to RxJava

UrlConnection with Timer convert to RxJava

I think there should be a fancier way to do this with RxJava?

205
How do I reset ImageView memory in Android?

How do I reset ImageView memory in Android?

My app is a question and answer type thingSo I load specific string into an activity and that activity display it

182
OCR not working when background color &amp; text color almost simillar

OCR not working when background color & text color almost simillar

We are facing one issue when we try to scan bagThe main reason behind that the text color on the bag which are embroidered are almost same as bag's color

94