Rxjava常用操作符

Posted by alonealice on 2017-07-27

创建操作符

just操作符

1
Observable observable=Observable.just(10,2);

依次会调用onNext(10)和onNext(2);

interval操作符

按照固定时间间隔发送整数序列的Observable(从0开始)。

1
Observable observable=Observable.interval(10,TimeUnit.SECONDS);

range操作符

创建发送指定范围的整数序列的Observable

1
Observable observable=Observable.range(2,3);

从2开始3个整数

repeat操作符

创建重复N次的发送数据的Onservable

1
Observable observable=Observable.range(2,2).repeat(4);

从2开始2个整数,再重复4次

defer操作符

直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
i=9;
Observable observable=Observable.just(i);
Observable deferObservable = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource call() throws Exception {
return Observable.just(i);
}
});

i=10;
deferObservable.subscribe(new Consumer() {
@Override
public void accept(@io.reactivex.annotations.NonNull Object o) throws Exception {
Log.d("延迟显示",o.toString());
}
});
observable.subscribe(new Consumer() {
@Override
public void accept(@io.reactivex.annotations.NonNull Object o) throws Exception {
Log.d("显示",o.toString());
}
});

结果

1
2
延迟显示: 10
显示: 9

timer操作符

指定时间后发射一个数字0的Onservable

1
Observable observable = Observable.timer(2,TimeUnit.SECONDS);

变换操作符

map操作符

将Observable转换为新的Observable并发送

1
2
3
4
5
6
7
i=9;
Observable observable = Observable.just(i).map(new Function() {
@Override
public Object apply(@io.reactivex.annotations.NonNull Object o) throws Exception {
return Integer.parseInt(o.toString())+20;
}
});

flatMap操作符

将Observalbes的数据变化为Observable。

1
2
3
4
5
6
7
i=9;
Observable observable = Observable.just(i).flatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception {
return Observable.range(integer,3);
}
});

它与map的区别是它返回一个Observable,所以你可以在返回时再次进行操作。

cast操作符

可以将发射的数据转换为指定类型(比如子类转化为父类)

1
2
3
4
5
6
Observable observable = Observable.just(i).flatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception {
return Observable.range(integer,3);
}
}).cast(Integer.class);

concatMap操作符

与flatMap的使用方法基本一样,但是flatMap变换后的输出顺序可能是交错的,concatMap不会交错。

1
2
3
4
5
6
Observable observable = Observable.just(1,2,3,4,5).concatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception {
return Observable.just(integer*10).delay(10-integer,TimeUnit.SECONDS);
}
});

在上面这种情况下,输入依旧是:10,20,30,40,50;而如果是flatMap,则输出是50,40,30,20,10

flatMapIterable操作符

可以将数据打包成Iterable

1
2
3
4
5
6
7
8
Observable observable = Observable.just(1,2,3,4,5).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception {
List<Integer>list=new ArrayList<>()
list.add(integer);
return list;
}
});

buffer操作符

将传入的数据变成列表传输

1
Observable observable = Observable.just(1,2,3,4,5).buffer(3);

以3个数据为一组。
输出:[1,2,3],[4,5]

groupBy操作符

将传入的数据分组后传出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Person p1=new Person(1);
Person p2=new Person(1);
Person p3=new Person(3);
Person p4=new Person(4);
Person p5=new Person(3);
Person p6=new Person(2);

Observable observable = Observable.just(p1,p2,p3,p4,p5,p6).groupBy(new Function<Person, Integer>() {
@Override
public Integer apply(@io.reactivex.annotations.NonNull Person person) throws Exception {
return person.getAge();
}
});


Observable.concat(observable).subscribe(new Consumer<Person>() {
@Override
public void accept(@io.reactivex.annotations.NonNull Person o) throws Exception {
Log.d("显示",o.getAge()+"");
}
});
}

结果:1,1,3,3,4,2

过滤操作符

filter操作符

对源数据进行过滤

1
2
3
4
5
6
Observable observable = Observable.just(1,3,3,45,67,78).filter(new Predicate<Integer>() {
@Override
public boolean test(@io.reactivex.annotations.NonNull Integer integer) throws Exception {
return integer>50;
}
});

elementAt操作符

返回指定位置的数据

1
Observable observable = Observable.just(1,3,3,45,67,78).elementAt(3).toObservable();

只取45

dustinct操作符

去重

1
Observable observable=Observable.just(1,2,3,5,6,7,8,1,2).distinct();

输出:1,2,3,5,6,7,8

skip操作符

过滤掉n个数据

1
Observable observable=Observable.just(1,2,3,5,6,7,8,1,2).skip(3);

输出:5,6,7,8,1,2

take操作符

取前N个数据

1
Observable observable=Observable.just(1,2,3,5,6,7,8,1,2).take(3);

输出:1,2,3
还有takeLast,取后n个

ignoreElements操作符

将源数据全部忽略掉

1
Observable bservable=Observable.just(1,2,3,5,6,7,8,1,2).ignoreElements().toObservable();

直接执行onCompleted()

throttleFirst操作符

定时发射时间段里收到的第一个数据

1
Observable observable=Observable.interval(1,TimeUnit.SECONDS).throttleFirst(3,TimeUnit.SECONDS);

结果:0,4,8…

throttleWithTimeout操作符

在限定时间内如果有值传出来,就重新及时,超时就传出值

1
Observable observable=Observable.interval(2,TimeUnit.SECONDS).throttleWithTimeout(1,TimeUnit.SECONDS);

结果:0,1,2,3…

组合操作符

startWith在数据源前插入数据

1
2
3
List<Integer>li=new ArrayList<>();
li.add(1);li.add(2);li.add(1);li.add(1);
Observable observable=Observable.just(1,2,3).startWith(li);

结果:1,2,1,1,1,2,3

merge操作符

将多个Observable合并到一个发送,数据可能会交错

1
2
3
Observable observable1=Observable.just(1,2,3);
Observable observable2=Observable.just(3,4,5);
Observable observable= Observable.merge(observable1,observable2);

结果:1,2,3,3,4,5

concat操作符

将多个数据合并发送,数据会严格按照顺序排列

1
2
3
Observable observable1=Observable.just(1,2,3);
Observable observable2=Observable.just(3,4,5);
Observable observable= Observable.concat(observable1,observable2);

结果:1,2,3,3,4,5

zip操作符

合并数据,并根据指定的函数变换

1
2
3
4
5
6
7
8
Observable observable1=Observable.just(1,2,3);
Observable observable2=Observable.just(3,4,5);
Observable observable= Observable.zip(observable1, observable2, new BiFunction<Integer,Integer,String>() {
@Override
public String apply(@io.reactivex.annotations.NonNull Integer o, @io.reactivex.annotations.NonNull Integer o2) throws Exception {
return o+"***"+o2;
}
});

结果:13,24,3***5
当数量不对等时,按最少的那个合并。

combineLastest操作符

将最近发射的数据项合并

1
2
3
4
5
6
7
8
Observable observable1=Observable.just(1,2);
Observable observable2=Observable.just(3,4,5);
Observable observable= Observable.combineLatest(observable1, observable2, new BiFunction<Integer,Integer,String>() {
@Override
public String apply(@io.reactivex.annotations.NonNull Integer o, @io.reactivex.annotations.NonNull Integer o2) throws Exception {
return o+"***"+o2;
}
});

当observable2发射数据时,observable1已经发射完,所有其最近数据项是2,所以结果是:23,24,2***5

辅助操作符

delay操作符

延迟操作

1
Observable observable1=Observable.just(1,2).delay(2,TimeUnit.SECONDS);

Do操作符

Do系列操作符相当于在Observable生命周期时注册回调,比如:
doOnEach:每次发送数据时执行
doOnNext:执行onNext时调用
doOnSubscribe:观察者订阅时执行
doOnUnsubscribe:取消订阅时执行
doOnCompleted:完成时执行
doOnError:报错时执行
doOnTerminate:终止前(结束或异常)执行
finallyDo:终止后(结束或异常)执行

subscribeOn、observeOn操作符

subscribeOn指定Observable在哪个线程上运行,observeOn指定Observer在哪个线程运行

1
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())

Observable在io线程,Observer在主线程

timeout操作符

当指定时间没有数据时,会执行onError

1
Observable observable=Observable.interval(2000,TimeUnit.SECONDS).timeout(100,TimeUnit.MILLISECONDS);

错误处理操作符

catch操作符

catch将拦截onError,将他替换成其他数据。实现有3个操作符:onErrorReturn、onErrorResumeNext、onExceptionResumeNext。
onErrorReturn:错误时返回原来Observable的备用Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
List<Integer>li=new ArrayList<>();
li.add(1);li.add(2);li.add(1);li.add(1);
Observable observable=Observable.just(li).map(new Function<List<Integer>,Integer>() {
@Override
public Integer apply(@io.reactivex.annotations.NonNull List<Integer> o) throws Exception {
return o.get(33);
}
}).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception {
return 1231321;
}
});

结果为:1231321

onErrorResumeNext:与onErrorReturn基本一样,但是它返回的是ObservableSource数据

1
2
3
4
5
6
7
8
9
10
11
12
13
List<Integer>li=new ArrayList<>();
li.add(1);li.add(2);li.add(1);li.add(1);
Observable observable=Observable.just(li).map(new Function<List<Integer>,Integer>() {
@Override
public Integer apply(@io.reactivex.annotations.NonNull List<Integer> o) throws Exception {
return o.get(33);
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception {
return Observable.just(11,3);
}
});

结果:11,3

onExceptionResumeNext:它收到后,需要自己重新调用onNext和onCompleted方法逻辑,不会返回Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
List<Integer>li=new ArrayList<>();
li.add(1);li.add(2);li.add(1);li.add(1);
Observable observable=Observable.just(li).map(new Function<List<Integer>,Integer>() {
@Override
public Integer apply(@io.reactivex.annotations.NonNull List<Integer> o) throws Exception {
return o.get(33);
}
}).onExceptionResumeNext(new ObservableSource<Integer>() {
@Override
public void subscribe(@io.reactivex.annotations.NonNull Observer<? super Integer> observer) {
observer.onNext(333);
observer.onComplete();
}
});

retry操作符

出错时,它会重新订阅相应的次数。

1
2
3
4
5
6
7
8
9
List<Integer>li=new ArrayList<>();
li.add(1);li.add(2);li.add(1);li.add(1);
Observable observable=Observable.just(li).map(new Function<List<Integer>,Integer>() {
@Override
public Integer apply(@io.reactivex.annotations.NonNull List<Integer> o) throws Exception {
Log.d("显示","错误了");
return o.get(33);
}
}).retry(2);

会打印3次错误了。

条件操作符和布尔操作符

all操作符

会对所有的数据进行判断,全部满足则返回true,否则返回false

1
2
3
4
5
6
Observable observable=Observable.just(1,2,3).all(new Predicate<Integer>() {
@Override
public boolean test(@io.reactivex.annotations.NonNull Integer integer) throws Exception {
return integer>0;
}
}).toObservable();

结果:true

contain、isEmpty

contain:判断是否包含某个值
isEmpty:判断是否为空

1
2
Observable observable=Observable.just(1,2,3).contains(2).toObservable();
Observable observable=Observable.just(1,2,3).isEmpty().toObservable();

结果:true,false

abm、abmArray操作符

发射先发射数据的

1
2
3
Observable observabl1=Observable.just(1);
Observable observabl2=Observable.just(2).delay(10,TimeUnit.SECONDS);
Observable observable=Observable.ambArray(observabl1,observabl2);

结果:1

defaultIfEmpty操作符

发射原来的数据,如果没有数据就发射一个默认的数据。

转换操作符

toList操作符

将多个数据转换为List数据

1
Observable observable=Observable.just(1,2,3).toList().toObservable();

结果为:[1, 2, 3]

toSortedList

用法与toList一样,但是它会自动排列顺序,默认按升序排序,数据项没有定义comparable接口,会抱错。也可以使用toSortedList(Comparator<? super T> comparator)定义顺序。

1
2
3
4
5
6
Observable observable=Observable.just(1,2,3,1,2,5,2).toSortedList(new Comparator<Integer>() {
@Override
public int compare(Integer lhs, Integer rhs) {
return lhs-rhs;
}
}).toObservable();

结果:[1, 1, 2, 2, 2, 3, 5]

toMap操作符

将数据添加到map(HashMap)发射,

1
2
3
4
5
6
Observable observable=Observable.just(1,2,3,1,2,5,2).toMap(new Function<Integer, Object>() {
@Override
public Object apply(@io.reactivex.annotations.NonNull Integer integer) throws Exception {
return integer+"*";
}
}).toObservable();

apply中设置map的key
结果: {5*=5, 3*=3, 1*=1, 2*=2}