diff --git a/packages/integration-tests/test/circuit-relay.node.ts b/packages/integration-tests/test/circuit-relay.node.ts index bdfcf6fba1..327732e40c 100644 --- a/packages/integration-tests/test/circuit-relay.node.ts +++ b/packages/integration-tests/test/circuit-relay.node.ts @@ -90,7 +90,7 @@ const echoService = (components: EchoServiceComponents): unknown => { stream, stream ) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) }, stop () {} @@ -560,7 +560,7 @@ describe('circuit-relay', () => { // open hop stream and try to connect to remote const stream = await local.dialProtocol(ma, RELAY_V2_HOP_CODEC, { - runOnTransientConnection: true + runOnLimitedConnection: true }) const hopStream = pbStream(stream).pb(HopMessage) @@ -697,7 +697,7 @@ describe('circuit-relay', () => { await remote.handle(protocol, ({ stream }) => { void pipe(stream, stream) }, { - runOnTransientConnection: false + runOnLimitedConnection: false }) // discover relay and make reservation @@ -712,7 +712,7 @@ describe('circuit-relay', () => { expect(connection).to.have.property('transient', true) await expect(connection.newStream('/my-protocol/1.0.0', { - runOnTransientConnection: false + runOnLimitedConnection: false })) .to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION') }) @@ -724,7 +724,7 @@ describe('circuit-relay', () => { await remote.handle(protocol, ({ stream }) => { void pipe(stream, stream) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // discover relay and make reservation @@ -739,7 +739,7 @@ describe('circuit-relay', () => { expect(connection).to.have.property('transient', true) await expect(connection.newStream('/my-protocol/1.0.0', { - runOnTransientConnection: true + runOnLimitedConnection: true })) .to.eventually.be.ok() }) @@ -912,7 +912,7 @@ describe('circuit-relay', () => { } catch {} }) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // dial the remote from the local through the relay @@ -920,7 +920,7 @@ describe('circuit-relay', () => { try { const stream = await local.dialProtocol(ma, protocol, { - runOnTransientConnection: true + runOnLimitedConnection: true }) await stream.sink(async function * () { @@ -1056,7 +1056,7 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // write more than the default data limit @@ -1075,7 +1075,7 @@ describe('circuit-relay', () => { const ma = getRelayAddress(remote) const stream = await local.dialProtocol(ma, ECHO_PROTOCOL, { - runOnTransientConnection: true + runOnLimitedConnection: true }) let finished = false diff --git a/packages/integration-tests/test/dcutr.node.ts b/packages/integration-tests/test/dcutr.node.ts index c6c839a2a4..846b333f1f 100644 --- a/packages/integration-tests/test/dcutr.node.ts +++ b/packages/integration-tests/test/dcutr.node.ts @@ -24,7 +24,7 @@ describe('dcutr', () => { async function waitForOnlyDirectConnections (): Promise { await pRetry(async () => { const connections = libp2pA.getConnections(libp2pB.peerId) - const onlyDirect = connections.filter(conn => !conn.transient) + const onlyDirect = connections.filter(conn => !conn.limits) if (onlyDirect.length === connections.length) { // all connections are direct @@ -109,8 +109,8 @@ describe('dcutr', () => { const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`) const connection = await libp2pA.dial(relayedAddress) - // connection should be transient - expect(connection).to.have.property('transient', true) + // connection should be limited + expect(connection).to.have.property('limited', true) // wait for DCUtR unilateral upgrade await waitForOnlyDirectConnections() @@ -166,8 +166,8 @@ describe('dcutr', () => { const relayedAddress = multiaddr(`/ip4/127.0.0.1/tcp/${RELAY_PORT}/p2p/${relay.peerId}/p2p-circuit/p2p/${libp2pB.peerId}`) const connection = await libp2pA.dial(relayedAddress) - // connection should be transient - expect(connection).to.have.property('transient', true) + // connection should be limited + expect(connection).to.have.property('limited', true) // wait for DCUtR unilateral upgrade await waitForOnlyDirectConnections() diff --git a/packages/integration-tests/test/ping.spec.ts b/packages/integration-tests/test/ping.spec.ts index 90e5450cfe..bff84fd143 100644 --- a/packages/integration-tests/test/ping.spec.ts +++ b/packages/integration-tests/test/ping.spec.ts @@ -76,7 +76,7 @@ describe('ping', () => { stream ) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) const latency = await nodes[0].services.ping.ping(nodes[1].getMultiaddrs()) diff --git a/packages/interface-compliance-tests/src/mocks/connection.ts b/packages/interface-compliance-tests/src/mocks/connection.ts index 8dd0fddaee..fdb605bc4a 100644 --- a/packages/interface-compliance-tests/src/mocks/connection.ts +++ b/packages/interface-compliance-tests/src/mocks/connection.ts @@ -9,7 +9,7 @@ import { Uint8ArrayList } from 'uint8arraylist' import { mockMultiaddrConnection } from './multiaddr-connection.js' import { mockMuxer } from './muxer.js' import { mockRegistrar } from './registrar.js' -import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions } from '@libp2p/interface' +import type { AbortOptions, ComponentLogger, Logger, MultiaddrConnection, Connection, Stream, Direction, ConnectionTimeline, ConnectionStatus, PeerId, StreamMuxer, StreamMuxerFactory, NewStreamOptions, ConnectionLimits } from '@libp2p/interface' import type { Registrar } from '@libp2p/interface-internal' import type { Multiaddr } from '@multiformats/multiaddr' import type { Duplex, Source } from 'it-stream-types' @@ -41,7 +41,7 @@ class MockConnection implements Connection { public status: ConnectionStatus public streams: Stream[] public tags: string[] - public transient: boolean + public limits?: ConnectionLimits public log: Logger private readonly muxer: StreamMuxer @@ -64,7 +64,6 @@ class MockConnection implements Connection { this.tags = [] this.muxer = muxer this.maConn = maConn - this.transient = false this.logger = logger this.log = logger.forComponent(this.id) } diff --git a/packages/interface-internal/src/registrar/index.ts b/packages/interface-internal/src/registrar/index.ts index 086bf22323..edddb9daf3 100644 --- a/packages/interface-internal/src/registrar/index.ts +++ b/packages/interface-internal/src/registrar/index.ts @@ -30,9 +30,11 @@ export interface StreamHandlerOptions { /** * If true, allow this protocol to run on limited connections (e.g. * connections with data or duration limits such as circuit relay - * connections) (default: false) + * connections) + * + * @default false */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean } export interface StreamHandlerRecord { diff --git a/packages/interface/src/connection/index.ts b/packages/interface/src/connection/index.ts index 9379eab041..c730c0d65c 100644 --- a/packages/interface/src/connection/index.ts +++ b/packages/interface/src/connection/index.ts @@ -185,12 +185,16 @@ export interface NewStreamOptions extends AbortOptions { maxOutboundStreams?: number /** - * Opt-in to running over a transient connection - one that has time/data limits - * placed on it. + * Opt-in to running over a limited connection - one that has restrictions + * on the amount of data that may be transferred or how long it may be open for. + * + * These limits are typically enforced by a relay server, if the protocol + * will be transferring a lot of data or the stream will be open for a long time + * consider upgrading to a direct connection before opening the stream. * * @default false */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean /** * By default when negotiating a protocol the dialer writes then protocol name @@ -222,6 +226,11 @@ export interface NewStreamOptions extends AbortOptions { export type ConnectionStatus = 'open' | 'closing' | 'closed' +export interface ConnectionLimits { + bytes?: number + seconds?: number +} + /** * A Connection is a high-level representation of a connection * to a remote peer that may have been secured by encryption and @@ -280,12 +289,11 @@ export interface Connection { status: ConnectionStatus /** - * A transient connection is one that is not expected to be open for very long - * or one that cannot transfer very much data, such as one being used as a - * circuit relay connection. Protocols need to explicitly opt-in to being run - * over transient connections. + * If present, this connection has limits applied to it, perhaps by an + * intermediate relay. Once the limits have been reached the connection will + * be closed by the relay. */ - transient: boolean + limits?: ConnectionLimits /** * Create a new stream on this connection and negotiate one of the passed protocols diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index ca48df06dd..cd81122d82 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -331,7 +331,7 @@ export interface IsDialableOptions extends AbortOptions { * because that protocol would not be allowed to run over a data/time limited * connection. */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean } export type TransportManagerDialProgressEvents = diff --git a/packages/interface/src/stream-handler/index.ts b/packages/interface/src/stream-handler/index.ts index e4ca91c6b3..df409d5973 100644 --- a/packages/interface/src/stream-handler/index.ts +++ b/packages/interface/src/stream-handler/index.ts @@ -21,10 +21,10 @@ export interface StreamHandlerOptions { maxOutboundStreams?: number /** - * Opt-in to running over a transient connection - one that has time/data limits - * placed on it. + * Opt-in to running over connections with limits on how much data can be + * transferred or how long it can be open for. */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean } export interface StreamHandlerRecord { diff --git a/packages/interface/src/topology/index.ts b/packages/interface/src/topology/index.ts index 1df12a601e..ada83dc7a0 100644 --- a/packages/interface/src/topology/index.ts +++ b/packages/interface/src/topology/index.ts @@ -27,12 +27,13 @@ export interface Topology { filter?: TopologyFilter /** - * If true, invoke `onConnect` for this topology on transient (e.g. short-lived - * and/or data-limited) connections + * If true, invoke `onConnect` for this topology on limited connections, e.g. + * ones with limits on how much data can be transferred or how long they can + * be open for. * * @default false */ - notifyOnTransient?: boolean + notifyOnLimitedConnection?: boolean /** * Invoked when a new connection is opened to a peer that supports the diff --git a/packages/interface/src/transport/index.ts b/packages/interface/src/transport/index.ts index 5d7d13a62d..1cb8ab6ce9 100644 --- a/packages/interface/src/transport/index.ts +++ b/packages/interface/src/transport/index.ts @@ -1,4 +1,4 @@ -import type { Connection, MultiaddrConnection } from '../connection/index.js' +import type { Connection, ConnectionLimits, MultiaddrConnection } from '../connection/index.js' import type { TypedEventTarget } from '../event-target.js' import type { AbortOptions } from '../index.js' import type { StreamMuxerFactory } from '../stream-muxer/index.js' @@ -104,12 +104,7 @@ export interface UpgraderOptions ma.toString())), options) - if (options.runOnTransientConnection === false) { + if (options.runOnLimitedConnection === false) { // return true if any resolved multiaddrs are not relay addresses return addresses.find(addr => { return !Circuit.matches(addr.multiaddr) diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index d8c441fc9d..85a34fd748 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -505,10 +505,10 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { if (peerId != null && options.force !== true) { this.log('dial %p', peerId) const existingConnection = this.getConnections(peerId) - .find(conn => !conn.transient) + .find(conn => conn.limits == null) if (existingConnection != null) { - this.log('had an existing non-transient connection to %p', peerId) + this.log('had an existing non-limited connection to %p', peerId) options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) return existingConnection diff --git a/packages/libp2p/src/connection/index.ts b/packages/libp2p/src/connection/index.ts index f8cc0a7c23..be504a733c 100644 --- a/packages/libp2p/src/connection/index.ts +++ b/packages/libp2p/src/connection/index.ts @@ -1,5 +1,5 @@ import { connectionSymbol, CodeError, setMaxListeners } from '@libp2p/interface' -import type { AbortOptions, Logger, ComponentLogger, Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions, PeerId } from '@libp2p/interface' +import type { AbortOptions, Logger, ComponentLogger, Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions, PeerId, ConnectionLimits } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' const CLOSE_TIMEOUT = 500 @@ -16,7 +16,7 @@ interface ConnectionInit { timeline: ConnectionTimeline multiplexer?: string encryption?: string - transient?: boolean + limits?: ConnectionLimits logger: ComponentLogger } @@ -45,7 +45,7 @@ export class ConnectionImpl implements Connection { public multiplexer?: string public encryption?: string public status: ConnectionStatus - public transient: boolean + public limits?: ConnectionLimits public readonly log: Logger /** @@ -86,7 +86,7 @@ export class ConnectionImpl implements Connection { this.timeline = init.timeline this.multiplexer = init.multiplexer this.encryption = init.encryption - this.transient = init.transient ?? false + this.limits = init.limits this.log = init.logger.forComponent(`libp2p:connection:${this.direction}:${this.id}`) if (this.remoteAddr.getPeerId() == null) { @@ -127,7 +127,7 @@ export class ConnectionImpl implements Connection { protocols = [protocols] } - if (this.transient && options?.runOnTransientConnection !== true) { + if (this.limits != null && options?.runOnLimitedConnection !== true) { throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION') } diff --git a/packages/libp2p/src/registrar.ts b/packages/libp2p/src/registrar.ts index fe9317fffb..c775b14c95 100644 --- a/packages/libp2p/src/registrar.ts +++ b/packages/libp2p/src/registrar.ts @@ -230,7 +230,7 @@ export class DefaultRegistrar implements Registrar { } for (const topology of topologies.values()) { - if (connection.transient && topology.notifyOnTransient !== true) { + if (connection.limits != null && topology.notifyOnLimitedConnection !== true) { continue } diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index 0c3068ff67..2b2d1f0806 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -6,7 +6,7 @@ import { createConnection } from './connection/index.js' import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js' import { codes } from './errors.js' import { DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS } from './registrar.js' -import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions } from '@libp2p/interface' +import type { Libp2pEvents, AbortOptions, ComponentLogger, MultiaddrConnection, Connection, Stream, ConnectionProtector, NewStreamOptions, ConnectionEncrypter, SecuredConnection, ConnectionGater, TypedEventTarget, Metrics, PeerId, PeerStore, StreamMuxer, StreamMuxerFactory, Upgrader, UpgraderOptions, ConnectionLimits } from '@libp2p/interface' import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' const DEFAULT_PROTOCOL_SELECT_TIMEOUT = 30000 @@ -18,7 +18,7 @@ interface CreateConnectionOptions { upgradedConn: MultiaddrConnection remotePeer: PeerId muxerFactory?: StreamMuxerFactory - transient?: boolean + limits?: ConnectionLimits } interface OnStreamOptions { @@ -243,7 +243,7 @@ export class DefaultUpgrader implements Upgrader { upgradedConn, muxerFactory, remotePeer, - transient: opts?.transient + limits: opts?.limits }) } finally { signal.removeEventListener('abort', onAbort) @@ -342,7 +342,7 @@ export class DefaultUpgrader implements Upgrader { upgradedConn, muxerFactory, remotePeer, - transient: opts?.transient + limits: opts?.limits }) } @@ -357,7 +357,7 @@ export class DefaultUpgrader implements Upgrader { upgradedConn, remotePeer, muxerFactory, - transient + limits } = opts let muxer: StreamMuxer | undefined @@ -578,7 +578,7 @@ export class DefaultUpgrader implements Upgrader { timeline: maConn.timeline, multiplexer: muxer?.protocol, encryption: cryptoProtocol, - transient, + limits, logger: this.components.logger, newStream: newStream ?? errConnectionNotMultiplexed, getStreams: () => { if (muxer != null) { return muxer.streams } else { return [] } }, @@ -617,7 +617,7 @@ export class DefaultUpgrader implements Upgrader { const { connection, stream, protocol } = opts const { handler, options } = this.components.registrar.getHandler(protocol) - if (connection.transient && options.runOnTransientConnection !== true) { + if (connection.limits != null && options.runOnLimitedConnection !== true) { throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION') } diff --git a/packages/libp2p/test/connection-manager/index.spec.ts b/packages/libp2p/test/connection-manager/index.spec.ts index 5dce1b63e3..d61440224d 100644 --- a/packages/libp2p/test/connection-manager/index.spec.ts +++ b/packages/libp2p/test/connection-manager/index.spec.ts @@ -527,7 +527,9 @@ describe('Connection Manager', () => { const addr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${targetPeer}`) const existingConnection = stubInterface({ - transient: true + limits: { + bytes: 100 + } }) const newConnection = stubInterface() diff --git a/packages/libp2p/test/core/core.spec.ts b/packages/libp2p/test/core/core.spec.ts index 3c45a33431..6c0f646752 100644 --- a/packages/libp2p/test/core/core.spec.ts +++ b/packages/libp2p/test/core/core.spec.ts @@ -57,15 +57,15 @@ describe('core', () => { }) await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws'), { - runOnTransientConnection: false + runOnLimitedConnection: false })).to.eventually.be.true() await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE1/p2p-circuit/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE2'), { - runOnTransientConnection: true + runOnLimitedConnection: true })).to.eventually.be.true() await expect(libp2p.isDialable(multiaddr('/dns4/example.com/tls/ws/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE1/p2p-circuit/p2p/12D3KooWSExt8hTzoaHEhn435BTK6BPNSY1LpTc1j2o9Gw53tXE2'), { - runOnTransientConnection: false + runOnLimitedConnection: false })).to.eventually.be.false() }) }) diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index 3e0671d540..e6c414650c 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -221,7 +221,9 @@ describe('registrar topologies', () => { const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) // connection is transient - conn.transient = true + conn.limits = { + bytes: 100 + } // return connection from connection manager connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) @@ -265,14 +267,16 @@ describe('registrar topologies', () => { const remotePeerId = await createEd25519PeerId() const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - // connection is transient - conn.transient = true + // connection is limited + conn.limits = { + bytes: 100 + } // return connection from connection manager connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) const topology: Topology = { - notifyOnTransient: true, + notifyOnLimitedConnection: true, onConnect: () => { onConnectDefer.resolve() } @@ -298,7 +302,7 @@ describe('registrar topologies', () => { let callCount = 0 const topology: Topology = { - notifyOnTransient: true, + notifyOnLimitedConnection: true, onConnect: () => { callCount++ @@ -314,10 +318,14 @@ describe('registrar topologies', () => { // setup connections before registrar const remotePeerId = await createEd25519PeerId() const transientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - transientConnection.transient = true + transientConnection.limits = { + bytes: 100 + } const nonTransientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - nonTransientConnection.transient = false + nonTransientConnection.limits = { + bytes: 100 + } // return connection from connection manager connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([ diff --git a/packages/protocol-dcutr/src/dcutr.ts b/packages/protocol-dcutr/src/dcutr.ts index 2ebfffbc0c..cd3b02926c 100644 --- a/packages/protocol-dcutr/src/dcutr.ts +++ b/packages/protocol-dcutr/src/dcutr.ts @@ -8,6 +8,7 @@ import { multicodec } from './index.js' import type { DCUtRServiceComponents, DCUtRServiceInit } from './index.js' import type { Logger, Connection, Stream, PeerStore, Startable } from '@libp2p/interface' import type { AddressManager, ConnectionManager, Registrar, TransportManager } from '@libp2p/interface-internal' +import { Circuit } from '@multiformats/multiaddr-matcher' // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#rpc-messages const MAX_DCUTR_MESSAGE_SIZE = 1024 * 4 @@ -70,10 +71,10 @@ export class DefaultDCUtRService implements Startable { // register for notifications of when peers that support DCUtR connect // nb. requires the identify service to be enabled this.topologyId = await this.registrar.register(multicodec, { - notifyOnTransient: true, + notifyOnLimitedConnection: true, onConnect: (peerId, connection) => { - if (!connection.transient) { - // the connection is already direct, no upgrade is required + if (!connection.limits != null) { + // the connection has no limits, no upgrade is required return } @@ -97,7 +98,7 @@ export class DefaultDCUtRService implements Startable { }, { maxInboundStreams: this.maxInboundStreams, maxOutboundStreams: this.maxOutboundStreams, - runOnTransientConnection: true + runOnLimitedConnection: true }) this.started = true @@ -140,7 +141,7 @@ export class DefaultDCUtRService implements Startable { // 1. B opens a stream to A using the /libp2p/dcutr protocol. stream = await relayedConnection.newStream([multicodec], { signal: options.signal, - runOnTransientConnection: true + runOnLimitedConnection: true }) const pb = pbStream(stream, { @@ -256,7 +257,7 @@ export class DefaultDCUtRService implements Startable { force: true }) - if (connection.transient) { + if (connection.limits != null) { throw new Error('Could not open a new, non-transient, connection') } diff --git a/packages/protocol-identify/src/identify-push.ts b/packages/protocol-identify/src/identify-push.ts index b220170576..a966557fe9 100644 --- a/packages/protocol-identify/src/identify-push.ts +++ b/packages/protocol-identify/src/identify-push.ts @@ -82,7 +82,7 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif try { stream = await connection.newStream(self.protocol, { signal, - runOnTransientConnection: self.runOnTransientConnection + runOnLimitedConnection: self.runOnLimitedConnection }) const pb = pbStream(stream, { diff --git a/packages/protocol-identify/src/identify.ts b/packages/protocol-identify/src/identify.ts index 1f72534410..a031d2ac1d 100644 --- a/packages/protocol-identify/src/identify.ts +++ b/packages/protocol-identify/src/identify.ts @@ -53,7 +53,7 @@ export class Identify extends AbstractIdentify implements Startable, IdentifyInt try { stream = await connection.newStream(this.protocol, { ...options, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) const pb = pbStream(stream, { diff --git a/packages/protocol-identify/src/index.ts b/packages/protocol-identify/src/index.ts index 3fd8936339..a7ebc633e0 100644 --- a/packages/protocol-identify/src/index.ts +++ b/packages/protocol-identify/src/index.ts @@ -99,7 +99,7 @@ export interface IdentifyInit { * * @default true */ - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean /** * Whether to automatically run identify on newly opened connections diff --git a/packages/protocol-identify/src/utils.ts b/packages/protocol-identify/src/utils.ts index 5627eebfe0..1f436b3da6 100644 --- a/packages/protocol-identify/src/utils.ts +++ b/packages/protocol-identify/src/utils.ts @@ -19,7 +19,7 @@ export const defaultValues = { maxMessageSize: MAX_IDENTIFY_MESSAGE_SIZE, runOnConnectionOpen: true, runOnSelfUpdate: true, - runOnTransientConnection: true, + runOnLimitedConnection: true, concurrency: MAX_PUSH_CONCURRENCY } @@ -207,7 +207,7 @@ export abstract class AbstractIdentify implements Startable { protected readonly maxMessageSize: number protected readonly maxObservedAddresses: number protected readonly events: TypedEventTarget - protected readonly runOnTransientConnection: boolean + protected readonly runOnLimitedConnection: boolean protected readonly log: Logger constructor (components: IdentifyComponents, init: AbstractIdentifyInit) { @@ -225,7 +225,7 @@ export abstract class AbstractIdentify implements Startable { this.maxOutboundStreams = init.maxOutboundStreams ?? defaultValues.maxOutboundStreams this.maxMessageSize = init.maxMessageSize ?? defaultValues.maxMessageSize this.maxObservedAddresses = init.maxObservedAddresses ?? defaultValues.maxObservedAddresses - this.runOnTransientConnection = init.runOnTransientConnection ?? defaultValues.runOnTransientConnection + this.runOnLimitedConnection = init.runOnLimitedConnection ?? defaultValues.runOnLimitedConnection // Store self host metadata this.host = { @@ -257,7 +257,7 @@ export abstract class AbstractIdentify implements Startable { }, { maxInboundStreams: this.maxInboundStreams, maxOutboundStreams: this.maxOutboundStreams, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) this.started = true diff --git a/packages/protocol-perf/src/index.ts b/packages/protocol-perf/src/index.ts index 556986562f..f672831b6f 100644 --- a/packages/protocol-perf/src/index.ts +++ b/packages/protocol-perf/src/index.ts @@ -83,7 +83,7 @@ export interface PerfInit { protocolName?: string maxInboundStreams?: number maxOutboundStreams?: number - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean /** * Data sent/received will be sent in chunks of this size (default: 64KiB) diff --git a/packages/protocol-perf/src/perf-service.ts b/packages/protocol-perf/src/perf-service.ts index 9dd64309a9..2d03981945 100644 --- a/packages/protocol-perf/src/perf-service.ts +++ b/packages/protocol-perf/src/perf-service.ts @@ -14,7 +14,7 @@ export class Perf implements Startable, PerfInterface { private readonly writeBlockSize: number private readonly maxInboundStreams: number private readonly maxOutboundStreams: number - private readonly runOnTransientConnection: boolean + private readonly runOnLimitedConnection: boolean constructor (components: PerfComponents, init: PerfInit = {}) { this.components = components @@ -25,7 +25,7 @@ export class Perf implements Startable, PerfInterface { this.databuf = new ArrayBuffer(this.writeBlockSize) this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS - this.runOnTransientConnection = init.runOnTransientConnection ?? RUN_ON_TRANSIENT_CONNECTION + this.runOnLimitedConnection = init.runOnLimitedConnection ?? RUN_ON_TRANSIENT_CONNECTION } readonly [Symbol.toStringTag] = '@libp2p/perf' @@ -38,7 +38,7 @@ export class Perf implements Startable, PerfInterface { }, { maxInboundStreams: this.maxInboundStreams, maxOutboundStreams: this.maxOutboundStreams, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) this.started = true } diff --git a/packages/protocol-ping/src/index.ts b/packages/protocol-ping/src/index.ts index de0c1751c5..61a2c06576 100644 --- a/packages/protocol-ping/src/index.ts +++ b/packages/protocol-ping/src/index.ts @@ -35,7 +35,7 @@ export interface PingServiceInit { protocolPrefix?: string maxInboundStreams?: number maxOutboundStreams?: number - runOnTransientConnection?: boolean + runOnLimitedConnection?: boolean /** * How long we should wait for a ping response diff --git a/packages/protocol-ping/src/ping.ts b/packages/protocol-ping/src/ping.ts index a05ae490fa..1b4e8d0acd 100644 --- a/packages/protocol-ping/src/ping.ts +++ b/packages/protocol-ping/src/ping.ts @@ -16,7 +16,7 @@ export class PingService implements Startable, PingServiceInterface { private readonly timeout: number private readonly maxInboundStreams: number private readonly maxOutboundStreams: number - private readonly runOnTransientConnection: boolean + private readonly runOnLimitedConnection: boolean private readonly log: Logger constructor (components: PingServiceComponents, init: PingServiceInit = {}) { @@ -27,7 +27,7 @@ export class PingService implements Startable, PingServiceInterface { this.timeout = init.timeout ?? TIMEOUT this.maxInboundStreams = init.maxInboundStreams ?? MAX_INBOUND_STREAMS this.maxOutboundStreams = init.maxOutboundStreams ?? MAX_OUTBOUND_STREAMS - this.runOnTransientConnection = init.runOnTransientConnection ?? true + this.runOnLimitedConnection = init.runOnLimitedConnection ?? true this.handleMessage = this.handleMessage.bind(this) } @@ -38,7 +38,7 @@ export class PingService implements Startable, PingServiceInterface { await this.components.registrar.handle(this.protocol, this.handleMessage, { maxInboundStreams: this.maxInboundStreams, maxOutboundStreams: this.maxOutboundStreams, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) this.started = true } @@ -96,7 +96,7 @@ export class PingService implements Startable, PingServiceInterface { try { stream = await connection.newStream(this.protocol, { ...options, - runOnTransientConnection: this.runOnTransientConnection + runOnLimitedConnection: this.runOnLimitedConnection }) onAbort = () => { diff --git a/packages/transport-circuit-relay-v2/src/server/index.ts b/packages/transport-circuit-relay-v2/src/server/index.ts index eac1cfb2ab..f31cef78ec 100644 --- a/packages/transport-circuit-relay-v2/src/server/index.ts +++ b/packages/transport-circuit-relay-v2/src/server/index.ts @@ -144,7 +144,7 @@ class CircuitRelayServer extends TypedEventEmitter implements }, { maxInboundStreams: this.maxInboundHopStreams, maxOutboundStreams: this.maxOutboundHopStreams, - runOnTransientConnection: true + runOnLimitedConnection: true }) this.reservationStore.start() @@ -383,7 +383,7 @@ class CircuitRelayServer extends TypedEventEmitter implements this.log('starting circuit relay v2 stop request to %s', connection.remotePeer) const stream = await connection.newStream([RELAY_V2_STOP_CODEC], { maxOutboundStreams: this.maxOutboundStopStreams, - runOnTransientConnection: true + runOnLimitedConnection: true }) const pbstr = pbStream(stream) const stopstr = pbstr.pb(StopMessage) diff --git a/packages/transport-circuit-relay-v2/src/transport/transport.ts b/packages/transport-circuit-relay-v2/src/transport/transport.ts index c9d0e711c9..c8d9081a11 100644 --- a/packages/transport-circuit-relay-v2/src/transport/transport.ts +++ b/packages/transport-circuit-relay-v2/src/transport/transport.ts @@ -149,7 +149,7 @@ export class CircuitRelayTransport implements Transport }, { maxInboundStreams: this.maxInboundStopStreams, maxOutboundStreams: this.maxOutboundStopStreams, - runOnTransientConnection: true + runOnLimitedConnection: true }) await start(this.discovery, this.reservationStore) @@ -269,8 +269,24 @@ export class CircuitRelayTransport implements Transport }) this.log('new outbound relayed connection %a', maConn.remoteAddr) + + let expires = Date.now() + (status.limit?.duration ?? Infinity) * 1000 + + if (status.limit?.duration === 0) { + expires = Infinity + } + return await this.upgrader.upgradeOutbound(maConn, { - transient: status.limit != null, + limits: { + get bytes (): number { + status.limit != null + + return 5 + }, + get seconds (): number { + return (Date.now() - expires) / 1000 + } + }, onProgress }) } catch (err: any) { @@ -384,9 +400,24 @@ export class CircuitRelayTransport implements Transport logger: this.logger }) + let expires = Date.now() + (request.limit.duration ?? Infinity) * 1000 + + if (request.limit.duration === 0) { + expires = Infinity + } + this.log('new inbound relayed connection %a', maConn.remoteAddr) await this.upgrader.upgradeInbound(maConn, { - transient: request.limit != null + limits: { + get bytes (): number { + request.limit != null + + return 5 + }, + get seconds (): number { + return (Date.now() - expires) / 1000 + } + } }) this.log('%s connection %a upgraded', 'inbound', maConn.remoteAddr) } diff --git a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts index a7bc6220b2..167b1f5110 100644 --- a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts +++ b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts @@ -72,7 +72,7 @@ export async function initiateConnection ({ rtcConfiguration, dataChannel, signa const stream = await connection.newStream(SIGNALING_PROTO_ID, { signal, - runOnTransientConnection: true + runOnLimitedConnection: true }) const messageStream = pbStream(stream).pb(Message) diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index 0744f8d166..b12095b48e 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -108,7 +108,7 @@ export class WebRTCTransport implements Transport, Startable { await this.components.registrar.handle(SIGNALING_PROTO_ID, (data: IncomingStreamData) => { this._onProtocol(data).catch(err => { this.log.error('failed to handle incoming connect from %p', data.connection.remotePeer, err) }) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) this._started = true } diff --git a/packages/transport-webrtc/test/basics.spec.ts b/packages/transport-webrtc/test/basics.spec.ts index c4d01470f5..e8df7d08ba 100644 --- a/packages/transport-webrtc/test/basics.spec.ts +++ b/packages/transport-webrtc/test/basics.spec.ts @@ -71,7 +71,7 @@ describe('basics', () => { await remoteNode.handle(echo, (info) => { streamHandler(info) }, { - runOnTransientConnection: true + runOnLimitedConnection: true }) const connection = await localNode.dial(remoteAddr) @@ -138,7 +138,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // send and receive some data @@ -170,7 +170,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // close for reading @@ -204,7 +204,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // close for reading @@ -241,7 +241,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // close the write end immediately @@ -302,7 +302,7 @@ describe('basics', () => { // open a stream on the echo protocol const stream = await connection.newStream(echo, { - runOnTransientConnection: true + runOnLimitedConnection: true }) // keep the remote write end open, this should delay the FIN_ACK reply to the local stream