Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactory: remove session usage on Rediscovery #1189

Draft
wants to merge 4 commits into
base: 5.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
* limitations under the License.
*/

import { newError, error, int, Session, internal } from 'neo4j-driver-core'
import { newError, error, int, internal } from 'neo4j-driver-core'
import Rediscovery, { RoutingTable } from '../rediscovery'
import { HostNameResolver } from '../channel'
import SingleConnectionProvider from './connection-provider-single'
import PooledConnectionProvider from './connection-provider-pooled'
import { LeastConnectedLoadBalancingStrategy } from '../load-balancing'
import {
Expand Down Expand Up @@ -538,21 +537,22 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
const [session, error] = await this._createSessionForRediscovery(
currentRouter,
bookmarks,
impersonatedUser,
auth
)
if (session) {
const [connection, sessionContext] = session
try {
return [await this._rediscovery.lookupRoutingTableOnRouter(
session,
connection,
sessionContext,
routingTable.database,
currentRouter,
impersonatedUser
), null]
} catch (error) {
return this._handleRediscoveryError(error, currentRouter)
} finally {
session.close()
await connection.release().then(() => null)
}
} else {
// unable to acquire connection and create session towards the current router
Expand All @@ -564,7 +564,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
)
}

async _createSessionForRediscovery (routerAddress, bookmarks, impersonatedUser, auth) {
async _createSessionForRediscovery (routerAddress, bookmarks, auth) {
try {
const connection = await this._connectionPool.acquire({ auth }, routerAddress)

Expand All @@ -585,24 +585,18 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
? new DelegateConnection(connection, databaseSpecificErrorHandler)
: new DelegateConnection(connection)

const connectionProvider = new SingleConnectionProvider(delegateConnection)

const protocolVersion = connection.protocol().version
if (protocolVersion < 4.0) {
return [new Session({
mode: WRITE,
bookmarks: Bookmarks.empty(),
connectionProvider
}), null]
return [[delegateConnection, {
mode: WRITE, bookmarks: Bookmarks.empty()
}], null]
}

return [new Session({
return [[delegateConnection, {
bookmarks: bookmarks || Bookmarks.empty(),
mode: READ,
database: SYSTEM_DB_NAME,
bookmarks,
connectionProvider,
impersonatedUser
}), null]
database: SYSTEM_DB_NAME
}], null]
} catch (error) {
return this._handleRediscoveryError(error, routerAddress)
}
Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion packages/bolt-connection/src/connection-provider/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export { default as SingleConnectionProvider } from './connection-provider-single'
export { default as PooledConnectionProvider } from './connection-provider-pooled'
export { default as DirectConnectionProvider } from './connection-provider-direct'
export { default as RoutingConnectionProvider } from './connection-provider-routing'
44 changes: 18 additions & 26 deletions packages/bolt-connection/src/rediscovery/rediscovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
* limitations under the License.
*/
import RoutingTable from './routing-table'
// eslint-disable-next-line no-unused-vars
import { Session } from 'neo4j-driver-core'

export default class Rediscovery {
/**
Expand All @@ -29,45 +27,39 @@ export default class Rediscovery {

/**
* Try to fetch new routing table from the given router.
* @param {Session} session the session to use.
* @param {Connection} connection the connection to use.
* @param {any} sessionContext the session context to be used
* @param {string} database the database for which to lookup routing table.
* @param {ServerAddress} routerAddress the URL of the router.
* @param {string} impersonatedUser The impersonated user
* @return {Promise<RoutingTable>} promise resolved with new routing table or null when connection error happened.
*/
lookupRoutingTableOnRouter (session, database, routerAddress, impersonatedUser) {
return session._acquireConnection(connection => {
return this._requestRawRoutingTable(
connection,
session,
lookupRoutingTableOnRouter (connection, sessionContext, database, routerAddress, impersonatedUser) {
return this._requestRawRoutingTable(
connection,
sessionContext,
database,
routerAddress,
impersonatedUser
).then(rawRoutingTable => {
if (rawRoutingTable.isNull) {
return null
}
return RoutingTable.fromRawRoutingTable(
database,
routerAddress,
impersonatedUser
).then(rawRoutingTable => {
if (rawRoutingTable.isNull) {
return null
}
return RoutingTable.fromRawRoutingTable(
database,
routerAddress,
rawRoutingTable
)
})
rawRoutingTable
)
})
}

_requestRawRoutingTable (connection, session, database, routerAddress, impersonatedUser) {
_requestRawRoutingTable (connection, sessionContext, database, routerAddress, impersonatedUser) {
return new Promise((resolve, reject) => {
connection.protocol().requestRoutingInformation({
routingContext: this._routingContext,
databaseName: database,
impersonatedUser,
sessionContext: {
bookmarks: session._lastBookmarks,
mode: session._mode,
database: session._database,
afterComplete: session._onComplete
},
sessionContext,
onCompleted: resolve,
onError: reject
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error
const READ = 'READ'
const WRITE = 'WRITE'

describe.each([
xdescribe.each([
3,
4.0,
4.1,
Expand Down Expand Up @@ -308,7 +308,7 @@ describe.each([
})
}, 10000)

it.each(usersDataSet)('refreshes stale routing table to get read connection [user=%s]', (user, done) => {
it.only.each(usersDataSet)('refreshes stale routing table to get read connection [user=%s]', (user, done) => {
const pool = newPool()
const updatedRoutingTable = newRoutingTable(
null,
Expand Down Expand Up @@ -3938,7 +3938,11 @@ class FakeConnection extends Connection {
this._version = version
this._protocolVersion = protocolVersion
this.release = release
this.release = jest.fn(() => release(address, this))
this.release = jest.fn(() => {
if (release) {
release(address, this)
}
})
this.resetAndFlush = jest.fn(() => Promise.resolve())
this._server = server
this._authToken = authToken
Expand Down
41 changes: 13 additions & 28 deletions packages/bolt-connection/test/rediscovery/rediscovery.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const {
const { PROTOCOL_ERROR } = error

const DATABASE_NOT_FOUND_CODE = 'Neo.ClientError.Database.DatabaseNotFound'
const DEFAULT_SESSION_CONTEXT = {
bookmarks: ['lastBook'],
mode: 'READ',
database: 'session db'
}

describe('#unit Rediscovery', () => {
it('should return the routing table when it available', async () => {
Expand Down Expand Up @@ -90,7 +95,6 @@ describe('#unit Rediscovery', () => {
const connection = new FakeConnection().withRequestRoutingInformationMock(
fakeOnError(rawRoutingTable)
)
const session = new FakeSession(connection)

await lookupRoutingTableOnRouter({
database,
Expand All @@ -104,23 +108,18 @@ describe('#unit Rediscovery', () => {
const requestParams = connection.seenRequestRoutingInformation[0]
expect(requestParams.routingContext).toEqual(routingContext)
expect(requestParams.databaseName).toEqual(database)
expect(requestParams.sessionContext).toEqual({
bookmarks: session._lastBookmarks,
mode: session._mode,
database: session._database,
afterComplete: session._onComplete
})
expect(requestParams.sessionContext).toEqual(DEFAULT_SESSION_CONTEXT)
})

it('should reject with DATABASE_NOT_FOUND_CODE when it happens ', async () => {
const expectedError = newError('Laia', DATABASE_NOT_FOUND_CODE)
const connection = new FakeConnection().withRequestRoutingInformationMock(
fakeOnError(expectedError)
)
try {
const initialAddress = '127.0.0.1'
const routingContext = { context: '1234 ' }

const connection = new FakeConnection().withRequestRoutingInformationMock(
fakeOnError(expectedError)
)
await lookupRoutingTableOnRouter({
initialAddress,
routingContext,
Expand Down Expand Up @@ -211,11 +210,11 @@ function lookupRoutingTableOnRouter ({
routerAddress = ServerAddress.fromUrl('bolt://localhost:7687'),
routingContext = {},
initialAddress = 'localhost:1235',
session,
sessionContext,
connection = new FakeConnection(),
rawRoutingTable
} = {}) {
const _session = session || new FakeSession(connection)
const _sessionContext = sessionContext || DEFAULT_SESSION_CONTEXT

if (connection && rawRoutingTable) {
connection.withRequestRoutingInformationMock(
Expand All @@ -226,26 +225,12 @@ function lookupRoutingTableOnRouter ({
const rediscovery = new Rediscovery(routingContext, initialAddress)

return rediscovery.lookupRoutingTableOnRouter(
_session,
connection,
_sessionContext,
database,
routerAddress
)
}
class FakeSession {
constructor (connection) {
this._connection = connection
this._called = 0
this._lastBookmarks = ['lastBook']
this._mode = 'READ'
this._database = 'session db'
this._onComplete = 'moked'
}

_acquireConnection (callback) {
this._called++
return callback(this._connection)
}
}

function fakeOnCompleted (raw = null) {
return ({ onCompleted }) => onCompleted(raw)
Expand Down
16 changes: 0 additions & 16 deletions packages/core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,22 +219,6 @@ class Session {
return new Result(observerPromise, query, parameters, connectionHolder, watermarks)
}

/**
* This method is used by Rediscovery on the neo4j-driver-bolt-protocol package.
*
* @private
* @param {function()} connectionConsumer The method which will use the connection
* @returns {Promise<T>} A connection promise
*/
_acquireConnection<T> (connectionConsumer: ConnectionConsumer<T>): Promise<T> {
const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(connectionConsumer)

return resultPromise.then(async (result: T) => {
await connectionHolder.releaseConnection()
return result
})
}

/**
* Acquires a {@link Connection}, consume it and return a promise of the result along with
* the {@link ConnectionHolder} used in the process.
Expand Down
Loading