创建操作符
just操作符
1
| Observable observable=Observable.just(10,2);
|
依次会调用onNext(10)和onNext(2);
interval操作符
按照固定时间间隔发送整数序列的Observable(从0开始)。
1
| Observable observable=Observable.interval(10,TimeUnit.SECONDS);
|
range操作符
创建发送指定范围的整数序列的Observable
1
| Observable observable=Observable.range(2,3);
|
从2开始3个整数
repeat操作符
创建重复N次的发送数据的Onservable
1
| Observable observable=Observable.range(2,2).repeat(4);
|
从2开始2个整数,再重复4次
defer操作符
直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| i=9; Observable observable=Observable.just(i); Observable deferObservable = Observable.defer(new Callable<ObservableSource<Integer>>() { @Override public ObservableSource call() throws Exception { return Observable.just(i); } });
i=10; deferObservable.subscribe(new Consumer() { @Override public void accept(@io.reactivex.annotations.NonNull Object o) throws Exception { Log.d("延迟显示",o.toString()); } }); observable.subscribe(new Consumer() { @Override public void accept(@io.reactivex.annotations.NonNull Object o) throws Exception { Log.d("显示",o.toString()); } });
|
结果
timer操作符
指定时间后发射一个数字0的Onservable
1
| Observable observable = Observable.timer(2,TimeUnit.SECONDS);
|
变换操作符
map操作符
将Observable转换为新的Observable并发送
1 2 3 4 5 6 7
| i=9; Observable observable = Observable.just(i).map(new Function() { @Override public Object apply(@io.reactivex.annotations.NonNull Object o) throws Exception { return Integer.parseInt(o.toString())+20; } });
|
flatMap操作符
将Observalbes的数据变化为Observable。
1 2 3 4 5 6 7
| i=9; Observable observable = Observable.just(i).flatMap(new Function<Integer, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception { return Observable.range(integer,3); } });
|
它与map的区别是它返回一个Observable,所以你可以在返回时再次进行操作。
cast操作符
可以将发射的数据转换为指定类型(比如子类转化为父类)
1 2 3 4 5 6
| Observable observable = Observable.just(i).flatMap(new Function<Integer, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception { return Observable.range(integer,3); } }).cast(Integer.class);
|
concatMap操作符
与flatMap的使用方法基本一样,但是flatMap变换后的输出顺序可能是交错的,concatMap不会交错。
1 2 3 4 5 6
| Observable observable = Observable.just(1,2,3,4,5).concatMap(new Function<Integer, ObservableSource<Integer>>() { @Override public ObservableSource apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception { return Observable.just(integer*10).delay(10-integer,TimeUnit.SECONDS); } });
|
在上面这种情况下,输入依旧是:10,20,30,40,50;而如果是flatMap,则输出是50,40,30,20,10
flatMapIterable操作符
可以将数据打包成Iterable
1 2 3 4 5 6 7 8
| Observable observable = Observable.just(1,2,3,4,5).flatMapIterable(new Function<Integer, Iterable<Integer>>() { @Override public Iterable apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception { List<Integer>list=new ArrayList<>() list.add(integer); return list; } });
|
buffer操作符
将传入的数据变成列表传输
1
| Observable observable = Observable.just(1,2,3,4,5).buffer(3);
|
以3个数据为一组。
输出:[1,2,3],[4,5]
groupBy操作符
将传入的数据分组后传出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| Person p1=new Person(1); Person p2=new Person(1); Person p3=new Person(3); Person p4=new Person(4); Person p5=new Person(3); Person p6=new Person(2);
Observable observable = Observable.just(p1,p2,p3,p4,p5,p6).groupBy(new Function<Person, Integer>() { @Override public Integer apply(@io.reactivex.annotations.NonNull Person person) throws Exception { return person.getAge(); } });
Observable.concat(observable).subscribe(new Consumer<Person>() { @Override public void accept(@io.reactivex.annotations.NonNull Person o) throws Exception { Log.d("显示",o.getAge()+""); } }); }
|
结果:1,1,3,3,4,2
过滤操作符
filter操作符
对源数据进行过滤
1 2 3 4 5 6
| Observable observable = Observable.just(1,3,3,45,67,78).filter(new Predicate<Integer>() { @Override public boolean test(@io.reactivex.annotations.NonNull Integer integer) throws Exception { return integer>50; } });
|
elementAt操作符
返回指定位置的数据
1
| Observable observable = Observable.just(1,3,3,45,67,78).elementAt(3).toObservable();
|
只取45
dustinct操作符
去重
1
| Observable observable=Observable.just(1,2,3,5,6,7,8,1,2).distinct();
|
输出:1,2,3,5,6,7,8
skip操作符
过滤掉n个数据
1
| Observable observable=Observable.just(1,2,3,5,6,7,8,1,2).skip(3);
|
输出:5,6,7,8,1,2
take操作符
取前N个数据
1
| Observable observable=Observable.just(1,2,3,5,6,7,8,1,2).take(3);
|
输出:1,2,3
还有takeLast,取后n个
ignoreElements操作符
将源数据全部忽略掉
1
| Observable bservable=Observable.just(1,2,3,5,6,7,8,1,2).ignoreElements().toObservable();
|
直接执行onCompleted()
throttleFirst操作符
定时发射时间段里收到的第一个数据
1
| Observable observable=Observable.interval(1,TimeUnit.SECONDS).throttleFirst(3,TimeUnit.SECONDS);
|
结果:0,4,8…
throttleWithTimeout操作符
在限定时间内如果有值传出来,就重新及时,超时就传出值
1
| Observable observable=Observable.interval(2,TimeUnit.SECONDS).throttleWithTimeout(1,TimeUnit.SECONDS);
|
结果:0,1,2,3…
组合操作符
startWith在数据源前插入数据
1 2 3
| List<Integer>li=new ArrayList<>(); li.add(1);li.add(2);li.add(1);li.add(1); Observable observable=Observable.just(1,2,3).startWith(li);
|
结果:1,2,1,1,1,2,3
merge操作符
将多个Observable合并到一个发送,数据可能会交错
1 2 3
| Observable observable1=Observable.just(1,2,3); Observable observable2=Observable.just(3,4,5); Observable observable= Observable.merge(observable1,observable2);
|
结果:1,2,3,3,4,5
concat操作符
将多个数据合并发送,数据会严格按照顺序排列
1 2 3
| Observable observable1=Observable.just(1,2,3); Observable observable2=Observable.just(3,4,5); Observable observable= Observable.concat(observable1,observable2);
|
结果:1,2,3,3,4,5
zip操作符
合并数据,并根据指定的函数变换
1 2 3 4 5 6 7 8
| Observable observable1=Observable.just(1,2,3); Observable observable2=Observable.just(3,4,5); Observable observable= Observable.zip(observable1, observable2, new BiFunction<Integer,Integer,String>() { @Override public String apply(@io.reactivex.annotations.NonNull Integer o, @io.reactivex.annotations.NonNull Integer o2) throws Exception { return o+"***"+o2; } });
|
结果:13,24,3***5
当数量不对等时,按最少的那个合并。
combineLastest操作符
将最近发射的数据项合并
1 2 3 4 5 6 7 8
| Observable observable1=Observable.just(1,2); Observable observable2=Observable.just(3,4,5); Observable observable= Observable.combineLatest(observable1, observable2, new BiFunction<Integer,Integer,String>() { @Override public String apply(@io.reactivex.annotations.NonNull Integer o, @io.reactivex.annotations.NonNull Integer o2) throws Exception { return o+"***"+o2; } });
|
当observable2发射数据时,observable1已经发射完,所有其最近数据项是2,所以结果是:23,24,2***5
辅助操作符
delay操作符
延迟操作
1
| Observable observable1=Observable.just(1,2).delay(2,TimeUnit.SECONDS);
|
Do操作符
Do系列操作符相当于在Observable生命周期时注册回调,比如:
doOnEach:每次发送数据时执行
doOnNext:执行onNext时调用
doOnSubscribe:观察者订阅时执行
doOnUnsubscribe:取消订阅时执行
doOnCompleted:完成时执行
doOnError:报错时执行
doOnTerminate:终止前(结束或异常)执行
finallyDo:终止后(结束或异常)执行
subscribeOn、observeOn操作符
subscribeOn指定Observable在哪个线程上运行,observeOn指定Observer在哪个线程运行
1
| observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
|
Observable在io线程,Observer在主线程
timeout操作符
当指定时间没有数据时,会执行onError
1
| Observable observable=Observable.interval(2000,TimeUnit.SECONDS).timeout(100,TimeUnit.MILLISECONDS);
|
错误处理操作符
catch操作符
catch将拦截onError,将他替换成其他数据。实现有3个操作符:onErrorReturn、onErrorResumeNext、onExceptionResumeNext。
onErrorReturn:错误时返回原来Observable的备用Observable
1 2 3 4 5 6 7 8 9 10 11 12 13
| List<Integer>li=new ArrayList<>(); li.add(1);li.add(2);li.add(1);li.add(1); Observable observable=Observable.just(li).map(new Function<List<Integer>,Integer>() { @Override public Integer apply(@io.reactivex.annotations.NonNull List<Integer> o) throws Exception { return o.get(33); } }).onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception { return 1231321; } });
|
结果为:1231321
onErrorResumeNext:与onErrorReturn基本一样,但是它返回的是ObservableSource数据
1 2 3 4 5 6 7 8 9 10 11 12 13
| List<Integer>li=new ArrayList<>(); li.add(1);li.add(2);li.add(1);li.add(1); Observable observable=Observable.just(li).map(new Function<List<Integer>,Integer>() { @Override public Integer apply(@io.reactivex.annotations.NonNull List<Integer> o) throws Exception { return o.get(33); } }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception { return Observable.just(11,3); } });
|
结果:11,3
onExceptionResumeNext:它收到后,需要自己重新调用onNext和onCompleted方法逻辑,不会返回Observable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| List<Integer>li=new ArrayList<>(); li.add(1);li.add(2);li.add(1);li.add(1); Observable observable=Observable.just(li).map(new Function<List<Integer>,Integer>() { @Override public Integer apply(@io.reactivex.annotations.NonNull List<Integer> o) throws Exception { return o.get(33); } }).onExceptionResumeNext(new ObservableSource<Integer>() { @Override public void subscribe(@io.reactivex.annotations.NonNull Observer<? super Integer> observer) { observer.onNext(333); observer.onComplete(); } });
|
retry操作符
出错时,它会重新订阅相应的次数。
1 2 3 4 5 6 7 8 9
| List<Integer>li=new ArrayList<>(); li.add(1);li.add(2);li.add(1);li.add(1); Observable observable=Observable.just(li).map(new Function<List<Integer>,Integer>() { @Override public Integer apply(@io.reactivex.annotations.NonNull List<Integer> o) throws Exception { Log.d("显示","错误了"); return o.get(33); } }).retry(2);
|
会打印3次错误了。
条件操作符和布尔操作符
all操作符
会对所有的数据进行判断,全部满足则返回true,否则返回false
1 2 3 4 5 6
| Observable observable=Observable.just(1,2,3).all(new Predicate<Integer>() { @Override public boolean test(@io.reactivex.annotations.NonNull Integer integer) throws Exception { return integer>0; } }).toObservable();
|
结果:true
contain、isEmpty
contain:判断是否包含某个值
isEmpty:判断是否为空
1 2
| Observable observable=Observable.just(1,2,3).contains(2).toObservable(); Observable observable=Observable.just(1,2,3).isEmpty().toObservable();
|
结果:true,false
abm、abmArray操作符
发射先发射数据的
1 2 3
| Observable observabl1=Observable.just(1); Observable observabl2=Observable.just(2).delay(10,TimeUnit.SECONDS); Observable observable=Observable.ambArray(observabl1,observabl2);
|
结果:1
defaultIfEmpty操作符
发射原来的数据,如果没有数据就发射一个默认的数据。
转换操作符
toList操作符
将多个数据转换为List数据
1
| Observable observable=Observable.just(1,2,3).toList().toObservable();
|
结果为:[1, 2, 3]
toSortedList
用法与toList一样,但是它会自动排列顺序,默认按升序排序,数据项没有定义comparable接口,会抱错。也可以使用toSortedList(Comparator<? super T> comparator)定义顺序。
1 2 3 4 5 6
| Observable observable=Observable.just(1,2,3,1,2,5,2).toSortedList(new Comparator<Integer>() { @Override public int compare(Integer lhs, Integer rhs) { return lhs-rhs; } }).toObservable();
|
结果:[1, 1, 2, 2, 2, 3, 5]
toMap操作符
将数据添加到map(HashMap)发射,
1 2 3 4 5 6
| Observable observable=Observable.just(1,2,3,1,2,5,2).toMap(new Function<Integer, Object>() { @Override public Object apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception { return integer+"*"; } }).toObservable();
|
apply中设置map的key
结果: {5*=5, 3*=3, 1*=1, 2*=2}