RXJAVA自定义操作符与扫码枪

本文创作于2019年10月24日 20:33

正是Rxjava丰富的操作符, 简化了我们的日常开发的工作量. 本文将介绍Rxjava自定义操作符在实际开发中的应用.

操作符应用举例

下面是一个使用例子:

//加购凑单逻辑
fun shoppingCartGoods(act: RaiseBuyActBO, goods: List<ActivityGoodsBO>?, limit: Int) = Observable
        .just(goods)
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .flatMapIterable { it }
        .filter { MarketingLogic.isActivityGoods(act.rangeTypeIsAll(), act.filterGoodsIds(), it.goodsId) }
        .distinct { it.goodsId }
        .take(limit)
        .map { it.goodsId }
        .toList()
        .flatMap { GoodsDao.getGoodsById(it) }
        .flatMapIterable { it }
        .take(limit)
        .toList()

Rxjava操作符大全
Rxjava操作符大全

操作符的实现原理

操作符是如何实现的, 它的调度流程是怎么样的?

简化OnSubscribeLift只保留核心逻辑如下:

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber); //All starting with subscribe()
            newSubscriber.onStart(); //Default do nothing 
            onSubscribe.call(newSubscriber); //Observable.this.onSubscribe
        }
    });
}

Operator的具体实现可以参考OperatorFilter

调用流程的理解可以使用递归思想
Subscriber: 动作的发起者, 结果的获取者
整体过程: 下游流向上游, 上游再流回下游的一个过程

多次的lift的流程图
多次的lift的流程图

在扫码枪中自定义操作符的应用

将扫码枪输入的一个一个的按键事件(KeyEvent), 缓存起来再达到条件时全部发送出去

本来事情很简单, 只要接受到回车键(ENTER)就发送出去, 但是却存在一些扫码枪一次扫码最后不会发送回车键.
所以可以使用扫码枪共有特征, 就是每个按键事件的间隔很短, 可用找到一个比较合适的时间阈值来鉴别是否为一次扫码.

所以, 这个操作符的功能就是在条件下进行缓存, 我们给它起个名字叫BufferWhile

BufferWhile

/**
 * BufferWhile, buffer while some predicate allows it based on the items.
 * Notice: the operator not emit the last item.
 * Changes: auto start loop checker
 * refer:
 * 1. https://github.com/akarnokd/RxJavaExtensions/blob/master/src/main/java/hu/akarnokd/rxjava2/operators/FlowableBufferPredicate.java
 * 2. https://stackoverflow.com/questions/35881227/rxjava-buffer-items-until-some-condition-is-true-for-current-item
 * Created by qbeenslee on 2019-09-06.
 */
@BackpressureSupport(BackpressureKind.FULL)
public class BufferWhile<T> implements Observable.Operator<List<T>, T> {
    private final Func1<T, Boolean> predicate;
    private final long timeout;
    private final TimeUnit timeUnit;
    private final long duration; //NANOSECONDS
    private final Scheduler scheduler;


    public BufferWhile(Func1<T, Boolean> predicate) {
        this(0L, TimeUnit.SECONDS, predicate, Schedulers.computation());
    }

    public BufferWhile(long timeout, TimeUnit unit, Func1<T, Boolean> predicate, @NonNull Scheduler scheduler) {
        this.predicate = predicate;
        this.timeout = timeout;
        this.timeUnit = unit;
        this.duration = timeUnit != null ? timeUnit.toNanos(timeout) : 0;
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super T> call(@NonNull Subscriber<? super List<T>> child) {
        SerializedSubscriber<List<T>> serialized = new SerializedSubscriber<>(child);
        Scheduler.Worker inner = this.scheduler.createWorker();
        BufferWhileSubscriber parent = new BufferWhileSubscriber(serialized, inner);
        parent.add(inner);
        child.add(parent);

        parent.scheduleExact();
        return parent;
    }


    final class BufferWhileSubscriber extends Subscriber<T> {
        /**
         * child
         */
        final Subscriber<? super List<T>> actual;
        /**
         * Loop
         */
        final Scheduler.Worker inner;
        /**
         * 缓存的数据
         */
        @Nullable
        private List<T> buffer;
        /**
         * 是否开启轮询
         */
        private boolean isLoopActivated = false;

        /**
         * 发射次数
         */
        private long emitCount = 0;

        /**
         * 单次初始接收时间
         */
        private long initialTimestamp = 0;

        /**
         * 接收到上次信号的时间
         */
        private long lastTimestamp = 0;


        BufferWhileSubscriber(Subscriber<? super List<T>> actual, Scheduler.Worker inner) {
            this.actual = actual;
            this.inner = inner;
            buffer = new LinkedList<>(); // 由于逻辑特性, 此处采取LinkedList获取更好的性能
        }

        @SuppressWarnings("unstable")
        @Override
        public void onNext(T t) {
            // 每次接收发射后
            if (buffer != null && buffer.isEmpty()) {
                resetPeriodically();
            }

            boolean accept = false;
            try {
                accept = predicate.call(t);
            } catch (Throwable error) {
                Exceptions.throwOrReport(error, this);
            }

            synchronized (this) {
                if (accept) {
                    if (buffer != null) {
                        buffer.add(t); // 不添加最后一个对象
                    }
                } else if (buffer != null && !buffer.isEmpty()) {
                    emit();
                }
            }
            checkAndActivate();

            lastTimestamp = now(); // 刷新每次接收的时间戳
        }

        private long now() {
            return System.nanoTime();
        }

        private void emit() {
            List<T> toEmit = buffer;
            buffer = new LinkedList<>(); // 重置
            try {
                actual.onNext(toEmit);
                emitCount++;
            } catch (Throwable error) {
                Exceptions.throwOrReport(error, this);
            }
        }

        private void resetPeriodically() {
            initialTimestamp = now();
        }

        @Override
        public void onError(Throwable e) {
            buffer = null;
            actual.onError(e);
            this.unsubscribe();
        }

        @Override
        public void onCompleted() {
            this.inner.unsubscribe();
            if (buffer != null && !buffer.isEmpty()) {
                emit();
            }
            actual.onCompleted();
            this.unsubscribe();
            lastTimestamp = 0L;
            initialTimestamp = 0L;
            isLoopActivated = false;
            emitCount = 0L;
        }

        private void checkAndActivate() {
            if (isLoopActivated) return;
            long now = now();
            boolean shouldActivate = emitCount == 0 // 发射次数为0
                    && buffer != null && !buffer.isEmpty() // 缓存数据不为空
                    && initialTimestamp != 0L && duration <= (now - initialTimestamp); // 距离初次接受时间大于指定间隔
            if (shouldActivate && !isLoopActivated) {
                scheduleExact();
                isLoopActivated = true;
            }
        }

        void scheduleExact() {
            this.inner.schedulePeriodically(new Action0() {
                public void call() {
                    if (test()) {
                        emit();
                    }
                }
            }, timeout, timeout, timeUnit);
        }

        private boolean test() {
            synchronized (this) {
                long now = now();
                return buffer != null && !buffer.isEmpty() // 缓存数据不为空
                        && lastTimestamp != 0L && duration < (now - lastTimestamp); //距离上一个接受到信号超过指定间隔
            }
        }

    }
}

思考题: 如何区分扫码枪输入和键盘手动输入?

文章作者: qbeenslee