Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CompositeSubscription #42

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ RxLua
- [Subscription](#subscription)
- [create](#createaction)
- [unsubscribe](#unsubscribe)
- [CompositeSubscription](#compositesubscription)
- [create](#createsubscriptions)
- [unsubscribe](#unsubscribe)
- [add](#addsubscriptions)
- [clear](#clear)
- [Observer](#observer)
- [create](#createonnext-onerror-oncompleted)
- [onNext](#onnextvalues)
Expand Down Expand Up @@ -129,6 +134,42 @@ Creates a new Subscription.

Unsubscribes the subscription, performing any necessary cleanup work.

# CompositeSubscription

A Subscription that is composed of other subscriptions and can be used to unsubscribe multiple subscriptions at once.

---

#### `.create(subscriptions)`

Creates a new CompositeSubscription. It may be initialized empty or with a set of Subscriptions.

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `subscriptions` | Subscription... | | A set of subscriptions to initialize the object with. |

---

#### `:unsubscribe()`

Unsubscribes all subscriptions that were added to this CompositeSubscription and removes them from this CompositeSubscription.

---

#### `:add(subscriptions)`

Adds one or more Subscriptions to this CompositeSubscription. If this subscription has already unsubscribed, then any added subscriptions will be immediately disposed.

| Name | Type | Default | Description |
|------|------|---------|-------------|
| `subscriptions` | Subscription... | | The list of Subscriptions to add. |

---

#### `:clear()`

Removes all subscriptions from this CompositeSubscription and calls `Subscription:unsubscribe()` on each one. More subscriptions can be added to this CompositeSubscription in the future.

# Observer

Observers are simple objects that receive values from Observables.
Expand Down
57 changes: 57 additions & 0 deletions rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,62 @@ function Subscription:unsubscribe()
self.unsubscribed = true
end

--- @class CompositeSubscription
-- @description A Subscription that is composed of other subscriptions and can be used to
-- unsubscribe multiple subscriptions at once.
local CompositeSubscription = setmetatable({}, Subscription)
CompositeSubscription.__index = CompositeSubscription
CompositeSubscription.__tostring = util.constant('CompositeSubscription')

--- Creates a new CompositeSubscription. It may be initialized empty or with a set of Subscriptions.
-- @arg {Subscription...} subscriptions - A set of subscriptions to initialize the object with.
-- @returns {CompositeSubscription}
function CompositeSubscription.create(...)
local self = {
subscriptions = util.pack(...),
unsubscribed = false,
}

return setmetatable(self, CompositeSubscription)
end

--- Unsubscribes all subscriptions that were added to this CompositeSubscription and removes them
-- from this CompositeSubscription.
-- @returns {nil}
function CompositeSubscription:unsubscribe()
if not self.unsubscribed then
self.unsubscribed = true
for _,subscription in ipairs(self.subscriptions) do
subscription:unsubscribe()
end
self.subscriptions = {}
end
end

--- Adds one or more Subscriptions to this CompositeSubscription. If this subscription has already
-- unsubscribed, then any added subscriptions will be immediately disposed.
-- @arg {Subscription...} subscriptions - The list of Subscriptions to add.
-- @returns {nil}
function CompositeSubscription:add(...)
for _,subscription in ipairs(util.pack(...)) do
if not self.unsubscribed then
table.insert(self.subscriptions, subscription)
else
subscription:unsubscribe()
end
end
end

--- Removes all subscriptions from this CompositeSubscription and calls `Subscription:unsubscribe()`
-- on each one. More subscriptions can be added to this CompositeSubscription in the future.
-- @returns {nil}
function CompositeSubscription:clear()
for _,subscription in ipairs(self.subscriptions) do
subscription:unsubscribe()
self.subscriptions = {}
end
end

--- @class Observer
-- @description Observers are simple objects that receive values from Observables.
local Observer = {}
Expand Down Expand Up @@ -2301,6 +2357,7 @@ Observable['repeat'] = Observable.replicate
return {
util = util,
Subscription = Subscription,
CompositeSubscription = CompositeSubscription,
Observer = Observer,
Observable = Observable,
ImmediateScheduler = ImmediateScheduler,
Expand Down
58 changes: 58 additions & 0 deletions src/subscriptions/compositesubscription.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
local Subscription = require 'subscription'
local util = require 'util'

--- @class CompositeSubscription
-- @description A Subscription that is composed of other subscriptions and can be used to
-- unsubscribe multiple subscriptions at once.
local CompositeSubscription = setmetatable({}, Subscription)
CompositeSubscription.__index = CompositeSubscription
CompositeSubscription.__tostring = util.constant('CompositeSubscription')

--- Creates a new CompositeSubscription. It may be initialized empty or with a set of Subscriptions.
-- @arg {Subscription...} subscriptions - A set of subscriptions to initialize the object with.
-- @returns {CompositeSubscription}
function CompositeSubscription.create(...)
local self = {
subscriptions = util.pack(...),
unsubscribed = false,
}

return setmetatable(self, CompositeSubscription)
end

--- Unsubscribes all subscriptions that were added to this CompositeSubscription and removes them
-- from this CompositeSubscription.
-- @returns {nil}
function CompositeSubscription:unsubscribe()
if not self.unsubscribed then
self.unsubscribed = true
for _,subscription in ipairs(self.subscriptions) do
subscription:unsubscribe()
end
self.subscriptions = {}
end
end

--- Adds one or more Subscriptions to this CompositeSubscription. If this subscription has already
-- unsubscribed, then any added subscriptions will be immediately disposed.
-- @arg {Subscription...} subscriptions - The list of Subscriptions to add.
-- @returns {nil}
function CompositeSubscription:add(...)
for _,subscription in ipairs(util.pack(...)) do
if not self.unsubscribed then
table.insert(self.subscriptions, subscription)
else
subscription:unsubscribe()
end
end
end

--- Removes all subscriptions from this CompositeSubscription and calls `Subscription:unsubscribe()`
-- on each one. More subscriptions can be added to this CompositeSubscription in the future.
-- @returns {nil}
function CompositeSubscription:clear()
for _,subscription in ipairs(self.subscriptions) do
subscription:unsubscribe()
self.subscriptions = {}
end
end
File renamed without changes.
168 changes: 168 additions & 0 deletions tests/compositesubscription.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
describe('CompositeSubscription', function()
describe('create', function()
it('returns a CompositeSubscription', function()
local compositeSubscription = Rx.CompositeSubscription.create()
expect(compositeSubscription).to.be.an(Rx.CompositeSubscription)
end)
end)

describe('unsubscribe', function()
describe('with subscriptions composed at initialization', function()
it('unsubscribes composed subscriptions', function()
local subscriptions = {
Rx.Subscription.create(function() end),
Rx.Subscription.create(function() end),
}
local spies = {
spy(subscriptions[1], 'unsubscribe'),
spy(subscriptions[2], 'unsubscribe'),
}

local compositeSubscription = Rx.CompositeSubscription.create(
subscriptions[1], subscriptions[2])
compositeSubscription:unsubscribe()

expect(#spies[1]).to.equal(1)
expect(#spies[2]).to.equal(1)
end)

it('only invokes unsubscribe once', function()
local subscriptions = {
Rx.Subscription.create(function() end),
Rx.Subscription.create(function() end),
}
local spies = {
spy(subscriptions[1], 'unsubscribe'),
spy(subscriptions[2], 'unsubscribe'),
}

local compositeSubscription = Rx.CompositeSubscription.create(subscriptions[1], subscriptions[2])
compositeSubscription:unsubscribe()
compositeSubscription:unsubscribe()

expect(#spies[1]).to.equal(1)
expect(#spies[2]).to.equal(1)
end)
end)

describe('with subscriptions composed dynamically', function()
it('unsubscribes composed subscriptions', function()
local subscriptions = {
Rx.Subscription.create(function() end),
Rx.Subscription.create(function() end),
}
local spies = {
spy(subscriptions[1], 'unsubscribe'),
spy(subscriptions[2], 'unsubscribe'),
}

local compositeSubscription = Rx.CompositeSubscription.create()
compositeSubscription:add(subscriptions[1], subscriptions[2])
compositeSubscription:unsubscribe()

expect(#spies[1]).to.equal(1)
expect(#spies[2]).to.equal(1)
end)

it('only invokes unsubscribe once', function()
local subscriptions = {
Rx.Subscription.create(function() end),
Rx.Subscription.create(function() end),
}
local spies = {
spy(subscriptions[1], 'unsubscribe'),
spy(subscriptions[2], 'unsubscribe'),
}

local compositeSubscription = Rx.CompositeSubscription.create()
compositeSubscription:add(subscriptions[1], subscriptions[2])
compositeSubscription:unsubscribe()
compositeSubscription:unsubscribe()

expect(#spies[1]).to.equal(1)
expect(#spies[2]).to.equal(1)
end)
end)
end)

describe('add', function()
it('does not unsubscribe composed subscriptions', function()
local subscriptions = {
Rx.Subscription.create(function() end),
Rx.Subscription.create(function() end),
}
local spies = {
spy(subscriptions[1], 'unsubscribe'),
spy(subscriptions[2], 'unsubscribe'),
}

local compositeSubscription = Rx.CompositeSubscription.create()
compositeSubscription:add(subscriptions[1], subscriptions[2])

expect(#spies[1]).to.equal(0)
expect(#spies[2]).to.equal(0)
end)

describe('if CompositeSubscription is already unsubscribed', function()
it('immediately unsubscribes subscriptions', function()
local subscriptions = {
Rx.Subscription.create(function() end),
Rx.Subscription.create(function() end),
}
local spies = {
spy(subscriptions[1], 'unsubscribe'),
spy(subscriptions[2], 'unsubscribe'),
}

local compositeSubscription = Rx.CompositeSubscription.create()
compositeSubscription:unsubscribe()

compositeSubscription:add(subscriptions[1], subscriptions[2])

expect(#spies[1]).to.equal(1)
expect(#spies[2]).to.equal(1)
end)
end)

describe('if CompositeSubscription was cleared', function()
it('does not unsubscribe composed subscriptions', function()
local subscriptions = {
Rx.Subscription.create(function() end),
Rx.Subscription.create(function() end),
}
local spies = {
spy(subscriptions[1], 'unsubscribe'),
spy(subscriptions[2], 'unsubscribe'),
}

local compositeSubscription = Rx.CompositeSubscription.create(
Rx.Subscription.create(function() end))
compositeSubscription:clear()

compositeSubscription:add(subscriptions[1], subscriptions[2])

expect(#spies[1]).to.equal(0)
expect(#spies[2]).to.equal(0)
end)
end)
end)

describe('clear', function()
it('unsubscribes composed subscriptions', function()
local subscriptions = {
Rx.Subscription.create(function() end),
Rx.Subscription.create(function() end),
}
local spies = {
spy(subscriptions[1], 'unsubscribe'),
spy(subscriptions[2], 'unsubscribe'),
}

local compositeSubscription = Rx.CompositeSubscription.create(subscriptions[1], subscriptions[2])
compositeSubscription:clear()

expect(#spies[1]).to.equal(1)
expect(#spies[2]).to.equal(1)
end)
end)
end)
1 change: 1 addition & 0 deletions tests/runner.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ else
'observer',
'observable',
'subscription',
'compositesubscription',
'subject',
'asyncsubject',
'behaviorsubject',
Expand Down
4 changes: 3 additions & 1 deletion tools/build.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

local files = {
'src/util.lua',
'src/subscription.lua',
'src/subscriptions/subscription.lua',
'src/subscriptions/compositesubscription.lua',
'src/observer.lua',
'src/observable.lua',
'src/operators/all.lua',
Expand Down Expand Up @@ -90,6 +91,7 @@ exports.homepage = 'https://github.com/bjornbytes/rxlua'
local footer = [[return {
util = util,
Subscription = Subscription,
CompositeSubscription = CompositeSubscription,
Observer = Observer,
Observable = Observable,
ImmediateScheduler = ImmediateScheduler,
Expand Down