正是Rxjava丰富的操作符, 简化了我们的日常开发的工作量. 本文将介绍Rxjava自定义操作符在实际开发中的应用.
操作符应用举例
下面是一个使用例子:
//加购凑单逻辑
fun shoppingCartGoods(act: RaiseBuyActBO, goods: List<ActivityGoodsBO>?, limit: Int) = Observable
.just(goods)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.flatMapIterable { it }
.filter { MarketingLogic.isActivityGoods(act.rangeTypeIsAll(), act.filterGoodsIds(), it.goodsId) }
.distinct { it.goodsId }
.take(limit)
.map { it.goodsId }
.toList()
.flatMap { GoodsDao.getGoodsById(it) }
.flatMapIterable { it }
.take(limit)
.toList()
操作符的实现原理
操作符是如何实现的, 它的调度流程是怎么样的?
简化OnSubscribeLift
只保留核心逻辑如下:
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber); //All starting with subscribe()
newSubscriber.onStart(); //Default do nothing
onSubscribe.call(newSubscriber); //Observable.this.onSubscribe
}
});
}
Operator
的具体实现可以参考OperatorFilter
调用流程的理解可以使用递归思想
Subscriber: 动作的发起者, 结果的获取者
整体过程: 下游流向上游, 上游再流回下游的一个过程
在扫码枪中自定义操作符的应用
将扫码枪输入的一个一个的按键事件(KeyEvent), 缓存起来再达到条件时全部发送出去.
本来事情很简单, 只要接受到回车键(ENTER)就发送出去, 但是却存在一些扫码枪一次扫码最后不会发送回车键. 所以可以使用扫码枪共有特征, 就是每个按键事件的间隔很短, 可用找到一个比较合适的时间阈值来鉴别是否为一次扫码.
所以, 这个操作符的功能就是在条件下进行缓存, 我们给它起个名字叫BufferWhile
BufferWhile
/**
* BufferWhile, buffer while some predicate allows it based on the items.
* Notice: the operator not emit the last item.
* Changes: auto start loop checker
* refer:
* 1. https://github.com/akarnokd/RxJavaExtensions/blob/master/src/main/java/hu/akarnokd/rxjava2/operators/FlowableBufferPredicate.java
* 2. https://stackoverflow.com/questions/35881227/rxjava-buffer-items-until-some-condition-is-true-for-current-item
* Created by qbeenslee on 2019-09-06.
*/
@BackpressureSupport(BackpressureKind.FULL)
public class BufferWhile<T> implements Observable.Operator<List<T>, T> {
private final Func1<T, Boolean> predicate;
private final long timeout;
private final TimeUnit timeUnit;
private final long duration; //NANOSECONDS
private final Scheduler scheduler;
public BufferWhile(Func1<T, Boolean> predicate) {
this(0L, TimeUnit.SECONDS, predicate, Schedulers.computation());
}
public BufferWhile(long timeout, TimeUnit unit, Func1<T, Boolean> predicate, @NonNull Scheduler scheduler) {
this.predicate = predicate;
this.timeout = timeout;
this.timeUnit = unit;
this.duration = timeUnit != null ? timeUnit.toNanos(timeout) : 0;
this.scheduler = scheduler;
}
@Override
public Subscriber<? super T> call(@NonNull Subscriber<? super List<T>> child) {
SerializedSubscriber<List<T>> serialized = new SerializedSubscriber<>(child);
Scheduler.Worker inner = this.scheduler.createWorker();
BufferWhileSubscriber parent = new BufferWhileSubscriber(serialized, inner);
parent.add(inner);
child.add(parent);
parent.scheduleExact();
return parent;
}
final class BufferWhileSubscriber extends Subscriber<T> {
/**
* child
*/
final Subscriber<? super List<T>> actual;
/**
* Loop
*/
final Scheduler.Worker inner;
/**
* 缓存的数据
*/
@Nullable
private List<T> buffer;
/**
* 是否开启轮询
*/
private boolean isLoopActivated = false;
/**
* 发射次数
*/
private long emitCount = 0;
/**
* 单次初始接收时间
*/
private long initialTimestamp = 0;
/**
* 接收到上次信号的时间
*/
private long lastTimestamp = 0;
BufferWhileSubscriber(Subscriber<? super List<T>> actual, Scheduler.Worker inner) {
this.actual = actual;
this.inner = inner;
buffer = new LinkedList<>(); // 由于逻辑特性, 此处采取LinkedList获取更好的性能
}
@SuppressWarnings("unstable")
@Override
public void onNext(T t) {
// 每次接收发射后
if (buffer != null && buffer.isEmpty()) {
resetPeriodically();
}
boolean accept = false;
try {
accept = predicate.call(t);
} catch (Throwable error) {
Exceptions.throwOrReport(error, this);
}
synchronized (this) {
if (accept) {
if (buffer != null) {
buffer.add(t); // 不添加最后一个对象
}
} else if (buffer != null && !buffer.isEmpty()) {
emit();
}
}
checkAndActivate();
lastTimestamp = now(); // 刷新每次接收的时间戳
}
private long now() {
return System.nanoTime();
}
private void emit() {
List<T> toEmit = buffer;
buffer = new LinkedList<>(); // 重置
try {
actual.onNext(toEmit);
emitCount++;
} catch (Throwable error) {
Exceptions.throwOrReport(error, this);
}
}
private void resetPeriodically() {
initialTimestamp = now();
}
@Override
public void onError(Throwable e) {
buffer = null;
actual.onError(e);
this.unsubscribe();
}
@Override
public void onCompleted() {
this.inner.unsubscribe();
if (buffer != null && !buffer.isEmpty()) {
emit();
}
actual.onCompleted();
this.unsubscribe();
lastTimestamp = 0L;
initialTimestamp = 0L;
isLoopActivated = false;
emitCount = 0L;
}
private void checkAndActivate() {
if (isLoopActivated) return;
long now = now();
boolean shouldActivate = emitCount == 0 // 发射次数为0
&& buffer != null && !buffer.isEmpty() // 缓存数据不为空
&& initialTimestamp != 0L && duration <= (now - initialTimestamp); // 距离初次接受时间大于指定间隔
if (shouldActivate && !isLoopActivated) {
scheduleExact();
isLoopActivated = true;
}
}
void scheduleExact() {
this.inner.schedulePeriodically(new Action0() {
public void call() {
if (test()) {
emit();
}
}
}, timeout, timeout, timeUnit);
}
private boolean test() {
synchronized (this) {
long now = now();
return buffer != null && !buffer.isEmpty() // 缓存数据不为空
&& lastTimestamp != 0L && duration < (now - lastTimestamp); //距离上一个接受到信号超过指定间隔
}
}
}
}
思考题: 如何区分扫码枪输入和键盘手动输入?
文章作者: qbeenslee