乐虎游戏|乐虎国际登录|欢迎你

Rxjava2~zip~学渣带你扣rxjava2~ map操作符到底干了什么

日期:2020-03-01编辑作者:计算机资讯

<pre>static Observable<String> sampleObservable() {return Observable.defer(new Callable<ObservableSource<? extends String>>() {@Overridepublic ObservableSource<? extends String> call() throws Exception {// Do some long running operationSystemClock.sleep;return Observable.just("one", "two", "three", "four", "five");}});}</pre>一上来我们就举个例子可以看出来两个操作符。 有了前面的经验我们来简单的分析一下。这几个创建操作符的不同

这不是源码分析篇只是想聊一聊这些哪些地方可以用到

take()

图片 1

<pre>
Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver())

</pre>

输出没错是123

我们面来看看源码
直接来看ObservableTake的subscribeActual,[不懂的同学请看我前面的学渣系列]
<pre>
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new TakeObserver<T>(observer, limit));
}
</pre>
这个source是ObservableSource的对象。 那么我们去找实现他的Observable
好吧 又回到了。
<pre>
public final void subscribe(Observer<? super T> observer)
subscribeActual(observer);
其他的省略了

</pre>
关键点一步,这回调用了谁的方法呢? 下面来揭晓
是ObservableObserveOn的subscribeActual
<pre>
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

</pre>
看到了吗 又会调用
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
然后 又要调用的是ObservableSubscribeOn的subscribeActual
<pre>
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}

</pre>

大家会好奇这两个地方为什么会被调用呢?
下面我给大家看一个地方
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
O(∩_∩)O
你没有看错
<pre>
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
</pre>
大家可以看到不。 这两个方法返回的也是Observable对象。 所以 他们会分别调用这两个对象subscribeActual方法。好吧,让我们来像下进行。
【下面是一个小扩展 给大家一个小小的感觉】
<pre>

Observable.just(1, 2, 3, 4, 5)

            .observeOn(AndroidSchedulers.mainThread())
            .take(3)
            .subscribe(getObserver())

看到有什么不同了吗? 我注释掉了一个方法。我为什么要这么干?我注视掉了那么
source.subscribe 会调用谁呢? 我直接给出来答案。大家可以思考一个 当我直接注释之后会调用just的subscribeActual

public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

相信大家看过我之前的应该可以看懂。
</pre>

让我们回归正题当执行到ObservableSubscribeOn的subscribeActual的方法的时候

  public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));

source.subscribe(parent); 看到这个方法了吗、首先它是异步的。另外执行
.source.subscribe(parent);的时候 ,实际上就执行了ObservableFromArray的subscribeActual
<pre>
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();

</pre>
剩下的就好理解了,都是分别执行onnext等方法。

到这里task的大体思路介绍完毕

<pre>public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {ObjectHelper.requireNonNull(supplier, "supplier is null");return RxJavaPlugins.onAssembly(new ObservableDefer<T>);}</pre>

1just
<pre>
Observable.just("Cricket", "Football")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver())
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
</pre>
大家想想这个能用到哪种情况? just(参数最多10个)
我说一种情况(控件中获取的值,然后我们会对这个值去判定,比如请假两个时间是比对,登录判断是不是为null)

2下面开始timer 定时器

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    w.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                decoratedRun.run();
            } finally {
                w.dispose();
            }
        }
    }, delay, unit);

    return w;
}

重复的就不贴了。 都是差不多重复的。 只是给大家贴上关键代码
看到这里面了吗。delay 就是大家贴上的时间。 详细这个大家都是可以看明白的。,

和以前一样Observable的静态方法。<pre>public final class ObservableDefer<T> extends Observable<T> {final Callable<? extends ObservableSource<? extends T>> supplier;public ObservableDefer(Callable<? extends ObservableSource<? extends T>> supplier) {this.supplier = supplier;}@Overridepublic void subscribeActual(Observer<? super T> s) {ObservableSource<? extends T> pub;try {pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");} catch (Throwable t) {Exceptions.throwIfFatal;EmptyDisposable.error;return;}

2map

3interval

做周期性操作,从翻译上大家就应该可以看明白
ComputationScheduler的schedulePeriodicallyDirect的方法
<pre>
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
</pre>

<pre>
NewThreadWorker的schedulePeriodicallyDirect的方法
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
Future<?> f = executor.scheduleAtFixedRate(decoratedRun, initialDelay, period, unit);
return Disposables.fromFuture(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}

</pre>

分别设置了 什么时候开始。多长时间执行一次

 pub.subscribe;}

<pre>
.map(new Function<List<你有的数据类型r>, List<你希望的数据类型>>() {

4buffer

Observable<List<String>> buffered = getObservable().buffer(3, 2);

    buffered.subscribe(getObserver());

ObservableBuffer的subscribeActual的方法
<pre>

protected void subscribeActual(Observer<? super U> t) {
if (skip == count) {
BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
if (bes.createBuffer()) {
source.subscribe(bes);
}
} else {
source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
}
}
</pre>
好吧到了关键的地方 source.subscribe是调用谁的地方
Observable.just("one", "two", "three", "four", "five");
所以是ObservableFromArray的subscribeActual方法

<pre>
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

void run() {
T[] a = array;
int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }

</pre>

看到这个for方法了吗 这个就是决定你跳过的数量的。

}

@Override
public List<你希望的数据类型> apply(List<你有的数据类型> apiUsers) throws Exception {
    return (转化的类型);
}

5filter

图片 2

Paste_Image.png

这个相信大家很熟悉,对就是过滤
<pre>
fromArray(1, 0, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer.intValue() > 5;
}
})
</pre>

这里只是放出来关键代码
ObservableFilter的onNext
<pre>
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
actual.onNext(t);
}
} else {
actual.onNext(null);
}
}

</pre>
这个b就是你的过滤条件。 下面的就是判断。 不符合的就不执行 actual.onNext(t);其实很简单的方式

</pre>

})
</pre>
想一想什么情况下会用? 当你想转的时候(举个例子bitmap换流)

6skip

和上面同理关键部分ObservableSkip的onNext方法
<pre>

public void onNext(T t) {
if (remaining != 0L) {
remaining--;
} else {
actual.onNext(t);
}
}
</pre>

这是完整ObservableDefer代码。可以看到subscribeActual这个方法不?看到这个我想大家一定马上就想到另一个方法Observable的静态方法subscribe,中最重要的部分 subscribeActual;。吼吼 看到了吗。 只有当你调用的时候才会整理数据。<pre>a = 12;Observable<String> o2 =Observable.defer(new Func0<Observable<String>>() {

好吧 我放弃 这篇连源码一起分析了 要不不知道怎么去开始

7 scan

图片 3

Paste_Image.png

RxJava的scan()函数可以看做是一个累加器函数。scan()函数对原始Observable发射的每一项数据都应用一个函数,它将函数的结果填充回可观测序列,等待和下一次发射的数据一起使用。

关键代码
<pre>
@Override
public void onNext(T t) {
if (done) {
return;
}
final Observer<? super T> a = actual;
T v = value;
if (v == null) {
value = t;
a.onNext(t);
} else {
T u;

            try {
                u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The value returned by the accumulator is null");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }

            value = u;
            a.onNext(u);
        }
    }

</pre>
执行的时候value 会累加。 a.onNext(u);在发射出去

@Overridepublic Observable<String> call() { return Observable.just("defer result: " + a);}

第一篇我会尽量详细点
<pre>
public static <T1, T2, R> Observable<R> zip(
ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
}
</pre>

8 replay

<pre>
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(2); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable
connectableObservable.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();

   /*
     * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
     */
    connectableObservable.subscribe(getSecondObserver());

</pre>
replay 这个是缓存操作。
第二次订阅之后,就是缓存后面两个数据

});a = 20;

这是Observable的静态方法,
ObservableSource和Observable的关系:Observable实现了ObservableSource接口的抽象类
<pre>
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> f) {
ObjectHelper.requireNonNull(f, "f is null");
return new Function<Object[], R>() {
@Override
public R apply(Object[] a) throws Exception {
if (a.length != 2) {
throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
}
return ((BiFunction<Object, Object, R>)f).apply(a[0], a[1]);
}
};
}
</pre>
发现了吗
public interface BiFunction<T1, T2, R> {
R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
返回值都是R

9concat

图片 4

Paste_Image.png

<pre>
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};

    final Observable<String> aObservable = Observable.fromArray(aStrings);
    final Observable<String> bObservable = Observable.fromArray(bStrings);

    Observable.concat(aObservable, bObservable)
            .subscribe(getObserver());

</pre>

他的过程是
<pre>
return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
</pre>

concat操作符肯定也是有序的,实际上fromArray(sources)这么一个过程。

o2.subscribe(new Action1<String>() {

这样BiFunction就和Function联系起来了

10merge

图片 5

Paste_Image.png

<pre>
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};

    final Observable<String> aObservable = Observable.fromArray(aStrings);
    final Observable<String> bObservable = Observable.fromArray(bStrings);

    Observable.merge(aObservable, bObservable)
            .subscribe(getObserver());

</pre>

无序的合并

本文由乐虎游戏发布于计算机资讯,转载请注明出处:Rxjava2~zip~学渣带你扣rxjava2~ map操作符到底干了什么

关键词:

RXJava的使用<二>

vankeservice 前言 RxJava和Retrofit也火了一段时间了,不过最近一直在学习ReactNative和Node相关的姿势,一直没有时间研究这...

详细>>

OkHttp源码深入解读

简介 目前在HTTP协议请求库中,OKHttp应当是非常火的,使用也非常的简单。网上有很多文章写了关于OkHttp的特点以及使...

详细>>

Retrofit--使用Retrofit时怎样去设置OKHttp

builder.cookieJar(new CookieJar() { private final HashMapHttpUrl, ListCookie cookieStore = new HashMap(); @Override public void saveFromResponse(HttpUr...

详细>>

先练哪个字帖才能快速入门?

文/晨风暮溪 不晓得我们在练字的经过中有未有蒙受那么些主题材料 问:先练哪个字帖手艺赶快入门? 图表源于互联...

详细>>