Skip to content

Commit

Permalink
feat: add IxJS operator
Browse files Browse the repository at this point in the history
  • Loading branch information
aikoven committed Aug 2, 2021
1 parent fe98c90 commit 5781d90
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 2 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Automatically retry subscriptions with exponential backoff.
- [Installation](#installation)
- [Usage](#usage)
- [Collection subscriptions](#collection-subscriptions)
- [IxJS operators](#ixjs-operators)

## Installation

Expand Down Expand Up @@ -125,5 +126,11 @@ type RetryCollectionSubscriptionOptions<Value, Revision = Value> = {
};
```

### IxJS operators

All functions are also exported in form of
[`IxJS`](https://github.com/ReactiveX/IxJS) operators from
`retry-subscription/ix` module.

[npm-image]: https://badge.fury.io/js/retry-subscription.svg
[npm-url]: https://badge.fury.io/js/retry-subscription
7 changes: 7 additions & 0 deletions ix/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "retry-subscription-ix",
"version": "0.0.0",
"private": true,
"main": "../lib/ix/index.js",
"typings": "../lib/ix/index.d.ts"
}
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"main": "lib/index.js",
"typings": "lib/index.d.ts",
"files": [
"ix",
"lib",
"src",
"!src/**/*.test.ts",
Expand All @@ -33,7 +34,6 @@
"@tsconfig/recommended": "^1.0.1",
"@types/jest": "^26.0.24",
"@types/lodash.isequal": "^4.0.0",
"ix": "^4.5.0",
"jest": "^27.0.6",
"jest-mock-random": "^1.1.1",
"prettier": "^2.3.2",
Expand All @@ -43,6 +43,7 @@
},
"dependencies": {
"abort-controller-x": "^0.2.6",
"ix": "^4.5.0",
"lodash.isequal": "^4.0.0",
"node-abort-controller": "^2.0.0"
}
Expand Down
1 change: 1 addition & 0 deletions src/ix/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './retryCollectionSubscription';
109 changes: 109 additions & 0 deletions src/ix/retryCollectionSubscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import {AsyncIterableX} from 'ix/asynciterable';
import {wrapWithAbort} from 'ix/asynciterable/operators';
import {AbortSignal} from 'node-abort-controller';
import {
retryCollectionSubscription as retry,
CollectionSubscriptionUpdate,
} from '../retryCollectionSubscription';

export type RetryCollectionSubscriptionOptions<Value, Revision = Value> = {
/**
* Starting delay before first retry attempt in milliseconds.
*
* Defaults to 1000.
*
* Example: if `baseMs` is 100, then retries will be attempted in 100ms,
* 200ms, 400ms etc (not counting jitter).
*/
baseMs?: number;
/**
* Maximum delay between attempts in milliseconds.
*
* Defaults to 15 seconds.
*
* Example: if `baseMs` is 1000 and `maxDelayMs` is 3000, then retries will be
* attempted in 1000ms, 2000ms, 3000ms, 3000ms etc (not counting jitter).
*/
maxDelayMs?: number;
/**
* Maximum for the total number of attempts.
*
* Defaults to `Infinity`.
*/
maxAttempts?: number;
/**
* Called when an error is thrown by inner subscription, before setting delay
* timer.
*
* If at the time of error the inner subscription was initialized (i.e. has
* had initial emission), then the `attempt` and `delayMs` will be
* `undefined`, and the retry will happen immediately.
*
* If the error happened before initialization, then the `attempt` will start
* from 0 and will be incremented with each attempt, and the retry will happen
* after exponential backoff.
*
* Rethrow error from this callback to prevent further retries.
*/
onError?: (
error: unknown,
attempt: number | undefined,
delayMs: number | undefined,
) => void;
/**
* If the value has a field that is changed each time the collection item
* changes, consider returning supplying `getRevision` function that returns
* it. This way, less memory will be needed to store the state, and less CPU
* will be needed for diffing algorithm on resubscription.
*
* Defaults to identity function, i.e. the revision is the whole value.
*/
getRevision?: (value: Value) => Revision;
/**
* Equality function used by diffing algorithm on resubscription.
*
* Defaults to deep equality.
*/
equality?: (a: Revision, b: Revision) => boolean;
};

export {CollectionSubscriptionUpdate};

class RetryCollectionSubscriptionAsyncIterable<
Key extends string | number,
Value,
Revision = Value,
> extends AsyncIterableX<Array<CollectionSubscriptionUpdate<Key, Value>>> {
constructor(
private _source: AsyncIterable<
Array<CollectionSubscriptionUpdate<Key, Value>>
>,
private _options: RetryCollectionSubscriptionOptions<Value, Revision>,
) {
super();
}

[Symbol.asyncIterator](
signal?: AbortSignal,
): AsyncIterator<Array<CollectionSubscriptionUpdate<Key, Value>>> {
return retry(signal => wrapWithAbort(this._source, signal), {
...this._options,
signal,
})[Symbol.asyncIterator]();
}
}

export function retryCollectionSubscription<
Key extends string | number,
Value,
Revision = Value,
>(options: RetryCollectionSubscriptionOptions<Value, Revision>) {
return function retryCollectionSubscriptionOperatorFunction(
source: AsyncIterable<Array<CollectionSubscriptionUpdate<Key, Value>>>,
): AsyncIterableX<Array<CollectionSubscriptionUpdate<Key, Value>>> {
return new RetryCollectionSubscriptionAsyncIterable<Key, Value, Revision>(
source,
options,
);
};
}
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
"declaration": true,
"stripInternal": true
},
"files": ["src/index.ts"],
"files": ["src/index.ts", "src/ix/index.ts"],
"include": ["src/**/*.test.ts", "src/**/__tests__/**/*.ts"]
}

0 comments on commit 5781d90

Please sign in to comment.