Skip to content

Commit

Permalink
Merge branch 'master' into content-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
acolytec3 committed Aug 7, 2023
2 parents f87f8f6 + 1e26eaa commit 70acd3e
Show file tree
Hide file tree
Showing 18 changed files with 202 additions and 137 deletions.
27 changes: 14 additions & 13 deletions packages/cli/src/rpc/modules/portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ export class portal {
const shortEnr = encodedENR.nodeId.slice(0, 15) + '...'
this.logger(`portal_historyAddEnr request received for ${shortEnr}`)
try {
if (this._history.routingTable.getValue(encodedENR.nodeId)) {
if (this._history.routingTable.getWithPending(encodedENR.nodeId)?.value) {
return true
}
this._history.routingTable.insertOrUpdate(encodedENR, EntryStatus.Disconnected)
Expand Down Expand Up @@ -284,7 +284,7 @@ export class portal {
if (!isValidId(dstId)) {
return 'invalid node id'
}
if (!this._history.routingTable.getValue(dstId)) {
if (!this._history.routingTable.getWithPending(dstId)?.value) {
const pong = await this._history.sendPing(enr)
if (!pong) {
return ''
Expand All @@ -303,7 +303,7 @@ export class portal {
const [dstId, distances] = params
this.logger(`portal_historySendFindNodes`)
try {
const enr = this._history.routingTable.getValue(dstId)
const enr = this._history.routingTable.getWithPending(dstId)?.value
if (!enr) {
return
}
Expand All @@ -317,7 +317,7 @@ export class portal {
const [dstId, enrs, requestId] = params
this.logger(`portal_historySendNodes`)
try {
const enr = this._history.routingTable.getValue(dstId)
const enr = this._history.routingTable.getWithPending(dstId)?.value
if (!enr) {
return
}
Expand Down Expand Up @@ -351,18 +351,19 @@ export class portal {
this.logger(`historyRecursiveFindNodes request returned ${res}`)
return res ?? ''
}
async historyLocalContent(params: [string]): Promise<string> {
async historyLocalContent(params: [string]): Promise<string | undefined> {
const [contentKey] = params
this.logger(`Received historyLocalContent request for ${contentKey}`)

const res = await this._history.findContentLocally(fromHexString(contentKey))
this.logger(`historyLocalContent request returned ${res.length} bytes`)
return toHexString(res)
this.logger.extend(`historyLocalContent`)(`request returned ${res.length} bytes`)
this.logger.extend(`historyLocalContent`)(`${toHexString(res)}`)
return res.length > 0 ? toHexString(res) : undefined
}
async historyFindContent(params: [string, string]) {
const [enr, contentKey] = params
const nodeId = ENR.decodeTxt(enr).nodeId
if (!this._history.routingTable.getValue(nodeId)) {
if (!this._history.routingTable.getWithPending(nodeId)?.value) {
const pong = await this._history.sendPing(enr)
if (!pong) {
return ''
Expand Down Expand Up @@ -393,14 +394,14 @@ export class portal {
res.selector === 0 && this.logger.extend('findContent')('utp')
this.logger.extend('findContent')(content)
return {
content: toHexString(content),
content: content.length > 0 ? toHexString(content) : '',
utpTransfer: res.selector === 0,
}
}
async historySendFindContent(params: [string, string]) {
const [nodeId, contentKey] = params
const res = await this._history.sendFindContent(nodeId, fromHexString(contentKey))
const enr = this._history.routingTable.getValue(nodeId)
const enr = this._history.routingTable.getWithPending(nodeId)?.value
return res && enr && '0x' + enr.seq.toString(16)
}
async historySendContent(params: [string, string]) {
Expand All @@ -409,7 +410,7 @@ export class portal {
selector: 1,
value: fromHexString(content),
})
const enr = this._history.routingTable.getValue(nodeId)
const enr = this._history.routingTable.getWithPending(nodeId)?.value
this._client.sendPortalNetworkResponse(
{ nodeId, socketAddr: enr?.getLocationMultiaddr('udp')! },
enr!.seq,
Expand All @@ -430,7 +431,7 @@ export class portal {
const [enrHex, contentKeyHex, contentValueHex] = params
const enr = ENR.decodeTxt(enrHex)
const contentKey = decodeContentKey(contentKeyHex)
if (this._history.routingTable.getValue(enr.nodeId) === undefined) {
if (this._history.routingTable.getWithPending(enr.nodeId)?.value === undefined) {
const res = await this._history.sendPing(enr)
if (res === undefined) {
return '0x'
Expand All @@ -448,7 +449,7 @@ export class portal {
const [dstId, contentKeys] = params
const keys = contentKeys.map((key) => fromHexString(key))
const res = await this._history.sendOffer(dstId, keys)
const enr = this._history.routingTable.getValue(dstId)
const enr = this._history.routingTable.getWithPending(dstId)?.value
return res && enr && '0x' + enr.seq.toString(16)
}
async historySendAccept(params: [string, string, string[]]) {
Expand Down
4 changes: 2 additions & 2 deletions packages/portalnetwork/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
this.discv5.on('talkReqReceived', this.onTalkReq)
this.discv5.on('talkRespReceived', this.onTalkResp)
this.uTP.on('Send', async (peerId: string, msg: Buffer, protocolId: ProtocolId) => {
const enr = this.protocols.get(protocolId)?.routingTable.getValue(peerId)
const enr = this.protocols.get(protocolId)?.routingTable.getWithPending(peerId)?.value
try {
await this.sendPortalNetworkMessage(enr ?? peerId, msg, protocolId, true)
this.uTP.emit('Sent')
Expand Down Expand Up @@ -369,7 +369,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
// If ENR is not provided, look up ENR in protocol routing table by nodeId
const protocol = this.protocols.get(protocolId)
if (protocol) {
nodeAddr = protocol.routingTable.getValue(enr)
nodeAddr = protocol.routingTable.getWithPending(enr)?.value
if (!nodeAddr) {
// Check in unverified sessions cache if no ENR found in routing table
nodeAddr = this.unverifiedSessionCache.get(enr)
Expand Down
4 changes: 2 additions & 2 deletions packages/portalnetwork/src/subprotocols/contentLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ export class ContentLookup {
continue
}
// Send a PING request to check liveness of any unknown nodes
if (!this.protocol.routingTable.getValue(decodedEnr.nodeId)) {
if (!this.protocol.routingTable.getWithPending(decodedEnr.nodeId)?.value) {
const ping = await this.protocol.sendPing(decodedEnr)
if (!ping) {
this.protocol.routingTable.evictNode(decodedEnr.nodeId)
continue
}
}
if (!this.protocol.routingTable.getValue(decodedEnr.nodeId)) {
if (!this.protocol.routingTable.getWithPending(decodedEnr.nodeId)?.value) {
continue
}
// Calculate distance and add to list of lookup peers
Expand Down
2 changes: 1 addition & 1 deletion packages/portalnetwork/src/subprotocols/history/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class HistoryProtocol extends BaseProtocol {
* @returns the value of the FOUNDCONTENT response or undefined
*/
public sendFindContent = async (dstId: string, key: Uint8Array) => {
const enr = this.routingTable.getValue(dstId)
const enr = this.routingTable.getWithPending(dstId)?.value
if (!enr) {
this.logger(`No ENR found for ${shortId(dstId)}. FINDCONTENT aborted.`)
return
Expand Down
2 changes: 1 addition & 1 deletion packages/portalnetwork/src/subprotocols/nodeLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ export class NodeLookup {
const res = await this.protocol.sendPing(enr)
if (res) this.protocol.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
}
return this.protocol.routingTable.getValue(this.nodeSought)?.encodeTxt()
return this.protocol.routingTable.getWithPending(this.nodeSought)?.value.encodeTxt()
}
}
17 changes: 11 additions & 6 deletions packages/portalnetwork/src/subprotocols/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ export abstract class BaseProtocol extends EventEmitter {
abstract store(contentType: any, hashKey: string, value: Uint8Array): Promise<void>

public handle(message: ITalkReqMessage, src: INodeAddress) {
const enr = this.findEnr(src.nodeId)
if (enr !== undefined) {
this.updateRoutingTable(enr)
}
const id = message.id
const protocol = message.protocol
const request = message.request
Expand Down Expand Up @@ -164,7 +168,7 @@ export abstract class BaseProtocol extends EventEmitter {
}

handlePing = async (src: INodeAddress, id: bigint, pingMessage: PingMessage) => {
if (!this.routingTable.getValue(src.nodeId)) {
if (!this.routingTable.getWithPending(src.nodeId)?.value) {
// Check to see if node is already in corresponding network routing table and add if not
const enr = this.findEnr(src.nodeId)
if (enr !== undefined) {
Expand Down Expand Up @@ -207,7 +211,7 @@ export abstract class BaseProtocol extends EventEmitter {
})

try {
const enr = this.routingTable.getValue(dstId)
const enr = this.routingTable.getWithPending(dstId)?.value
if (!enr) {
return
}
Expand All @@ -221,7 +225,7 @@ export abstract class BaseProtocol extends EventEmitter {
(enr) => !this.routingTable.isIgnored(ENR.decode(Buffer.from(enr)).nodeId)
)
const unknown = notIgnored.filter(
(enr) => !this.routingTable.getValue(ENR.decode(Buffer.from(enr)).nodeId)
(enr) => !this.routingTable.getWithPending(ENR.decode(Buffer.from(enr)).nodeId)?.value
)
// Ping node if not currently in subprotocol routing table
for (const enr of unknown) {
Expand Down Expand Up @@ -315,7 +319,7 @@ export abstract class BaseProtocol extends EventEmitter {
selector: MessageCodes.OFFER,
value: offerMsg,
})
const enr = this.routingTable.getValue(dstId)
const enr = this.routingTable.getWithPending(dstId)?.value
if (!enr) {
this.logger(`No ENR found for ${shortId(dstId)}. OFFER aborted.`)
return
Expand All @@ -341,6 +345,7 @@ export abstract class BaseProtocol extends EventEmitter {
this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(dstId)}`)
return []
}
this.logger.extend(`OFFER`)(`ACCEPT message received with uTP id: ${id}`)

const requestedData: Uint8Array[] = []
for await (const key of requestedKeys) {
Expand Down Expand Up @@ -563,7 +568,7 @@ export abstract class BaseProtocol extends EventEmitter {
try {
const nodeId = enr.nodeId
// Only add node to the routing table if we have an ENR
this.routingTable.getValue(enr.nodeId) === undefined &&
this.routingTable.getWithPending(enr.nodeId)?.value === undefined &&
this.logger(`adding ${nodeId} to ${this.protocolName} routing table`)
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
if (customPayload) {
Expand All @@ -573,7 +578,7 @@ export abstract class BaseProtocol extends EventEmitter {
} catch (err) {
this.logger(`Something went wrong: ${(err as any).message}`)
try {
this.routingTable.getValue(enr as any) === undefined &&
this.routingTable.getWithPending(enr as any)?.value === undefined &&
this.logger(`adding ${enr as any} to ${this.protocolName} routing table`)
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
if (customPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
// this.logger(
// `Received Rendezvous FIND request for ${shortId(dstId)} on ${protocolId} network`
// )
// let enr = protocol.routingTable.getValue(dstId)
// let enr = protocol.routingTable.getWithPending(dstId)
// if (!enr) {
// enr = this.client.discv5.getKadValue(dstId)
// if (!enr) {
Expand Down Expand Up @@ -108,7 +108,7 @@
// this.logger(
// `Received Rendezvous SYNC from requestor ${shortId(srcId)} for target ${shortId(dstId)}`
// )
// const srcEnr = protocol.routingTable.getValue(srcId)
// const srcEnr = protocol.routingTable.getWithPending(srcId)
// const payload = Buffer.concat([
// Uint8Array.from([2]),
// Buffer.from(protocolId.slice(2), 'hex'),
Expand Down
4 changes: 2 additions & 2 deletions packages/portalnetwork/src/wire/utp/Packets/PacketManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ export class PacketManager {
...opts,
version: 1,
timestampMicroseconds: Bytes32TimeStamp(),
connectionId: this.rcvConnectionId,
connectionId: opts.connectionId ?? this.rcvConnectionId,
timestampDifferenceMicroseconds: this.congestionControl.reply_micro,
wndSize: Math.min(
2 ** 32 - 1,
this.congestionControl.max_window - this.congestionControl.cur_window
Math.abs(this.congestionControl.max_window - this.congestionControl.cur_window)
),
}
const options: PacketOptions<T> = {
Expand Down
7 changes: 7 additions & 0 deletions packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ interface IHeaderInput<T extends PacketType> {
interface IBasicHeaderInput<T extends PacketType> extends IHeaderInput<T> {
pType: PacketType
extension: HeaderExtension.none
connectionId: Uint16
}
export interface ISelectiveAckHeaderInput extends IHeaderInput<PacketType.ST_STATE> {
pType: PacketType.ST_STATE
extension: HeaderExtension.selectiveAck
bitmask: Uint8Array
connectionId: Uint16
}
export type HeaderInput<T extends PacketType> = IBasicHeaderInput<T> | ISelectiveAckHeaderInput

Expand Down Expand Up @@ -73,21 +75,25 @@ export interface IPacket<T extends PacketType> {
pType: T
seqNr: number
ackNr: number
connectionId: number
payload?: Uint8Array
}
export interface IBasic<T extends PacketType> extends IPacket<T> {
pType: T
extension: HeaderExtension.none
connectionId: number
}
export interface ISelectiveAck extends IPacket<PacketType.ST_STATE> {
pType: PacketType.ST_STATE
extension: HeaderExtension.selectiveAck
bitmask: Uint8Array
connectionId: number
}
export interface IData extends IPacket<PacketType.ST_DATA> {
pType: PacketType.ST_DATA
extension: HeaderExtension.none
payload: Uint8Array
connectionId: number
}
export type ICreate<T extends PacketType> = IBasic<T> | ISelectiveAck | IData

Expand All @@ -105,6 +111,7 @@ export interface UtpSocketOptions {

interface ICreatePacket<T> {
pType: T
connectionId?: number
}
interface ICreateSelectiveAck extends ICreatePacket<PacketType.ST_STATE> {
bitmask: Uint8Array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ export class ContentRequest {
}

async sendSyn(): Promise<void> {
await this.socket.sendSynPacket()
await this.socket.sendSynPacket(
this.requestCode === RequestCode.OFFER_WRITE
? this.socket.sndConnectionId
: this.socket.rcvConnectionId
)
this.socket.state = ConnectionState.SynSent
}
}
Loading

0 comments on commit 70acd3e

Please sign in to comment.