From 674a647bd1951f122c335ea57a5c865b9aee9637 Mon Sep 17 00:00:00 2001 From: xwei Date: Mon, 26 Mar 2018 17:12:21 +0800 Subject: [PATCH 1/2] u --- .../rxjava/share/practices/Practice1.java | 17 +- .../rxjava/share/practices/Practice2.java | 45 +++--- .../rxjava/share/practices/Practice3.java | 8 +- .../rxjava/share/practices/Practice4.java | 29 ++-- .../rxjava/share/practices/Practice5.java | 152 +++++++++--------- 5 files changed, 130 insertions(+), 121 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..e314597 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -24,12 +24,13 @@ */ public class Practice1 { - /* - * 举例如下: - * 参数 Observable["a","b","c"] - * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 - */ - public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } + /* + * 举例如下: 参数 Observable["a","b","c"] 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] + * 注意index从1开始 + */ + public Observable> indexable(Observable observable) { + return observable + .map(p -> new Tuple2(1, p)) + .scan((p1, p2) -> new Tuple2(1 + p1.getV1(), p2.getV2())); + } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..591a44b 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -16,34 +16,39 @@ package cn.nextop.rxjava.share.practices; +import java.util.HashMap; +import java.util.Map; +import cn.nextop.rxjava.share.util.Tuples; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; -import java.util.Map; - /** * @author Baoyi Chen */ public class Practice2 { - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] - */ - public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Single[Map{a=2, b=1, c=2}] - */ - public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - + /* + * 举例: words = Observable["a", "a", "b", "c", "c"] 返回: Observable[("a", 2), + * ("b", 1), ("c", 2)] + */ + public Observable> wordCount1(Observable words) { + return words.groupBy(e -> e).flatMap(e -> e.count().toObservable().map(x -> Tuples.of(e.getKey(), x.intValue()))); + } + + /* + * 举例: words = Observable["a", "a", "b", "c", "c"] 返回: Single[Map{a=2, b=1, + * c=2}] + */ + public Single> wordCount2(Observable words) { + return words.reduce(new HashMap(), this::reducerFun); + } + + private Map reducerFun(Map map, String s) { + Integer integer = map.get(s); + if (integer == null) integer = 0; + map.put(s, ++integer); + return map; + } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..7b70d06 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -28,7 +28,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return iterate(observable).reduce((x, y) -> x + y); } /* @@ -42,7 +42,11 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.flatMap(n -> { + Observable left = n.left == null ? Observable.empty() : iterate(Observable.just(n.left)); + Observable right = n.right == null ? Observable.empty() : iterate(Observable.just(n.right)); + return Observable.merge(left, right, Observable.just(n.value)); + }); } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..1d2c296 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,8 +18,7 @@ import io.reactivex.Observable; - - +import io.reactivex.schedulers.Schedulers; /** * @author Baoyi Chen */ @@ -30,21 +29,21 @@ public class Practice4 { * 参数observable = Observable["a", "b", "c"] * 参数observer在消费observable时,每个元素都在独立的线程 * - * thread 1 --------------- - * |-----------| ["a"] | - * | --------------- - * | - * ------------------------- ---------- |thread 2 --------------- - * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | - * ------------------------- ---------- | --------------- - * | - * |thread 3 --------------- - * |-----------| ["c"] | - * --------------- + * thread 1 --------------- + * |-----------|Observer["a"]| + * | --------------- + * | + * ------------------------- |thread 2 --------------- + * |Observable["a","b","c"]|--------|-----------|Observer["b"]| + * ------------------------- | --------------- + * | + * |thread 3 --------------- + * |-----------|Observer["c"]| + * --------------- * */ - public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + public Observable runInMultiThread(Observable observable) { + return observable.flatMap(e -> Observable.just(e).subscribeOn(Schedulers.io())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..84fa615 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,98 +16,98 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import cn.nextop.rxjava.share.util.Tuples; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.Single; + /** * @author Baoyi Chen */ public class Practice5 { - /* - * example: - * param: Observable["a","b","c"] - * return: Single[3] - */ - public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable["a","b","c"] return: Single[3] + */ + public Single count(Observable source) { + return source.reduce(0L, (a, b) -> a + 1); + } - /* - * example: - * param: Observable[["a", "b", "c"], ["b", "c", "d"]] - * return: Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable[["a", "b", "c"], ["b", "c", "d"]] return: + * Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return source.concatMap(e -> Observable.fromIterable(e)); + } - /* - * example: - * param: Observable["a", "a", "b", "b", "c"] - * return: Observable["a", "b", "c"] - */ - public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable["a", "a", "b", "b", "c"] return: Observable["a", + * "b", "c"] + */ + public Observable distinct(Observable source) { + return source.groupBy(e -> e).map(e -> e.getKey()); + } - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.concatMap(e -> { + if (conditon.test(e)) return Observable.just(e); + else return Observable.empty(); + }); + } - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , index = 2 - * return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable[1, 2, 3, 4, 5] , index = 2 return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return source.zipWith(Observable.range(0, Integer.MAX_VALUE), (a, b) -> Tuples.of(a, b)).filter(x -> x.getV2() == index).map(e -> e.getV1()).firstElement(); + } - /* - * example: - * param: Observable["a", "b"] , count = 2 - * return: Observable["a", "b", "a", "b"] - */ - public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable["a", "b"] , count = 2 return: Observable["a", "b", + * "a", "b"] + */ + public Observable repeat(Observable source, int count) { + return Observable.range(0, count).concatMap(e -> source); + } - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable["a"], Observable["b"] return: Observable["a", "b"] + */ + private void concat(List> source, ObservableEmitter emitter) { + if (source.isEmpty()) { + emitter.onComplete(); + } else { + source.get(0).subscribe(e -> { + emitter.onNext(e); + }, e -> emitter.onError(e), () -> { + concat(source.subList(1, source.size()), emitter); + }); + } + } - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable["a"], Observable["b"] return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.fromIterable(source).concatMap(s -> s); + } - /* - * example: - * param: Observable["a", "b", "c"], 1, SECONDS - * return: Observable["a", "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: param: Observable["a", "b", "c"], 1, SECONDS return: Observable["a", + * "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return source.flatMap(e -> Observable.just(e).delay(delay, unit)); + } } From c3447da8112eb876b1fb3fe75408e8d4e3357dcc Mon Sep 17 00:00:00 2001 From: xwei Date: Mon, 26 Mar 2018 17:20:06 +0800 Subject: [PATCH 2/2] u --- .../rxjava/share/practices/Practice5.java | 162 ++++++++++-------- 1 file changed, 91 insertions(+), 71 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 84fa615..c22df6f 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -31,83 +31,103 @@ */ public class Practice5 { - /* - * example: param: Observable["a","b","c"] return: Single[3] - */ - public Single count(Observable source) { - return source.reduce(0L, (a, b) -> a + 1); - } + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { +// return source.count(); + return source.reduce(0L, (a, b) -> a + 1); + } - /* - * example: param: Observable[["a", "b", "c"], ["b", "c", "d"]] return: - * Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { - return source.concatMap(e -> Observable.fromIterable(e)); - } + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return source.flatMap(e -> Observable.fromIterable(e)); + } - /* - * example: param: Observable["a", "a", "b", "b", "c"] return: Observable["a", - * "b", "c"] - */ - public Observable distinct(Observable source) { - return source.groupBy(e -> e).map(e -> e.getKey()); - } + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { +// return source.distinct(); + return source.groupBy(e -> e).map(e -> e.getKey()); + } - /* - * example: param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - return source.concatMap(e -> { - if (conditon.test(e)) return Observable.just(e); - else return Observable.empty(); - }); - } + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.concatMap(e -> { + if (conditon.test(e)) return Observable.just(e); else return Observable.empty(); + }); + } - /* - * example: param: Observable[1, 2, 3, 4, 5] , index = 2 return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - return source.zipWith(Observable.range(0, Integer.MAX_VALUE), (a, b) -> Tuples.of(a, b)).filter(x -> x.getV2() == index).map(e -> e.getV1()).firstElement(); - } + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { +// return source.elementAt(index); + return source.zipWith(Observable.range(0, Integer.MAX_VALUE), (a, b) -> Tuples.of(a, b)).filter(x -> x.getV2() == index).map(e -> e.getV1()).firstElement(); + } - /* - * example: param: Observable["a", "b"] , count = 2 return: Observable["a", "b", - * "a", "b"] - */ - public Observable repeat(Observable source, int count) { - return Observable.range(0, count).concatMap(e -> source); - } + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { +// return source.repeat(count); + return Observable.range(0, count).concatMap(e -> source); + } - /* - * example: param: Observable["a"], Observable["b"] return: Observable["a", "b"] - */ - private void concat(List> source, ObservableEmitter emitter) { - if (source.isEmpty()) { - emitter.onComplete(); - } else { - source.get(0).subscribe(e -> { - emitter.onNext(e); - }, e -> emitter.onError(e), () -> { - concat(source.subList(1, source.size()), emitter); - }); - } - } + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { +// return Observable.concat(source); + return Observable.create(emitter -> { concat(source, emitter); }); + } + private void concat(List> source, ObservableEmitter emitter) { + if (source.isEmpty()) { + emitter.onComplete(); + } else { + source.get(0).subscribe(e -> { + emitter.onNext(e); + }, e -> emitter.onError(e), () -> { + concat(source.subList(1, source.size()), emitter); + }); + } + } + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.fromIterable(source).flatMap(e -> e); + } - /* - * example: param: Observable["a"], Observable["b"] return: Observable["a", "b"] - */ - public Observable merge(List> source) { - return Observable.fromIterable(source).concatMap(s -> s); - } - - /* - * example: param: Observable["a", "b", "c"], 1, SECONDS return: Observable["a", - * "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - return source.flatMap(e -> Observable.just(e).delay(delay, unit)); - } + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { +// return source.delay(delay, unit); + return source.concatMap(e -> Observable.just(e).delay(delay, unit)); + } }