正是Rxjava丰富的操作符, 简化了我们的日常开发的工作量. 本文将介绍Rxjava自定义操作符在实际开发中的应用.

操作符应用举例

下面是一个使用例子:

//加购凑单逻辑 fun shoppingCartGoods(act: RaiseBuyActBO, goods: List<ActivityGoodsBO>?, limit: Int) = Observable .just(goods) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .flatMapIterable { it } .filter { MarketingLogic.isActivityGoods(act.rangeTypeIsAll(), act.filterGoodsIds(), it.goodsId) } .distinct { it.goodsId } .take(limit) .map { it.goodsId } .toList() .flatMap { GoodsDao.getGoodsById(it) } .flatMapIterable { it } .take(limit) .toList()

Rxjava操作符大全
Rxjava操作符大全

操作符的实现原理

操作符是如何实现的, 它的调度流程是怎么样的?

简化OnSubscribeLift 只保留核心逻辑如下:

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); //All starting with subscribe() newSubscriber.onStart(); //Default do nothing onSubscribe.call(newSubscriber); //Observable.this.onSubscribe } }); }

Operator的具体实现可以参考OperatorFilter

调用流程的理解可以使用递归思想
Subscriber: 动作的发起者, 结果的获取者
整体过程: 下游流向上游, 上游再流回下游的一个过程

多次的lift的流程图
多次的lift的流程图

在扫码枪中自定义操作符的应用

将扫码枪输入的一个一个的按键事件(KeyEvent), 缓存起来再达到条件时全部发送出去.

本来事情很简单, 只要接受到回车键(ENTER)就发送出去, 但是却存在一些扫码枪一次扫码最后不会发送回车键. 所以可以使用扫码枪共有特征, 就是每个按键事件的间隔很短, 可用找到一个比较合适的时间阈值来鉴别是否为一次扫码.

所以, 这个操作符的功能就是在条件下进行缓存, 我们给它起个名字叫BufferWhile

BufferWhile

/** * BufferWhile, buffer while some predicate allows it based on the items. * Notice: the operator not emit the last item. * Changes: auto start loop checker * refer: * 1. https://github.com/akarnokd/RxJavaExtensions/blob/master/src/main/java/hu/akarnokd/rxjava2/operators/FlowableBufferPredicate.java * 2. https://stackoverflow.com/questions/35881227/rxjava-buffer-items-until-some-condition-is-true-for-current-item * Created by qbeenslee on 2019-09-06. */ @BackpressureSupport(BackpressureKind.FULL) public class BufferWhile<T> implements Observable.Operator<List<T>, T> { private final Func1<T, Boolean> predicate; private final long timeout; private final TimeUnit timeUnit; private final long duration; //NANOSECONDS private final Scheduler scheduler; public BufferWhile(Func1<T, Boolean> predicate) { this(0L, TimeUnit.SECONDS, predicate, Schedulers.computation()); } public BufferWhile(long timeout, TimeUnit unit, Func1<T, Boolean> predicate, @NonNull Scheduler scheduler) { this.predicate = predicate; this.timeout = timeout; this.timeUnit = unit; this.duration = timeUnit != null ? timeUnit.toNanos(timeout) : 0; this.scheduler = scheduler; } @Override public Subscriber<? super T> call(@NonNull Subscriber<? super List<T>> child) { SerializedSubscriber<List<T>> serialized = new SerializedSubscriber<>(child); Scheduler.Worker inner = this.scheduler.createWorker(); BufferWhileSubscriber parent = new BufferWhileSubscriber(serialized, inner); parent.add(inner); child.add(parent); parent.scheduleExact(); return parent; } final class BufferWhileSubscriber extends Subscriber<T> { /** * child */ final Subscriber<? super List<T>> actual; /** * Loop */ final Scheduler.Worker inner; /** * 缓存的数据 */ @Nullable private List<T> buffer; /** * 是否开启轮询 */ private boolean isLoopActivated = false; /** * 发射次数 */ private long emitCount = 0; /** * 单次初始接收时间 */ private long initialTimestamp = 0; /** * 接收到上次信号的时间 */ private long lastTimestamp = 0; BufferWhileSubscriber(Subscriber<? super List<T>> actual, Scheduler.Worker inner) { this.actual = actual; this.inner = inner; buffer = new LinkedList<>(); // 由于逻辑特性, 此处采取LinkedList获取更好的性能 } @SuppressWarnings("unstable") @Override public void onNext(T t) { // 每次接收发射后 if (buffer != null && buffer.isEmpty()) { resetPeriodically(); } boolean accept = false; try { accept = predicate.call(t); } catch (Throwable error) { Exceptions.throwOrReport(error, this); } synchronized (this) { if (accept) { if (buffer != null) { buffer.add(t); // 不添加最后一个对象 } } else if (buffer != null && !buffer.isEmpty()) { emit(); } } checkAndActivate(); lastTimestamp = now(); // 刷新每次接收的时间戳 } private long now() { return System.nanoTime(); } private void emit() { List<T> toEmit = buffer; buffer = new LinkedList<>(); // 重置 try { actual.onNext(toEmit); emitCount++; } catch (Throwable error) { Exceptions.throwOrReport(error, this); } } private void resetPeriodically() { initialTimestamp = now(); } @Override public void onError(Throwable e) { buffer = null; actual.onError(e); this.unsubscribe(); } @Override public void onCompleted() { this.inner.unsubscribe(); if (buffer != null && !buffer.isEmpty()) { emit(); } actual.onCompleted(); this.unsubscribe(); lastTimestamp = 0L; initialTimestamp = 0L; isLoopActivated = false; emitCount = 0L; } private void checkAndActivate() { if (isLoopActivated) return; long now = now(); boolean shouldActivate = emitCount == 0 // 发射次数为0 && buffer != null && !buffer.isEmpty() // 缓存数据不为空 && initialTimestamp != 0L && duration <= (now - initialTimestamp); // 距离初次接受时间大于指定间隔 if (shouldActivate && !isLoopActivated) { scheduleExact(); isLoopActivated = true; } } void scheduleExact() { this.inner.schedulePeriodically(new Action0() { public void call() { if (test()) { emit(); } } }, timeout, timeout, timeUnit); } private boolean test() { synchronized (this) { long now = now(); return buffer != null && !buffer.isEmpty() // 缓存数据不为空 && lastTimestamp != 0L && duration < (now - lastTimestamp); //距离上一个接受到信号超过指定间隔 } } } }

思考题: 如何区分扫码枪输入和键盘手动输入?

文章作者: qbeenslee

CC BY-NC 4.0