Code app using RxJava

RxJava Illustration

Posted by Mr.Humorous 🥘 on October 10, 2018

1. RxJava Operator Process Explanation

RxJava Operator Process Explanation

2. Create Observable

Observable Just Operator

 Observable.just(1, 2, 3, 4, 5)

Observable Filter Operator

 Observable
    .just(1, 2, 3, 4, 5)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            //check if the number is odd? If the number is odd return true, to emmit that object.
            return integer % 2 != 0;
        }
    });

3. Create Observer

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("All data emitted.");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error received: " + e.getMessage());
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("New data received: " + integer);
    }
};

When you don’t care about onCompleted() or onError(), you can use Action1 class (call() == onNext()).

Action1<Integer> onNextAction = new Action1<Integer>() {
    @Override
    public void call(Integer s) { //This is eqivelent to onNext()
        System.out.println(s);
    }
};

4. Create Scheduler

Observable<Integer> observable = Observable
  .just(1, 2, 3, 4, 5)
  .filter(new Func1<Integer, Boolean>() {
      @Override
      public Boolean call(Integer integer) {
          //check if the number is odd? If the number is odd return true, to emmit that object.
          return integer % 2 != 0;
      }
  });

Observer<Integer> observer = new Observer<Integer>() {
  @Override
  public void onCompleted() {
      System.out.println("All data emitted.");
  }

  @Override
  public void onError(Throwable e) {
      System.out.println("Error received: " + e.getMessage());
  }

  @Override
  public void onNext(Integer integer) {
      System.out.println("New data received: " + integer);
  }
};

Subscription subscription = observable
  .subscribeOn(Schedulers.io())       //observable will run on IO thread.
  .observeOn(AndroidSchedulers.mainThread())      //Observer will run on main thread.
  .subscribe(observer);               //subscribe the observer

Output of the above program is: Program Output

5. Unsubscribe

5.1 Unsubscribe from Single Subscription

subscription.unsubscribe();

5.2 Unsubscribe from Multiple Subscriptions

public class MainActivity extends BaseActivity {
    private CompositeSubscription mSubscription = new CompositeSubscription();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
      //...
      //...
      //...

      mSubscription.add(subscription1); //Add subscription 1
      mSubscription.add(subscription2); //Add subscription 2
    }

  @Override
  public void onDestroy() {
      super.onDestroy();

      //Unsubscribe both subscriptions.
      mSubscription.unsubscribe();
  }
}