Skip to content

Commit

Permalink
DispatchObservable improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyofrancis committed Mar 20, 2019
1 parent 2a45eab commit 457740a
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 51 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
Version 1.0.9
- Improvements to using DispatchObservable

Version 1.0.8
- Added ErrorHandler for DispatchCallAdapterFactory

Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[ ![Download](https://api.bintray.com/packages/tonyofrancis/maven/dispatch/images/download.svg?version=1.0.8) ](https://bintray.com/tonyofrancis/maven/dispatch/1.0.8/link)
[ ![Download](https://api.bintray.com/packages/tonyofrancis/maven/dispatch/images/download.svg?version=1.0.9) ](https://bintray.com/tonyofrancis/maven/dispatch/1.0.9/link)

Overview
--------
Expand All @@ -7,12 +7,12 @@ Dispatch is a simple and flexible work scheduler that schedulers work on a backg

To use dispatch add the following to your app's build.gradle file
```java
implementation "com.tonyodev.dispatch:dispatch:1.0.8"
implementation "com.tonyodev.dispatch:dispatch:1.0.9"
```

To use with Retrofit add
```java
implementation "com.tonyodev.dispatch:dispatch-retrofit2-adapter:1.0.8"
implementation "com.tonyodev.dispatch:dispatch-retrofit2-adapter:1.0.9"
```

Example:
Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

buildscript {
ext.kotlin_version = '1.3.21'
ext.library_version = '1.0.8'
ext.library_version_code = 9
ext.library_version = '1.0.9'
ext.library_version_code = 10
ext.retrofit_version = '2.5.0'
ext.gson_version = '2.8.5'
ext.novoda_bintray_version = '0.9'
Expand Down
42 changes: 35 additions & 7 deletions dispatch/src/main/java/com/tonyodev/dispatch/Dispatch.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import java.lang.IllegalArgumentException
* queueId 88 -> dispatch(post) -> dispatch(async)
* queueId 78595 -> dispatch(post)
* */
interface Dispatch<R>: DispatchObservable<R, Dispatch<R>> {
interface Dispatch<R> {

/**
* The dispatch queue id.
Expand Down Expand Up @@ -224,12 +224,6 @@ interface Dispatch<R>: DispatchObservable<R, Dispatch<R>> {
* */
fun cancelOnComplete(cancel: Boolean): Dispatch<R>

/**
* Gets the backing dispatch Observable.
* @return DispatchObservable
* */
fun getDispatchObservable(): DispatchObservable<R, Dispatch<R>>

/**
* Sets the dispatch object id. Use this id to identify where errors occur in the dispatch queue.
* @param id the dispatch object id.
Expand All @@ -244,4 +238,38 @@ interface Dispatch<R>: DispatchObservable<R, Dispatch<R>> {
* */
fun <T> map(func: (R) -> T): Dispatch<T>

/**
* Gets the backing dispatch Observable.
* @return DispatchObservable
* */
fun getDispatchObservable(): DispatchObservable<R>

/**
* Adds a dispatch observer.
* @param dispatchObserver the observer.
* @return the dispatch.
* */
fun addObserver(dispatchObserver: DispatchObserver<R>): Dispatch<R>

/**
* Adds a list of dispatch observers.
* @param dispatchObservers the list of observers.
* @return the dispatch.
* */
fun addObservers(dispatchObservers: List<DispatchObserver<R>>): Dispatch<R>

/**
* Removes a dispatch observer.
* @param dispatchObserver the observer to be removed.
* @return the dispatch.
* */
fun removeObserver(dispatchObserver: DispatchObserver<R>): Dispatch<R>

/**
* Remove a list of dispatch observers.
* @param dispatchObservers the list of observers to be removed.
* @return the dispatch.
* */
fun removeObservers(dispatchObservers: List<DispatchObserver<R>>): Dispatch<R>

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,34 @@ package com.tonyodev.dispatch
/**
* Interface that allows for the attaching of dispatch observers.
* */
interface DispatchObservable<R, T> {
interface DispatchObservable<R> {

/**
* Adds a dispatch observer.
* @param dispatchObserver the observer.
* @return the dispatch.
* @return the dispatchObservable.
* */
fun addObserver(dispatchObserver: DispatchObserver<R>): T
fun addObserver(dispatchObserver: DispatchObserver<R>): DispatchObservable<R>

/**
* Adds a list of dispatch observers.
* @param dispatchObservers the list of observers.
* @return the dispatch.
* @return the dispatchObservable.
* */
fun addObservers(dispatchObservers: List<DispatchObserver<R>>): T
fun addObservers(dispatchObservers: List<DispatchObserver<R>>): DispatchObservable<R>

/**
* Removes a dispatch observer.
* @param dispatchObserver the observer to be removed.
* @return the dispatch.
* @return the dispatchObservable.
* */
fun removeObserver(dispatchObserver: DispatchObserver<R>): T
fun removeObserver(dispatchObserver: DispatchObserver<R>): DispatchObservable<R>

/**
* Remove a list of dispatch observers.
* @param dispatchObservers the list of observers to be removed.
* @return the dispatch.
* @return the dispatchObservable.
* */
fun removeObservers(dispatchObservers: List<DispatchObserver<R>>): T
fun removeObservers(dispatchObservers: List<DispatchObserver<R>>): DispatchObservable<R>

}
114 changes: 84 additions & 30 deletions dispatch/src/main/java/com/tonyodev/dispatch/Dispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,7 @@ object Dispatcher {
private val dispatchType: Int): Dispatch<R> {

private val dispatchSources = ArrayList<Dispatch<*>?>(3)
private val dispatchObserversSet = mutableSetOf<DispatchObserver<R>>()

private val dispatchObservable = DispatchObservableInfo<R>(handler)
private var doOnErrorWorker: ((throwable: Throwable) -> R)? = null

override val queueId: Int
Expand Down Expand Up @@ -484,11 +483,7 @@ object Dispatcher {
@Suppress("UNCHECKED_CAST")
private fun notifyDispatchObservers() {
if (!isCancelled) {
val iterator = dispatchObserversSet.iterator()
val result = results as R
while (iterator.hasNext()) {
iterator.next().onChanged(result)
}
dispatchObservable.notifyObservers(results as R)
}
}

Expand Down Expand Up @@ -560,7 +555,7 @@ object Dispatcher {
if (dispatchQueue.dispatchQueueController == null && enableWarnings
&& this == dispatchQueue.rootDispatch) {
Log.w(TAG, "No DispatchQueueController set for dispatch queue with id: $queueId. " +
"Not setting a DispatchQueueController can cause memory leaks for long running tasks.")
"Not setting a DispatchQueueController can cause memory leaks for long running tasks.")
}
}
}
Expand All @@ -575,9 +570,9 @@ object Dispatcher {
dispatchQueue.completedDispatchQueue = false
dispatchQueue.rootDispatch.runDispatcher()
} else {
if (enableWarnings) {
Log.d(TAG, "Start called on dispatch queue with id: $queueId after it has already been cancelled.")
}
if (enableWarnings) {
Log.d(TAG, "Start called on dispatch queue with id: $queueId after it has already been cancelled.")
}
}
return this
}
Expand Down Expand Up @@ -632,11 +627,7 @@ object Dispatcher {

private fun removeDispatcher() {
handler.removeCallbacks(dispatcher)
val iterator = dispatchObserversSet.iterator()
while (iterator.hasNext()) {
iterator.next()
iterator.remove()
}
dispatchObservable.removeAllObservers()
if (closeHandler) {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.KITKAT) {
handler.looper.quitSafely()
Expand Down Expand Up @@ -898,12 +889,12 @@ object Dispatcher {
delayInMillis = 0,
worker = {
if (!zipDispatchQueue.isCancelled) {
if (zipDispatchQueue.completedDispatchQueue) {
zipDispatchQueue.completedDispatchQueue = false
zipQueueDispatch.runDispatcher()
} else {
zipDispatchQueue.rootDispatch.runDispatcher()
}
if (zipDispatchQueue.completedDispatchQueue) {
zipDispatchQueue.completedDispatchQueue = false
zipQueueDispatch.runDispatcher()
} else {
zipDispatchQueue.rootDispatch.runDispatcher()
}
}
it
},
Expand Down Expand Up @@ -1062,16 +1053,64 @@ object Dispatcher {
}

override fun addObserver(dispatchObserver: DispatchObserver<R>): Dispatch<R> {
dispatchObserversSet.add(dispatchObserver)
dispatchObservable.addObserver(dispatchObserver)
return this
}

override fun addObservers(dispatchObservers: List<DispatchObserver<R>>): Dispatch<R> {
dispatchObserversSet.addAll(dispatchObservers)
dispatchObservable.addObservers(dispatchObservers)
return this
}

override fun removeObserver(dispatchObserver: DispatchObserver<R>): Dispatch<R> {
dispatchObservable.removeObserver(dispatchObserver)
return this
}

override fun removeObservers(dispatchObservers: List<DispatchObserver<R>>): Dispatch<R> {
dispatchObservable.removeObservers(dispatchObservers)
return this
}

override fun getDispatchObservable(): DispatchObservable<R> {
return dispatchObservable
}

override fun setDispatchId(id: String): Dispatch<R> {
this.dispatchId = id
return this
}

}

private class DispatchObservableInfo<R>(private val handler: Handler): DispatchObservable<R> {

private val dispatchObserversSet = mutableSetOf<DispatchObserver<R>>()
private var result: Any? = INVALID_RESULT

override fun addObserver(dispatchObserver: DispatchObserver<R>): DispatchObservable<R> {
dispatchObserversSet.add(dispatchObserver)
if (result != INVALID_RESULT) {
handler.post {
notifyObserver(dispatchObserver)
}
}
return this
}

override fun addObservers(dispatchObservers: List<DispatchObserver<R>>): DispatchObservable<R> {
dispatchObserversSet.addAll(dispatchObservers)
if (result != INVALID_RESULT) {
handler.post {
for (dispatchObserver in dispatchObservers) {
notifyObserver(dispatchObserver)
}
}
}
return this
}

override fun removeObserver(dispatchObserver: DispatchObserver<R>): DispatchObservable<R> {
val iterator = dispatchObserversSet.iterator()
while (iterator.hasNext()) {
if (dispatchObserver == iterator.next()) {
Expand All @@ -1082,7 +1121,7 @@ object Dispatcher {
return this
}

override fun removeObservers(dispatchObservers: List<DispatchObserver<R>>): Dispatch<R> {
override fun removeObservers(dispatchObservers: List<DispatchObserver<R>>): DispatchObservable<R> {
val iterator = dispatchObserversSet.iterator()
var count = 0
while (iterator.hasNext()) {
Expand All @@ -1097,13 +1136,28 @@ object Dispatcher {
return this
}

override fun getDispatchObservable(): DispatchObservable<R, Dispatch<R>> {
return this
fun removeAllObservers() {
val iterator = dispatchObserversSet.iterator()
while (iterator.hasNext()) {
iterator.next()
iterator.remove()
}
}

override fun setDispatchId(id: String): Dispatch<R> {
this.dispatchId = id
return this
@Suppress("UNCHECKED_CAST")
private fun notifyObserver(dispatchObserver: DispatchObserver<R>) {
val r = result
if (r != INVALID_RESULT) {
dispatchObserver.onChanged(r as R)
}
}

fun notifyObservers(result: R) {
this.result = result
val iterator = dispatchObserversSet.iterator()
while (iterator.hasNext()) {
iterator.next().onChanged(result)
}
}

}
Expand Down

0 comments on commit 457740a

Please sign in to comment.