Java
BitNeBolt, 2020-11-10 22:27:37

Am I creating the Flowable in RxJava3 correctly?

I've just started getting acquainted with this framework, and I feel like a monkey with glasses.

In general, I need a Flowable that will create objects and pass them to the subscriber. The documentation says that the currently recommended way is to use Flowable.create(). As I understand it, this is not the best way, because before calling the subscriber's methods you have to check whether it is still subscribed.

Here is the implementation:

Flowable<Person> source = Flowable.create(new FlowableOnSubscribe<Person>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Person> emitter) throws Throwable {
                for (int i = 0; i < 100000; i++){
                    try {
                        Person person = new Person("Name " + i, i);

                        if (!emitter.isCancelled()) {
                            emitter.onNext(person);
                        }

                    } catch (Throwable t) {
                        if (! emitter.isCancelled())
                            emitter.onError(t);
                    }
                }

                if (!emitter.isCancelled())
                    emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.computation());



The subscriber itself must implement the appropriate interface. Since I need to be able to unsubscribe it from the data stream, it needs an unsubscribe method, so I wrote it as a separate class.
Here is the class:

private static class FSubscriber implements FlowableSubscriber<Person>{
        private Subscription subscription;

        public void unsubscribe() {
            subscription.cancel();
        }

        @Override
        public void onSubscribe(@NonNull Subscription s) {
            this.subscription = s;
            this.subscription.request(1);

            System.out.println("On subs");
        }

        @Override
        public void onNext(Person person) {
            System.out.println("On next: " + person.toString());
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            System.err.println(t.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("On complete");
        }
    };



The result is a lot of code that will have to be repeated every time. Hence my questions:
  1. Did I do everything right in terms of using the framework?
  2. Should I make an abstract class that will be the parent of all similar subscribers?
  3. Since the Flowable must check every time whether the subscriber is still subscribed, is it worth writing such a check in the methods of the subscriber itself? How would this affect readability and debuggability?


1 answer(s)
Denis Zagaevsky, 2020-11-11
@BitNeBolt

In your case, it's generally better to use Flowable.fromIterable.
If you do implement it by hand with Flowable.create, the cancellation checks can be omitted; the emitter already performs them under the hood.
As for subscribers, you have chosen the most difficult path. Look from here on down. Usually subscribe is called with one or two consumers (for onNext and onError, respectively). Then you won't have to write all this code.
As a piece of advice: do bring in lambdas, either by moving to a newer Java (8+) or by using Retrolambda.
Honestly, with anonymous classes Rx looks truly awful; you wouldn't wish it on your enemy.
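As a rough illustration of both suggestions, here is a minimal sketch. The Person class is a hypothetical stand-in for the one in the question, and the PersonSource, fromList and fromEmitter names are made up for this example. The second method relies on the emitter handed out by Flowable.create ignoring onNext calls after cancellation and on exceptions thrown from the callback being routed to onError, so the manual checks and the try/catch are left out.

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;

import java.util.ArrayList;
import java.util.List;

public class PersonSource {
    // Hypothetical stand-in for the Person class from the question.
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name + " (" + age + ")";
        }
    }

    // Option 1: prepare the items up front and let fromIterable emit them.
    static Flowable<Person> fromList(int count) {
        List<Person> people = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            people.add(new Person("Name " + i, i));
        }
        return Flowable.fromIterable(people);
    }

    // Option 2: Flowable.create without manual isCancelled() checks.
    // A check inside a long loop can still be kept to stop producing early,
    // but it is not needed for correctness.
    static Flowable<Person> fromEmitter(int count) {
        return Flowable.<Person>create(emitter -> {
            for (int i = 0; i < count; i++) {
                emitter.onNext(new Person("Name " + i, i));
            }
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);
    }

    public static void main(String[] args) {
        fromList(3).subscribe(System.out::println);
        fromEmitter(3).subscribe(System.out::println);
    }
}
```

Either Flowable can still be moved to another thread with subscribeOn(Schedulers.computation()), as in the question.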
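A minimal sketch of that shorter form, assuming Java 8+ lambdas. The LambdaSubscribe class and the listen method are hypothetical names used only for this example. The Disposable returned by subscribe plays the role of the hand-written unsubscribe() method.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.concurrent.TimeUnit;

public class LambdaSubscribe {
    // Subscribes with two consumers: one for onNext, one for onError.
    static <T> Disposable listen(Flowable<T> source) {
        return source.subscribe(
                item -> System.out.println("On next: " + item),
                error -> System.err.println(error.getMessage()));
    }

    public static void main(String[] args) throws InterruptedException {
        // Emits 0, 1, 2, ... every 100 ms on the computation scheduler.
        Flowable<Long> ticks = Flowable.interval(100, TimeUnit.MILLISECONDS);

        Disposable subscription = listen(ticks);
        Thread.sleep(350);

        // Cancels the upstream; no further items are delivered.
        subscription.dispose();
        System.out.println("Disposed: " + subscription.isDisposed());
    }
}
```

Note that this overload requests an unbounded amount from upstream, unlike the request(1) pattern in the question, so it fits when one-at-a-time demand is not required.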
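If a full subscriber class with one-at-a-time requests is still wanted, as in the question, a hand-written abstract parent may not be necessary: RxJava 3 ships DisposableSubscriber, which stores the Subscription and exposes dispose() and request(). The sketch below is only an illustration; OneByOne, CollectingSubscriber and run are hypothetical names, and Integer items are used instead of Person to keep it short.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;

import java.util.ArrayList;
import java.util.List;

public class OneByOne {
    // Requests one item at a time, like FSubscriber in the question,
    // but inherits subscription handling and dispose() from the base class.
    static final class CollectingSubscriber extends DisposableSubscriber<Integer> {
        final List<Integer> received = new ArrayList<>();

        @Override
        protected void onStart() {
            request(1); // initial demand; the default would be unbounded
        }

        @Override
        public void onNext(Integer value) {
            received.add(value);
            request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable t) {
            System.err.println(t.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("On complete");
        }
    }

    static List<Integer> run(int count) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        Flowable.range(1, count).subscribe(subscriber);
        subscriber.dispose(); // safe to call after completion as well
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```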
