diff --git a/Practice5.java b/Practice5.java new file mode 100644 index 0000000..539bb04 --- /dev/null +++ b/Practice5.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + +/** + * @author Baoyi Chen + */ +public class Practice5 { + + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + return source.reduce(0L, (r, s) -> r + 1); + } + + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return source.flatMap(Observable::fromIterable); + } + + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + return source.groupBy(g->g).map(x->x.getKey()); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.concatMap(x -> { + if(conditon.test(x)) return Observable.just(x); else return Observable.empty(); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return source.skip(index).take(1).firstElement(); + } + + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + return Observable.range(0, count).concatMap(x -> source); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + return Observable.fromIterable(source).concatMap(x -> x); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.fromIterable(source).flatMap(x -> x); + } + + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return source.concatMap(x ->Observable.just(x).delay(delay, unit)); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..3c2a7be 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -30,6 +30,8 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.map(x -> new Tuple2<>(1, x)).scan((a, b) -> { + return new Tuple2<>(a.getV1() + 1, b.getV2()); + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..1045764 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,12 +17,12 @@ package cn.nextop.rxjava.share.practices; +import java.util.Map; + import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; -import java.util.Map; - /** * @author Baoyi Chen */ @@ -34,7 +34,10 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(x -> x) + .flatMap(g -> { + return g.count().map(count -> new Tuple2<>(g.getKey(), count.intValue())).toObservable(); + }); } /* @@ -43,7 +46,6 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return wordCount1(words).toMap(v1 -> v1.getV1(), v2 -> v2.getV2()); } - } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..6b21a76 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -28,7 +28,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return iterate(observable).scan((a, b) -> a + b).lastElement(); } /* @@ -42,7 +42,11 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.flatMap(x -> { + Observable o1 = (x.left == null) ? Observable.empty() : iterate(Observable.just(x.left)); + Observable o2 = (x.right == null) ? Observable.empty() : iterate(Observable.just(x.right)); + return Observable.merge(o1, o2, Observable.just(x.value)); + }); } public static class Node { @@ -56,5 +60,4 @@ public Node(Node left, Node right, int value) { this.value = value; } } - } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..0653b1c 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,7 @@ import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +45,6 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.flatMap(x -> Observable.just(x).subscribeOn(Schedulers.newThread())); } - } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..0bd9a43 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,14 +16,14 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + /** * @author Baoyi Chen */ @@ -35,7 +35,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.reduce(0L, (r, s) -> r + 1); } /* @@ -44,16 +44,16 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.flatMap(Observable::fromIterable); } - + /* * example: * param: Observable["a", "a", "b", "b", "c"] * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.groupBy(g -> g).map(x -> x.getKey()); } /* @@ -62,7 +62,9 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(x -> { + if(conditon.test(x)) return Observable.just(x); else return Observable.empty(); + }); } /* @@ -71,7 +73,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.skip(index).firstElement(); } /* @@ -80,7 +82,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return Observable.range(0, count).concatMap(x -> source); } /* @@ -89,7 +91,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).concatMap(x -> x); } /* @@ -98,7 +100,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(x -> x); } /* @@ -107,7 +109,6 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(x ->Observable.just(x).delay(delay, unit)); } - }