Skip to content

Commit

Permalink
Remove usages of rxscala and replace with Java based code.
Browse files Browse the repository at this point in the history
  • Loading branch information
markusthoemmes committed Nov 21, 2019
1 parent 94043db commit ed34677
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
6 changes: 3 additions & 3 deletions common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ dependencies {
compile "io.zipkin.reporter2:zipkin-sender-okhttp3:2.6.1"
compile "io.zipkin.reporter2:zipkin-reporter:2.6.1"

compile "io.reactivex:rxscala_${gradle.scala.depVersion}:0.26.5"
compile "io.reactivex:rxjava-reactive-streams:1.2.1"
compile "com.microsoft.azure:azure-cosmosdb:2.6.2"
compile "io.reactivex:rxjava:1.3.8"
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile ('com.microsoft.azure:azure-cosmosdb:2.6.2')

compile ("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.0.1") {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.openwhisk.core.database.cosmosdb

import com.microsoft.azure.cosmosdb.{FeedResponse, Resource, ResourceResponse}
import rx.lang.scala.JavaConverters._
import rx.Observable
import rx.functions.Action1

import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
Expand All @@ -34,8 +34,12 @@ private[cosmosdb] trait RxObservableImplicits {
* @return the head result of the [[Observable]].
*/
def head(): Future[T] = {
def toHandler[T](f: (T) => Unit): Action1[T] = new Action1[T] {
def call(t: T) = f(t)
}

val promise = Promise[T]()
observable.asScala.single.subscribe(x => promise.success(x), e => promise.failure(e))
observable.single.subscribe(toHandler(promise.success), toHandler(promise.failure))
promise.future
}
}
Expand All @@ -46,8 +50,8 @@ private[cosmosdb] trait RxObservableImplicits {

implicit class RxScalaFeedObservable[T <: Resource](observable: Observable[FeedResponse[T]]) {
def blockingOnlyResult(): Option[T] = {
val value = observable.asScala.toList.toBlocking.single
val results = value.head.getResults.asScala
val value = observable.toBlocking.single
val results = value.getResults.asScala
require(results.isEmpty || results.size == 1, s"More than one result found $results")
results.headOption
}
Expand Down

0 comments on commit ed34677

Please sign in to comment.