Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update file to reflect ES7 spec #75

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Creating and Subscribing to Simple Observable Sequences #

You do not need to implement the `Observable` class manually to create an observable sequence. Similarly, you do not need to implement `Observer` either to subscribe to a sequence. By installing the Reactive Extension libraries, you can take advantage of the `Observable` type which provides many operators for you to create a simple sequence with zero, one or more elements. In addition, RxJS provides an overloaded `subscribe` method which allows you to pass in `onNext`, `onError` and `onCompleted` function handlers.
You do not need to implement the `Observable` class manually to create an observable sequence. Similarly, you do not need to implement `Observer` either to subscribe to a sequence. By installing the Reactive Extension libraries, you can take advantage of the `Observable` type which provides many operators for you to create a simple sequence with zero, one or more elements. In addition, RxJS provides an overloaded `subscribe` method which allows you to pass in `next`, `error` and `complete` function handlers.

## Creating a sequence from scratch ##

Expand All @@ -23,80 +23,80 @@ In this example, we will simply yield a single value of 42 and then mark it as c
```js
var source = Rx.Observable.create(observer => {
// Yield a single value and complete
observer.onNext(42);
observer.onCompleted();
observer.next(42);
observer.complete();

// Any cleanup logic might go here
return () => console.log('disposed')
return () => console.log('unsubscribed')
});

var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));
x => console.log('next: %s', x),
e => console.log('error: %s', e),
() => console.log('completed'));

// => onNext: 42
// => onCompleted
// => next: 42
// => complete

subscription.dispose();
// => disposed
subscription.unsubscribe();
// => unsubscribed
```

For most operations, this is completely overkill, but shows the very basics of how most RxJS operators work.

## Creating and subscribing to a simple sequence ##

The following sample uses the [`range`](https://github.com/Reactive-Extensions/RxJS/tree/master/doc/api/core/operators/range.md) operator of the `Observable` type to create a simple observable collection of numbers. The observer subscribes to this collection using the Subscribe method of the Observable class, and provides actions that are delegates which handle `onNext`, `onError` and `onCompleted`. In our example, it creates a sequence of integers that starts with x and produces y sequential numbers afterwards.
The following sample uses the [`range`](https://github.com/Reactive-Extensions/RxJS/tree/master/doc/api/core/operators/range.md) operator of the `Observable` type to create a simple observable collection of numbers. The observer subscribes to this collection using the Subscribe method of the Observable class, and provides actions that are delegates which handle `next`, `error` and `complete`. In our example, it creates a sequence of integers that starts with x and produces y sequential numbers afterwards.

As soon as the subscription happens, the values are sent to the observer. The `onNext` function then prints out the values.
As soon as the subscription happens, the values are sent to the observer. The `next` function then prints out the values.

```js
// Creates an observable sequence of 5 integers, starting from 1
var source = Rx.Observable.range(1, 5);

// Prints out each item
var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted
x => console.log('next: %s', x),
e => console.log('error: %s', e),
() => console.log('complete'));

// => next: 1
// => next: 2
// => next: 3
// => next: 4
// => next: 5
// => complete
```

When an observer subscribes to an observable sequence, the `subscribe` method may be using asynchronous behavior behind the scenes depending on the operator. Therefore, the `subscribe` call is asynchronous in that the caller is not blocked until the observation of the sequence completes. This will be covered in more details in the [Using Schedulers](schedulers.md) topic.

Notice that the [`subscribe`](https://github.com/Reactive-Extensions/RxJS/tree/master/doc/api/core/operators/susbcribe.md) method returns a `Disposable`, so that you can unsubscribe to a sequence and dispose of it easily. When you invoke the `dispose` method on the observable sequence, the observer will stop listening to the observable for data. Normally, you do not need to explicitly call `dispose` unless you need to unsubscribe early, or when the source observable sequence has a longer life span than the observer. Subscriptions in Rx are designed for fire-and-forget scenarios without the usage of a finalizer. Note that the default behavior of the Observable operators is to dispose of the subscription as soon as possible (i.e, when an `onCompleted` or `onError` messages is published). For example, the code will subscribe x to both sequences a and b. If a throws an error, x will immediately be unsubscribed from b.
Notice that the [`subscribe`](https://github.com/Reactive-Extensions/RxJS/tree/master/doc/api/core/operators/susbcribe.md) method returns a `Disposable`, so that you can unsubscribe to a sequence and unsubscribe from it easily. When you invoke the `unsubscribe` method on the observable sequence, the observer will stop listening to the observable for data. Normally, you do not need to explicitly call `unsubscribe` unless you need to unsubscribe early, or when the source observable sequence has a longer life span than the observer. Subscriptions in Rx are designed for fire-and-forget scenarios without the usage of a finalizer. Note that the default behavior of the Observable operators is to unsubscribe of the subscription as soon as possible (i.e, when an `complete` or `error` messages is published). For example, the code will subscribe x to both sequences a and b. If a throws an error, x will immediately be unsubscribed from b.

```js
var x = Rx.Observable.zip(a, b, (a1, b1) => a1 + b1).subscribe();
```

You can also tweak the code sample to use the Create operator of the Observer type, which creates and returns an observer from specified OnNext, OnError, and OnCompleted action delegates. You can then pass this observer to the Subscribe method of the Observable type. The following sample shows how to do this.
You can also tweak the code sample to use the Create operator of the Observer type, which creates and returns an observer from specified next, error, and complete action delegates. You can then pass this observer to the Subscribe method of the Observable type. The following sample shows how to do this.

```js
// Creates an observable sequence of 5 integers, starting from 1
var source = Rx.Observable.range(1, 5);

// Create observer
var observer = Rx.Observer.create(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));
x => console.log('next: %s', x),
e => console.log('error: %s', e),
() => console.log('complete'));

// Prints out each item
var subscription = source.subscribe(observer);

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted
// => next: 1
// => next: 2
// => next: 3
// => next: 4
// => next: 5
// => complete
```

In addition to creating an observable sequence from scratch, you can convert existing Arrays, events, callbacks and promises into observable sequences. The other topics in this section will show you how to do this.
Expand Down Expand Up @@ -148,16 +148,16 @@ var source = Rx.Observable.from(array);

// Prints out each item
var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted
x => console.log('next: %s', x),
e => console.log('error: %s', e),
() => console.log('complete'));

// => next: 1
// => next: 2
// => next: 3
// => next: 4
// => next: 5
// => complete
```

You can also convert array-like objects such as objects with a length property and indexed with numbers. In this case, we'll simply have an object with a length of 5.
Expand All @@ -169,16 +169,16 @@ var source = Rx.Observable.from(arrayLike, (v, k) => k);

// Prints out each item
var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));
x => console.log('next: %s', x),
e => console.log('error: %s', e),
() => console.log('complete'));

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted
// => next: 1
// => next: 2
// => next: 3
// => next: 4
// => next: 5
// => complete

```

Expand All @@ -193,16 +193,16 @@ var source = Rx.Observable.from(set);

// Prints out each item
var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));

// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted
x => console.log('next: %s', x),
e => console.log('error: %s', e),
() => console.log('complete'));

// => next: 1
// => next: 2
// => next: 3
// => next: 4
// => next: 5
// => complete
```

We can also do a `Map` as well by applying the same technique.
Expand All @@ -215,13 +215,13 @@ var source = Rx.Observable.from(map);

// Prints out each item
var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));
x => console.log('next: %s', x),
e => console.log('error: %s', e),
() => console.log('complete'));

// => onNext: key1, 1
// => onNext: key2, 2
// => onCompleted
// => next: key1, 1
// => next: key2, 2
// => complete
```

The `from` method can also support ES6 generators which may already be in your browser, or coming to a browser near you. This allows us to do such things as Fibonacci sequences and so forth and convert them to an observable sequence.
Expand All @@ -243,16 +243,16 @@ var source = Rx.Observable.from(fibonacci()).take(5);

// Prints out each item
var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted'));

// => onNext: 1
// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 5
// => onCompleted
x => console.log('next: %s', x),
e => console.log('error: %s', e),
() => console.log('complete'));

// => next: 1
// => next: 1
// => next: 2
// => next: 3
// => next: 5
// => complete
```

## Cold vs. Hot Observables ##
Expand All @@ -275,28 +275,28 @@ And now to the example.
var source = Rx.Observable.interval(1000);

var subscription1 = source.subscribe(
x => console.log('Observer 1: onNext: ' + x),
e => console.log('Observer 1: onError: ' + e.message),
() => console.log('Observer 1: onCompleted'));
x => console.log('Observer 1: next: ' + x),
e => console.log('Observer 1: error: ' + e.message),
() => console.log('Observer 1: complete'));

var subscription2 = source.subscribe(
x => console.log('Observer 2: onNext: ' + x),
e => console.log('Observer 2: onError: ' + e.message),
() => console.log('Observer 2: onCompleted'));
x => console.log('Observer 2: next: ' + x),
e => console.log('Observer 2: error: ' + e.message),
() => console.log('Observer 2: complete'));

setTimeout(() => {
subscription1.dispose();
subscription2.dispose();
subscription1.unsubscribe();
subscription2.unsubscribe();
}, 5000);

// => Observer 1: onNext: 0
// => Observer 2: onNext: 0
// => Observer 1: onNext: 1
// => Observer 2: onNext: 1
// => Observer 1: onNext: 2
// => Observer 2: onNext: 2
// => Observer 1: onNext: 3
// => Observer 2: onNext: 3
// => Observer 1: next: 0
// => Observer 2: next: 0
// => Observer 1: next: 1
// => Observer 2: next: 1
// => Observer 1: next: 2
// => Observer 2: next: 2
// => Observer 1: next: 3
// => Observer 2: next: 3
```

In the following example, we convert the previous cold observable sequence source to a hot one using the [`publish`](https://github.com/Reactive-Extensions/RxJS/tree/master/doc/api/core/operators/publish.md) operator, which returns a `ConnectableObservable` instance we name `hot`. The [`publish`](https://github.com/Reactive-Extensions/RxJS/tree/master/doc/api/core/operators/publish.md) operator provides a mechanism to share subscriptions by broadcasting a single subscription to multiple subscribers. The `hot` variable acts as a proxy by subscribing to `source` and, as it receives values from `source`, pushing them to its own subscribers. To establish a subscription to the backing source and start receiving values, we use the [`ConnectableObservable.prototype.connect`](https://github.com/Reactive-Extensions/RxJS/tree/master/doc/api/core/operators/connect.md) method. Since `ConnectableObservable` inherits `Observable`, we can use `subscribe` to subscribe to this hot sequence even before it starts running. Notice that in the example, the hot sequence has not been started when `subscription1` subscribes to it. Therefore, no value is pushed to the subscriber. After calling Connect, values are then pushed to `subscription1`. After a delay of 3 seconds, `subscription2` subscribes to `hot` and starts receiving the values immediately from the current position (3 in this case) until the end. The output looks like this:
Expand All @@ -305,15 +305,15 @@ In the following example, we convert the previous cold observable sequence sourc
// => Current time: 1382562433256
// => Current Time after 1st subscription: 1382562433260
// => Current Time after connect: 1382562436261
// => Observer 1: onNext: 0
// => Observer 1: onNext: 1
// => Observer 1: next: 0
// => Observer 1: next: 1
// => Current Time after 2nd subscription: 1382562439262
// => Observer 1: onNext: 2
// => Observer 2: onNext: 2
// => Observer 1: onNext: 3
// => Observer 2: onNext: 3
// => Observer 1: onNext: 4
// => Observer 2: onNext: 4
// => Observer 1: next: 2
// => Observer 2: next: 2
// => Observer 1: next: 3
// => Observer 2: next: 3
// => Observer 1: next: 4
// => Observer 2: next: 4
```

First, we need to ensure we reference the proper files if in the browser. Note that the RxJS NPM Package already includes all operators by default.
Expand All @@ -335,9 +335,9 @@ var hot = source.publish();

// No value is pushed to 1st subscription at this point
var subscription1 = hot.subscribe(
x => console.log('Observer 1: onNext: %s', x),
e => console.log('Observer 1: onError: %s', e),
() => console.log('Observer 1: onCompleted'));
x => console.log('Observer 1: next: %s', x),
e => console.log('Observer 1: error: %s', e),
() => console.log('Observer 1: complete'));

console.log('Current Time after 1st subscription: ' + Date.now());

Expand All @@ -355,24 +355,24 @@ setTimeout(() => {
console.log('Current Time after 2nd subscription: ' + Date.now());

var subscription2 = hot.subscribe(
x => console.log('Observer 2: onNext: %s', x),
e => console.log('Observer 2: onError: %s', e),
() => console.log('Observer 2: onCompleted'));
x => console.log('Observer 2: next: %s', x),
e => console.log('Observer 2: error: %s', e),
() => console.log('Observer 2: complete'));

}, 3000);
}, 3000);

// => Current Time after connect: 1431197578426
// => Observer 1: onNext: 0
// => Observer 1: onNext: 1
// => Observer 1: onNext: 2
// => Observer 1: next: 0
// => Observer 1: next: 1
// => Observer 1: next: 2
// => Current Time after 2nd subscription: 1431197581434
// => Observer 1: onNext: 3
// => Observer 2: onNext: 3
// => Observer 1: onNext: 4
// => Observer 2: onNext: 4
// => Observer 1: onNext: 5
// => Observer 2: onNext: 5
// => Observer 1: next: 3
// => Observer 2: next: 3
// => Observer 1: next: 4
// => Observer 2: next: 4
// => Observer 1: next: 5
// => Observer 2: next: 5
// => ...
```

Expand Down