Skip to content

Commit

Permalink
fix!: rename "transient" connections to "limited" (#2645)
Browse files Browse the repository at this point in the history
To better align with [[email protected]](https://github.com/libp2p/go-libp2p/releases/tag/v0.34.0)
rename "transient" connections to "limited".

BREAKING CHANGE: There are three breaking API changes:
  * Connections have an optional `.limits` property
  * The `runOnTransientConnection` property of `libp2p.handle` and `libp2p.dialProtocol` has been renamed to `runOnLimitedConnection`
  * The `notifyOnTransient` property of `libp2p.register` has been renamed `notifyOnLimitedConnection`

Refs: #2622
  • Loading branch information
achingbrain committed Sep 6, 2024
1 parent 9ba7d92 commit b0be85b
Show file tree
Hide file tree
Showing 40 changed files with 335 additions and 170 deletions.
74 changes: 37 additions & 37 deletions packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const echoService = (components: EchoServiceComponents): unknown => {
stream, stream
)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})
},
stop () {}
Expand Down Expand Up @@ -560,7 +560,7 @@ describe('circuit-relay', () => {
// open hop stream and try to connect to remote
const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC, {
runOnTransientConnection: true
runOnLimitedConnection: true
})
const hopStream = pbStream(stream).pb(HopMessage)
Expand Down Expand Up @@ -638,43 +638,43 @@ describe('circuit-relay', () => {
expect(circuitListener[0].relayStore.listenerCount('relay:removed')).to.equal(1)
})

it('should mark an outgoing relayed connection as transient', async () => {
it('should mark an outgoing relayed connection as limited', async () => {
// discover relay and make reservation
const connectionToRelay = await remote.dial(relay1.getMultiaddrs()[0])

// connection to relay should not be marked transient
expect(connectionToRelay).to.have.property('transient', false)
// connection to relay should not be limited
expect(connectionToRelay).to.have.property('limits').that.is.undefined()

await usingAsRelay(remote, relay1)

// dial the remote through the relay
const ma = getRelayAddress(remote)
const connection = await local.dial(ma)

// connection to remote through relay should be marked transient
expect(connection).to.have.property('transient', true)
// connection to remote through relay should be limited
expect(connection).to.have.property('limits').that.is.ok()
})

it('should mark an incoming relayed connection as transient', async () => {
it('should mark an incoming relayed connection as limited', async () => {
// discover relay and make reservation
const connectionToRelay = await remote.dial(relay1.getMultiaddrs()[0])

// connection to relay should not be marked transient
expect(connectionToRelay).to.have.property('transient', false)
// connection to relay should not be limited
expect(connectionToRelay).to.have.property('limits').that.is.undefined()

await usingAsRelay(remote, relay1)

// dial the remote through the relay
const ma = getRelayAddress(remote)
await local.dial(ma)

// connection from local through relay should be marked transient
// connection from local through relay should be limited
const connections = remote.getConnections(local.peerId)
expect(connections).to.have.lengthOf(1)
expect(connections).to.have.nested.property('[0].transient', true)
expect(connections).to.have.nested.property('[0].limits').that.is.ok()
})

it('should not open streams on a transient connection', async () => {
it('should not open streams on a limited connection', async () => {
// discover relay and make reservation
await remote.dial(relay1.getMultiaddrs()[0])
await usingAsRelay(remote, relay1)
Expand All @@ -683,21 +683,21 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)
const connection = await local.dial(ma)

// connection should be marked transient
expect(connection).to.have.property('transient', true)
// connection should be marked limited
expect(connection).to.have.property('limits').that.is.ok()

await expect(connection.newStream('/my-protocol/1.0.0'))
.to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION')
.to.eventually.be.rejected.with.property('code', 'ERR_LIMITED_CONNECTION')
})

it('should not allow incoming streams on a transient connection', async () => {
it('should not allow incoming streams on a limited connection', async () => {
const protocol = '/my-protocol/1.0.0'

// remote registers handler, disallow running over transient streams
// remote registers handler, disallow running over limited connections
await remote.handle(protocol, ({ stream }) => {
void pipe(stream, stream)
}, {
runOnTransientConnection: false
runOnLimitedConnection: false
})

// discover relay and make reservation
Expand All @@ -708,23 +708,23 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)
const connection = await local.dial(ma)

// connection should be marked transient
expect(connection).to.have.property('transient', true)
// connection should be marked limited
expect(connection).to.have.property('limits').that.is.ok()

await expect(connection.newStream('/my-protocol/1.0.0', {
runOnTransientConnection: false
runOnLimitedConnection: false
}))
.to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION')
.to.eventually.be.rejected.with.property('code', 'ERR_LIMITED_CONNECTION')
})

it('should open streams on a transient connection when told to do so', async () => {
it('should open streams on a limited connection when told to do so', async () => {
const protocol = '/my-protocol/1.0.0'

// remote registers handler, allow running over transient streams
// remote registers handler, allow running over limited streams
await remote.handle(protocol, ({ stream }) => {
void pipe(stream, stream)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// discover relay and make reservation
Expand All @@ -735,11 +735,11 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)
const connection = await local.dial(ma)

// connection should be marked transient
expect(connection).to.have.property('transient', true)
// connection should be marked limited
expect(connection).to.have.property('limits').that.is.ok()

await expect(connection.newStream('/my-protocol/1.0.0', {
runOnTransientConnection: true
runOnLimitedConnection: true
}))
.to.eventually.be.ok()
})
Expand Down Expand Up @@ -912,15 +912,15 @@ describe('circuit-relay', () => {
} catch {}
})
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// dial the remote from the local through the relay
const ma = getRelayAddress(remote)

try {
const stream = await local.dialProtocol(ma, protocol, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

await stream.sink(async function * () {
Expand Down Expand Up @@ -1056,7 +1056,7 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)

const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// write more than the default data limit
Expand All @@ -1075,7 +1075,7 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)

const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

let finished = false
Expand Down Expand Up @@ -1107,21 +1107,21 @@ describe('circuit-relay', () => {
expect(finish - start).to.be.greaterThan(defaultDurationLimit)
})

it('should not mark an outgoing connection as transient', async () => {
it('should not mark an outgoing connection as limited', async () => {
const ma = getRelayAddress(remote)

const connection = await local.dial(ma)
expect(connection).to.have.property('transient', false)
expect(connection).to.have.property('limits').that.is.undefined()
})

it('should not mark an incoming connection as transient', async () => {
it('should not mark an incoming connection as limited', async () => {
const ma = getRelayAddress(remote)

await local.dial(ma)

const connections = remote.getConnections(local.peerId)
expect(connections).to.have.lengthOf(1)
expect(connections).to.have.nested.property('[0].transient', false)
expect(connections).to.have.nested.property('[0].limits').that.is.undefined()
})
})
})
10 changes: 5 additions & 5 deletions packages/integration-tests/test/dcutr.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('dcutr', () => {
async function waitForOnlyDirectConnections (): Promise<void> {
await pRetry(async () => {
const connections = libp2pA.getConnections(libp2pB.peerId)
const onlyDirect = connections.filter(conn => !conn.transient)
const onlyDirect = connections.filter(conn => conn.limits == null)

if (onlyDirect.length === connections.length) {
// all connections are direct
Expand Down Expand Up @@ -109,8 +109,8 @@ describe('dcutr', () => {
const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`)
const connection = await libp2pA.dial(relayedAddress)

// connection should be transient
expect(connection).to.have.property('transient', true)
// connection should be limited
expect(connection).to.have.property('limits').that.is.ok()

// wait for DCUtR unilateral upgrade
await waitForOnlyDirectConnections()
Expand Down Expand Up @@ -166,8 +166,8 @@ describe('dcutr', () => {
const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`)
const connection = await libp2pA.dial(relayedAddress)

// connection should be transient
expect(connection).to.have.property('transient', true)
// connection should be limited
expect(connection).to.have.property('limits').that.is.ok()

// wait for DCUtR unilateral upgrade
await waitForOnlyDirectConnections()
Expand Down
2 changes: 1 addition & 1 deletion packages/integration-tests/test/fetch.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async function createNode (): Promise<Libp2p<{ fetch: Fetch }>> {

describe('fetch', () => {
if (isWebWorker) {
it.skip('tests are skipped because WebWorkers can only have transient connections', () => {
it.skip('tests are skipped because WebWorkers can only have limited connections', () => {

})
return
Expand Down
2 changes: 1 addition & 1 deletion packages/integration-tests/test/ping.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('ping', () => {
stream
)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

const latency = await nodes[0].services.ping.ping(nodes[1].getMultiaddrs())
Expand Down
5 changes: 2 additions & 3 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Uint8ArrayList } from 'uint8arraylist'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import { mockMuxer } from './muxer.js'
import { mockRegistrar } from './registrar.js'
import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions, ConnectionLimits } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Duplex, Source } from 'it-stream-types'
Expand Down Expand Up @@ -41,7 +41,7 @@ class MockConnection implements Connection {
public status: ConnectionStatus
public streams: Stream[]
public tags: string[]
public transient: boolean
public limits?: ConnectionLimits
public log: Logger

private readonly muxer: StreamMuxer
Expand All @@ -64,7 +64,6 @@ class MockConnection implements Connection {
this.tags = []
this.muxer = muxer
this.maConn = maConn
this.transient = false
this.logger = logger
this.log = logger.forComponent(this.id)
}
Expand Down
6 changes: 4 additions & 2 deletions packages/interface-internal/src/registrar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ export interface StreamHandlerOptions {
/**
* If true, allow this protocol to run on limited connections (e.g.
* connections with data or duration limits such as circuit relay
* connections) (default: false)
* connections)
*
* @default false
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export interface StreamHandlerRecord {
Expand Down
42 changes: 34 additions & 8 deletions packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,16 @@ export interface NewStreamOptions extends AbortOptions {
maxOutboundStreams?: number

/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
* Opt-in to running over a limited connection - one that has restrictions
* on the amount of data that may be transferred or how long it may be open for.
*
* These limits are typically enforced by a relay server, if the protocol
* will be transferring a lot of data or the stream will be open for a long time
* consider upgrading to a direct connection before opening the stream.
*
* @default false
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean

/**
* By default when negotiating a protocol the dialer writes then protocol name
Expand Down Expand Up @@ -222,6 +226,29 @@ export interface NewStreamOptions extends AbortOptions {

export type ConnectionStatus = 'open' | 'closing' | 'closed'

/**
* Connection limits are present on connections that are only allowed to
* transfer a certain amount of bytes or be open for a certain number
* of seconds.
*
* These limits are applied by Circuit Relay v2 servers, for example and
* the connection will normally be closed abruptly if the limits are
* exceeded.
*/
export interface ConnectionLimits {
/**
* If present this is the number of bytes remaining that may be
* transferred over this connection
*/
bytes?: bigint

/**
* If present this is the number of seconds that this connection will
* remain open for
*/
seconds?: number
}

/**
* A Connection is a high-level representation of a connection
* to a remote peer that may have been secured by encryption and
Expand Down Expand Up @@ -280,12 +307,11 @@ export interface Connection {
status: ConnectionStatus

/**
* A transient connection is one that is not expected to be open for very long
* or one that cannot transfer very much data, such as one being used as a
* circuit relay connection. Protocols need to explicitly opt-in to being run
* over transient connections.
* If present, this connection has limits applied to it, perhaps by an
* intermediate relay. Once the limits have been reached the connection will
* be closed by the relay.
*/
transient: boolean
limits?: ConnectionLimits

/**
* The time in milliseconds it takes to make a round trip to the remote peer.
Expand Down
2 changes: 1 addition & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ export interface IsDialableOptions extends AbortOptions {
* because that protocol would not be allowed to run over a data/time limited
* connection.
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export type TransportManagerDialProgressEvents =
Expand Down
6 changes: 3 additions & 3 deletions packages/interface/src/stream-handler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ export interface StreamHandlerOptions {
maxOutboundStreams?: number

/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
* Opt-in to running over connections with limits on how much data can be
* transferred or how long it can be open for.
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export interface StreamHandlerRecord {
Expand Down
Loading

0 comments on commit b0be85b

Please sign in to comment.