Skip to content

Commit

Permalink
Pool Prioritization (#2)
Browse files Browse the repository at this point in the history
* Added optional pool prioritization functionality.
* Fixed bug in heal process that was preventing heal failures from being detected properly in some cases.
  • Loading branch information
gshively11 authored Nov 9, 2017
1 parent db6b05a commit 03381f9
Show file tree
Hide file tree
Showing 6 changed files with 469 additions and 5 deletions.
2 changes: 2 additions & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ reconnets/retries, and basic health/statistics reporting.
| [config.dsnProvider] | <code>function</code> | | A function returning a promise that resolves with an array of dsn object(s). This option will override config.dsn and config.dsns. Required if dsn and dsns are not provided. |
| [config.connectionPoolFactory] | <code>function</code> | | A function that receives the dsn objects from the dnsProvider and returns a promise that resolves with *connected* instance(s) of ConnectionPool. Use this option if you want to customize how mssql ConnectionPools are instantiated and connected. |
| [config.connectionPoolConfig] | <code>object</code> | | An object containing any configuration you want to attach to the config provided when creating an mssql ConnectionPool. This is useful if you don't want to create a custom dsnProvider or connectionPoolFactory to modify the configuration used to create ConnectionPools. Just keep in mind that any config set here will override the config set in the dsnProvider. Also keep in mind that node-mssql expects some configuration to exists on an "options" property (like timeouts). Check node-mssql README.md for more information. |
| [config.prioritizePools] | <code>boolean</code> | | A flag to enable pool prioritization behavior. If you enable this behavior, your dsns must have a numeric priority property. The lower the number, the higher the priority of the dsn, starting at 0. At a specified interval, the pools collection will be examined to see if the pools are no longer indexed in order of priority. If this is the case, the pools will be healed (if applicable) and re-ordered in terms of their priority. This is a useful behavior if you want to fail back to a "primary" dsn after it becomes healthy again. |
| [config.prioritizeInterval] | <code>number</code> | <code>30000</code> | The interval in milliseconds to run the pool prioritization check. Setting a value below 10000 is not advised, as the pool prioritization check can take significant resources if a pool heal is required. |
| [cb] | <code>function</code> | | Optional callback interface, providing this automatically calls warmup. It is preferable to use the Promise-based interface and call warmup explicitly. |

<a name="module_connection-pool-party.ConnectionPoolParty+warmup"></a>
Expand Down
118 changes: 115 additions & 3 deletions src/connection-pool-party.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import coalesceRequestResults from './coalesce-request-results';
import wrapListeners from './wrap-listeners';
import requestMethodSuccess from './request-method-success';
import requestMethodFailure from './request-method-failure';
import poolPrioritySort from './pool-priority-sort';

const debug = setDebug('mssql-pool-party');

Expand Down Expand Up @@ -57,6 +58,16 @@ const debug = setDebug('mssql-pool-party');
* will override the config set in the dsnProvider. Also keep in mind that node-mssql expects some
* configuration to exists on an "options" property (like timeouts). Check node-mssql README.md
* for more information.
* @param {boolean} [config.prioritizePools] - A flag to enable pool prioritization behavior.
* If you enable this behavior, your dsns must have a numeric priority property.
* The lower the number, the higher the priority of the dsn, starting at 0.
* At a specified interval, the pools collection will be examined to see if the pools
* are no longer indexed in order of priority. If this is the case, the pools will be
* healed (if applicable) and re-ordered in terms of their priority. This is a useful
* behavior if you want to fail back to a "primary" dsn after it becomes healthy again.
* @param {number} [config.prioritizeInterval=30000] - The interval in milliseconds
* to run the pool prioritization check. Setting a value below 10000 is not advised,
* as the pool prioritization check can take significant resources if a pool heal is required.
* @param {function} [cb] - Optional callback interface, providing this automatically calls
* warmup. It is preferable to use the Promise-based interface and call warmup explicitly.
* @class
Expand All @@ -71,6 +82,8 @@ export default class ConnectionPoolParty extends EventEmitter {
this.configDefaults = {
reconnects: 0,
retries: 0,
prioritizePools: false,
prioritizeInterval: 30000,
};

this.config = {
Expand All @@ -97,6 +110,8 @@ export default class ConnectionPoolParty extends EventEmitter {
this.warmupStrategy = this.config.warmupStrategy || raceWarmupStrategy;
this._warmupPromise = null;
this._healingPromise = null;
this._prioritizePromise = null;
this._prioritizeTimer = null;
// we don't want an 'Uncaught, unspecified "error" event.' exception
// so we have a dummy listener here.
this.on('error', () => {});
Expand Down Expand Up @@ -152,6 +167,10 @@ export default class ConnectionPoolParty extends EventEmitter {
.then(() => {
// if we've gotten here, then at least one pool succesfully connected
this.warmedUp = true;
// we only start prioritizing after a successful warmup
if (this.config.prioritizePools) {
this._startPrioritizingPools();
}
})
.catch((err) => {
debug('failed to retrieve dsns! reseting warmup promise so that another attempt can be made');
Expand Down Expand Up @@ -198,10 +217,14 @@ export default class ConnectionPoolParty extends EventEmitter {
this.pools = [];
})
.catch((err) => {
this.emit('error', err);
this.pools = [];
this.emit('error', err);
})
.then(() => {
if (this._prioritizeTimer) {
clearInterval(this._prioritizeTimer);
this._prioritizeTimer = null;
}
if (typeof cb === 'function') {
cb();
}
Expand Down Expand Up @@ -391,10 +414,15 @@ export default class ConnectionPoolParty extends EventEmitter {
}

_promotePool = (poolIndex) => {
// if pools are being healed, we can't mess with this.pools since
// it's mutated during healing. another successful request will
// if pools are being healed or prioritized, we can't mess with this.pools since
// it's mutated during those operations. another successful request will
// have to promote the pool
if (this._healingPromise) {
debug('_promotePool called during heal, skipping promotion');
return;
}
if (this._prioritizePromise) {
debug('_promotePool called during prioritize, skipping promotion');
return;
}
// track some stats
Expand Down Expand Up @@ -474,6 +502,12 @@ export default class ConnectionPoolParty extends EventEmitter {
return this.connectionPoolFactory(updatedDsn)
.then(
(pool) => {
if (pool.error) {
// some connection pool factories may opt to return an error instead
// of rejecting the promise. the existence of an error indicates that
// the pool did not heal successfully
return pool.error;
}
// need to transfer stats from the old unhealthy pool to the new one (mutates)
copyPoolStats(unhealthyPool, pool);
pool.lastHealAt = Date.now();
Expand All @@ -484,4 +518,82 @@ export default class ConnectionPoolParty extends EventEmitter {
err => err,
);
}

_prioritizePools = () => {
if (!this.warmedUp) {
debug('_prioritizePools called before warmup completed. this should not happen.');
return;
}
if (this._healingPromise) {
debug('_prioritizePools called during heal, skipping this run');
return;
}
if (this.pools.length === 0) {
debug('_prioritizePools called when no pools exist, this should not happen');
return;
}
if (this.pools.length === 1) {
debug('_prioritizePools called when only one pool exists, skipping this run');
return;
}
if (this._prioritizePromise) {
debug('_prioritizePools called again while already in progress');
} else {
debug('_prioritizePools called');
}
const firstPoolPriority = this.pools[0].dsn.priority;
if (firstPoolPriority === undefined || firstPoolPriority === 0) {
debug('first pool has top priority, no need to prioritize');
return;
}
const higherPriorityPools = this.pools.filter(
pool => pool.dsn.priority < firstPoolPriority,
);
if (higherPriorityPools.length === 0) {
debug('unexpected priority config on DSNs, unable to prioritize');
return;
}
const unhealthyPriorityPools = higherPriorityPools.filter(
pool => !pool.connection.connecting && !pool.connection.connected,
);
this._prioritizePromise = this._prioritizePromise || Promise.resolve(unhealthyPriorityPools)
.then((unhealthyPools) => {
// If all the pools of a higher priority than index 0 are healthy, we can
// skip the heal. If there are any unhealthy pools,
// we need to heal them before sorting.
if (unhealthyPools.length === 0) {
debug('no unhealthy pools detected during prioritization');
return true;
}
debug(`healing ${unhealthyPools.length} pools during prioritization`);
return this._healPools(unhealthyPools);
})
.then((anyHealthyPools) => {
if (!anyHealthyPools) {
debug('none of the pools eligible for prioritization are healthy, unable to prioritize');
return;
}
this.pools.sort(poolPrioritySort);
debug('prioritized pools');
})
.then(() => {
this._prioritizePromise = null;
})
.catch((err) => {
debug('unexpected error during _prioritizePools');
debug(err);
this._prioritizePromise = null;
});
}

_startPrioritizingPools = () => {
if (this._prioritizeTimer) {
// Prioritizing has already begun
return;
}
debug(`_startPrioritizingPools called with interval ${this.config.prioritizeInterval}`);
this._prioritizeTimer = setInterval(() => {
this._prioritizePools();
}, this.config.prioritizeInterval);
}
}
13 changes: 13 additions & 0 deletions src/pool-priority-sort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export default function poolPrioritySort(a, b) {
if (a.connection.connected && !b.connection.connected) {
return -1;
}
if (!a.connection.connected && b.connection.connected) {
return 1;
}
if (!a.connection.connected && !b.connection.connected) {
return 0;
}
// if both pools are connected, then we sort by priority
return a.dsn.priority - b.dsn.priority;
}
11 changes: 11 additions & 0 deletions src/serial-warmup-strategy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// The serialWarmupStrategy attempts to create connection pools in series, based
// on the order of the dsns provided.
// This isn't a very good strategy to use in practice, but it can be useful for testing
export default function serialWarmupStrategy(dsns, connectionPoolFactory, onCreation, onError) {
return dsns.reduce((p, dsn) => p.then(
() => connectionPoolFactory(dsn).then(
pool => onCreation(pool),
err => onError(err),
),
), Promise.resolve());
}
6 changes: 4 additions & 2 deletions test/delay.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export default function delay(ms) {
return () => new Promise((resolve) => { setTimeout(resolve, ms); });
// In the event that we use jest.useFakeTimers, it is useful to optionally
// accept a specific implementation of setTimeout.
export default function delay(ms, setTimeoutToUse = setTimeout) {
return () => new Promise((resolve) => { setTimeoutToUse(resolve, ms); });
}
Loading

0 comments on commit 03381f9

Please sign in to comment.