Skip to content

Commit

Permalink
fix!: rename "transient" connections to "limited"
Browse files Browse the repository at this point in the history
To better align with [[email protected]](https://github.com/libp2p/go-libp2p/releases/tag/v0.34.0)
rename "transient" connections to "limited".

BREAKING CHANGE: There are three breaking API changes:
  * Connections have an optional `.limits` property
  * The `runOnTransientConnection` property of `libp2p.handle` and `libp2p.dialProtocol` has been renamed to `runOnLimitedConnection`
  * The `notifyOnTransient` property of `libp2p.register` has been renamed `notifyOnLimitedConnection`

Refs: #2622
  • Loading branch information
achingbrain committed Jul 31, 2024
1 parent 944935f commit 692bdb4
Show file tree
Hide file tree
Showing 32 changed files with 152 additions and 105 deletions.
20 changes: 10 additions & 10 deletions packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const echoService = (components: EchoServiceComponents): unknown => {
stream, stream
)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})
},
stop () {}
Expand Down Expand Up @@ -560,7 +560,7 @@ describe('circuit-relay', () => {
// open hop stream and try to connect to remote
const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC, {
runOnTransientConnection: true
runOnLimitedConnection: true
})
const hopStream = pbStream(stream).pb(HopMessage)
Expand Down Expand Up @@ -697,7 +697,7 @@ describe('circuit-relay', () => {
await remote.handle(protocol, ({ stream }) => {
void pipe(stream, stream)
}, {
runOnTransientConnection: false
runOnLimitedConnection: false
})

// discover relay and make reservation
Expand All @@ -712,7 +712,7 @@ describe('circuit-relay', () => {
expect(connection).to.have.property('transient', true)

await expect(connection.newStream('/my-protocol/1.0.0', {
runOnTransientConnection: false
runOnLimitedConnection: false
}))
.to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION')
})
Expand All @@ -724,7 +724,7 @@ describe('circuit-relay', () => {
await remote.handle(protocol, ({ stream }) => {
void pipe(stream, stream)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// discover relay and make reservation
Expand All @@ -739,7 +739,7 @@ describe('circuit-relay', () => {
expect(connection).to.have.property('transient', true)

await expect(connection.newStream('/my-protocol/1.0.0', {
runOnTransientConnection: true
runOnLimitedConnection: true
}))
.to.eventually.be.ok()
})
Expand Down Expand Up @@ -912,15 +912,15 @@ describe('circuit-relay', () => {
} catch {}
})
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// dial the remote from the local through the relay
const ma = getRelayAddress(remote)

try {
const stream = await local.dialProtocol(ma, protocol, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

await stream.sink(async function * () {
Expand Down Expand Up @@ -1056,7 +1056,7 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)

const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

// write more than the default data limit
Expand All @@ -1075,7 +1075,7 @@ describe('circuit-relay', () => {
const ma = getRelayAddress(remote)

const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

let finished = false
Expand Down
10 changes: 5 additions & 5 deletions packages/integration-tests/test/dcutr.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('dcutr', () => {
async function waitForOnlyDirectConnections (): Promise<void> {
await pRetry(async () => {
const connections = libp2pA.getConnections(libp2pB.peerId)
const onlyDirect = connections.filter(conn => !conn.transient)
const onlyDirect = connections.filter(conn => !conn.limits)

if (onlyDirect.length === connections.length) {
// all connections are direct
Expand Down Expand Up @@ -109,8 +109,8 @@ describe('dcutr', () => {
const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`)
const connection = await libp2pA.dial(relayedAddress)

// connection should be transient
expect(connection).to.have.property('transient', true)
// connection should be limited
expect(connection).to.have.property('limited', true)

// wait for DCUtR unilateral upgrade
await waitForOnlyDirectConnections()
Expand Down Expand Up @@ -166,8 +166,8 @@ describe('dcutr', () => {
const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`)
const connection = await libp2pA.dial(relayedAddress)

// connection should be transient
expect(connection).to.have.property('transient', true)
// connection should be limited
expect(connection).to.have.property('limited', true)

// wait for DCUtR unilateral upgrade
await waitForOnlyDirectConnections()
Expand Down
2 changes: 1 addition & 1 deletion packages/integration-tests/test/ping.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('ping', () => {
stream
)
}, {
runOnTransientConnection: true
runOnLimitedConnection: true
})

const latency = await nodes[0].services.ping.ping(nodes[1].getMultiaddrs())
Expand Down
5 changes: 2 additions & 3 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Uint8ArrayList } from 'uint8arraylist'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import { mockMuxer } from './muxer.js'
import { mockRegistrar } from './registrar.js'
import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions, ConnectionLimits } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Duplex, Source } from 'it-stream-types'
Expand Down Expand Up @@ -41,7 +41,7 @@ class MockConnection implements Connection {
public status: ConnectionStatus
public streams: Stream[]
public tags: string[]
public transient: boolean
public limits?: ConnectionLimits
public log: Logger

private readonly muxer: StreamMuxer
Expand All @@ -64,7 +64,6 @@ class MockConnection implements Connection {
this.tags = []
this.muxer = muxer
this.maConn = maConn
this.transient = false
this.logger = logger
this.log = logger.forComponent(this.id)
}
Expand Down
6 changes: 4 additions & 2 deletions packages/interface-internal/src/registrar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ export interface StreamHandlerOptions {
/**
* If true, allow this protocol to run on limited connections (e.g.
* connections with data or duration limits such as circuit relay
* connections) (default: false)
* connections)
*
* @default false
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export interface StreamHandlerRecord {
Expand Down
24 changes: 16 additions & 8 deletions packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,16 @@ export interface NewStreamOptions extends AbortOptions {
maxOutboundStreams?: number

/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
* Opt-in to running over a limited connection - one that has restrictions
* on the amount of data that may be transferred or how long it may be open for.
*
* These limits are typically enforced by a relay server, if the protocol
* will be transferring a lot of data or the stream will be open for a long time
* consider upgrading to a direct connection before opening the stream.
*
* @default false
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean

/**
* By default when negotiating a protocol the dialer writes then protocol name
Expand Down Expand Up @@ -222,6 +226,11 @@ export interface NewStreamOptions extends AbortOptions {

export type ConnectionStatus = 'open' | 'closing' | 'closed'

export interface ConnectionLimits {
bytes?: number
seconds?: number
}

/**
* A Connection is a high-level representation of a connection
* to a remote peer that may have been secured by encryption and
Expand Down Expand Up @@ -280,12 +289,11 @@ export interface Connection {
status: ConnectionStatus

/**
* A transient connection is one that is not expected to be open for very long
* or one that cannot transfer very much data, such as one being used as a
* circuit relay connection. Protocols need to explicitly opt-in to being run
* over transient connections.
* If present, this connection has limits applied to it, perhaps by an
* intermediate relay. Once the limits have been reached the connection will
* be closed by the relay.
*/
transient: boolean
limits?: ConnectionLimits

/**
* Create a new stream on this connection and negotiate one of the passed protocols
Expand Down
2 changes: 1 addition & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ export interface IsDialableOptions extends AbortOptions {
* because that protocol would not be allowed to run over a data/time limited
* connection.
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export type TransportManagerDialProgressEvents =
Expand Down
6 changes: 3 additions & 3 deletions packages/interface/src/stream-handler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ export interface StreamHandlerOptions {
maxOutboundStreams?: number

/**
* Opt-in to running over a transient connection - one that has time/data limits
* placed on it.
* Opt-in to running over connections with limits on how much data can be
* transferred or how long it can be open for.
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean
}

export interface StreamHandlerRecord {
Expand Down
7 changes: 4 additions & 3 deletions packages/interface/src/topology/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ export interface Topology {
filter?: TopologyFilter

/**
* If true, invoke `onConnect` for this topology on transient (e.g. short-lived
* and/or data-limited) connections
* If true, invoke `onConnect` for this topology on limited connections, e.g.
* ones with limits on how much data can be transferred or how long they can
* be open for.
*
* @default false
*/
notifyOnTransient?: boolean
notifyOnLimitedConnection?: boolean

/**
* Invoked when a new connection is opened to a peer that supports the
Expand Down
9 changes: 2 additions & 7 deletions packages/interface/src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Connection, MultiaddrConnection } from '../connection/index.js'
import type { Connection, ConnectionLimits, MultiaddrConnection } from '../connection/index.js'
import type { TypedEventTarget } from '../event-target.js'
import type { AbortOptions } from '../index.js'
import type { StreamMuxerFactory } from '../stream-muxer/index.js'
Expand Down Expand Up @@ -104,12 +104,7 @@ export interface UpgraderOptions<ConnectionUpgradeEvents extends ProgressEvent =
skipEncryption?: boolean
skipProtection?: boolean
muxerFactory?: StreamMuxerFactory

/**
* The passed MultiaddrConnection has limits place on duration and/or data
* transfer amounts so is not expected to be open for very long.
*/
transient?: boolean
limits?: ConnectionLimits
}

export type InboundConnectionUpgradeEvents =
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ export class DialQueue {
try {
const addresses = await this.calculateMultiaddrs(undefined, new Set(multiaddr.map(ma => ma.toString())), options)

if (options.runOnTransientConnection === false) {
if (options.runOnLimitedConnection === false) {
// return true if any resolved multiaddrs are not relay addresses
return addresses.find(addr => {
return !Circuit.matches(addr.multiaddr)
Expand Down
4 changes: 2 additions & 2 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,10 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
if (peerId != null && options.force !== true) {
this.log('dial %p', peerId)
const existingConnection = this.getConnections(peerId)
.find(conn => !conn.transient)
.find(conn => conn.limits == null)

if (existingConnection != null) {
this.log('had an existing non-transient connection to %p', peerId)
this.log('had an existing non-limited connection to %p', peerId)

options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
Expand Down
10 changes: 5 additions & 5 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { connectionSymbol, CodeError, setMaxListeners } from '@libp2p/interface'
import type { AbortOptions, Logger, ComponentLogger, Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions, PeerId } from '@libp2p/interface'
import type { AbortOptions, Logger, ComponentLogger, Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions, PeerId, ConnectionLimits } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'

const CLOSE_TIMEOUT = 500
Expand All @@ -16,7 +16,7 @@ interface ConnectionInit {
timeline: ConnectionTimeline
multiplexer?: string
encryption?: string
transient?: boolean
limits?: ConnectionLimits
logger: ComponentLogger
}

Expand Down Expand Up @@ -45,7 +45,7 @@ export class ConnectionImpl implements Connection {
public multiplexer?: string
public encryption?: string
public status: ConnectionStatus
public transient: boolean
public limits?: ConnectionLimits
public readonly log: Logger

/**
Expand Down Expand Up @@ -86,7 +86,7 @@ export class ConnectionImpl implements Connection {
this.timeline = init.timeline
this.multiplexer = init.multiplexer
this.encryption = init.encryption
this.transient = init.transient ?? false
this.limits = init.limits
this.log = init.logger.forComponent(`libp2p:connection:${this.direction}:${this.id}`)

if (this.remoteAddr.getPeerId() == null) {
Expand Down Expand Up @@ -127,7 +127,7 @@ export class ConnectionImpl implements Connection {
protocols = [protocols]
}

if (this.transient && options?.runOnTransientConnection !== true) {
if (this.limits != null && options?.runOnLimitedConnection !== true) {
throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION')
}

Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ export class DefaultRegistrar implements Registrar {
}

for (const topology of topologies.values()) {
if (connection.transient && topology.notifyOnTransient !== true) {
if (connection.limits != null && topology.notifyOnLimitedConnection !== true) {
continue
}

Expand Down
Loading

0 comments on commit 692bdb4

Please sign in to comment.