Skip to content

Commit

Permalink
Merge branch 'farcasterxyz:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
sigeshuo authored Jul 1, 2024
2 parents fa3d593 + 59bcfe7 commit 30e1f95
Show file tree
Hide file tree
Showing 17 changed files with 492 additions and 48 deletions.
10 changes: 10 additions & 0 deletions apps/hubble/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @farcaster/hubble

## 1.13.5

### Patch Changes

- 224e75fa: fix: gossip contact info every 30 minutes instead of every minute, avoid gossiping contact info on peer connect, skip contact info updates that happen too frequently
- c723f655: feat: Add endpoints to control sync
- 667a5b30: feat: add experimental HTTP APIs to control sync
- Updated dependencies [c723f655]
- @farcaster/hub-nodejs@0.11.19

## 1.13.4

### Patch Changes
Expand Down
4 changes: 2 additions & 2 deletions apps/hubble/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@farcaster/hubble",
"version": "1.13.4",
"version": "1.13.5",
"description": "Farcaster Hub",
"author": "",
"license": "",
Expand Down Expand Up @@ -75,7 +75,7 @@
"@chainsafe/libp2p-noise": "^11.0.0 ",
"@datastructures-js/priority-queue": "^6.3.1",
"@faker-js/faker": "~7.6.0",
"@farcaster/hub-nodejs": "^0.11.18",
"@farcaster/hub-nodejs": "^0.11.19",
"@fastify/cors": "^8.4.0",
"@figma/hot-shots": "^9.0.0-figma.1",
"@grpc/grpc-js": "~1.8.22",
Expand Down
21 changes: 13 additions & 8 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ export const FARCASTER_VERSIONS_SCHEDULE: VersionSchedule[] = [
{ version: "2024.6.12", expiresAt: 1722988800000 }, // expires at 8/7/24 00:00 UTC
];

const MAX_CONTACT_INFO_AGE_MS = GOSSIP_SEEN_TTL;
const MAX_CONTACT_INFO_AGE_MS = 1000 * 60 * 60; // 60 minutes
const CONTACT_INFO_UPDATE_THRESHOLD_MS = 1000 * 60 * 30; // 30 minutes

export interface HubInterface {
engine: Engine;
Expand Down Expand Up @@ -349,7 +350,6 @@ export class Hub implements HubInterface {
private adminServer: AdminServer;
private httpApiServer: HttpAPIServer;

private contactTimer?: NodeJS.Timer;
private rocksDB: RocksDB;
private syncEngine: SyncEngine;
private allowedPeerIds: string[] | undefined;
Expand Down Expand Up @@ -790,7 +790,9 @@ export class Hub implements HubInterface {
this.pruneEventsJobScheduler.start(this.options.pruneEventsJobCron);
this.checkFarcasterVersionJobScheduler.start();
this.validateOrRevokeMessagesJobScheduler.start();
this.gossipContactInfoJobScheduler.start("*/1 * * * *"); // Every minute

const randomMinute = Math.floor(Math.random() * 30);
this.gossipContactInfoJobScheduler.start(`${randomMinute} */30 * * * *`); // Random minute every 30 minutes
this.checkIncomingPortsJobScheduler.start();

// Mainnet only jobs
Expand Down Expand Up @@ -1176,7 +1178,6 @@ export class Hub implements HubInterface {
/** Stop the GossipNode and RPC Server */
async stop(reason: HubShutdownReason, terminateGossipWorker = true) {
log.info("Stopping Hubble...");
clearInterval(this.contactTimer);

// First, stop the RPC/Gossip server so we don't get any more messages
if (!this.options.httpServerDisabled) {
Expand Down Expand Up @@ -1515,7 +1516,7 @@ export class Hub implements HubInterface {
log.debug({ identity: this.identity, peer: peerId, message }, "received peer ContactInfo");

// Check if we already have this client
const result = this.syncEngine.addContactInfoForPeerId(peerId, message);
const result = this.syncEngine.addContactInfoForPeerId(peerId, message, CONTACT_INFO_UPDATE_THRESHOLD_MS);
if (result.isOk() && !this.performedFirstSync) {
// Sync with the first peer so we are upto date on startup.
this.performedFirstSync = true;
Expand Down Expand Up @@ -1642,11 +1643,15 @@ export class Hub implements HubInterface {
});

this.gossipNode.on("peerConnect", async () => {
// NB: Gossiping our own contact info is commented out, since at the time of
// writing this the p2p network has overwhelming number of peers and spends more
// time processing contact info than messages. We may uncomment in the future
// if peer counts drop.
// When we connect to a new node, gossip out our contact info 1 second later.
// The setTimeout is to ensure that we have a chance to receive the peer's info properly.
setTimeout(async () => {
await this.gossipContactInfo();
}, 1 * 1000);
// setTimeout(async () => {
// await this.gossipContactInfo();
// }, 1 * 1000);
statsd().increment("peer_connect.count");
});

Expand Down
16 changes: 12 additions & 4 deletions apps/hubble/src/network/sync/syncEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,8 @@ describe("SyncEngine", () => {
const peerId = await createEd25519PeerId();
expect(syncEngine.getContactInfoForPeerId(peerId.toString())).toBeUndefined();

expect(syncEngine.addContactInfoForPeerId(peerId, contactInfo)).toBeInstanceOf(Ok);
const updateThresholdMilliseconds = 0;
expect(syncEngine.addContactInfoForPeerId(peerId, contactInfo, updateThresholdMilliseconds)).toBeInstanceOf(Ok);
expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.contactInfo).toEqual(contactInfo);
expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.peerId).toEqual(peerId);
});
Expand All @@ -596,15 +597,22 @@ describe("SyncEngine", () => {
const newerContactInfo = NetworkFactories.GossipContactInfoContent.build({ timestamp: now + 10 });
const peerId = await createEd25519PeerId();

expect(syncEngine.addContactInfoForPeerId(peerId, contactInfo)).toBeInstanceOf(Ok);
// NB: We set update value to 0, but there may non-determinism if test runs too quickly. If the tests start getting
// too flaky, we can sleep for a millisecond between function calls to make sure the time elapsed is greater than 0.
const updateThresholdMilliseconds = 0;
expect(syncEngine.addContactInfoForPeerId(peerId, contactInfo, updateThresholdMilliseconds)).toBeInstanceOf(Ok);
expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.contactInfo).toEqual(contactInfo);

// Adding an older contact info should not replace the existing one
expect(syncEngine.addContactInfoForPeerId(peerId, olderContactInfo)).toBeInstanceOf(Err);
expect(syncEngine.addContactInfoForPeerId(peerId, olderContactInfo, updateThresholdMilliseconds)).toBeInstanceOf(
Err,
);
expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.contactInfo).toEqual(contactInfo);

// Adding a newer contact info should replace the existing one
expect(syncEngine.addContactInfoForPeerId(peerId, newerContactInfo)).toBeInstanceOf(Ok);
expect(syncEngine.addContactInfoForPeerId(peerId, newerContactInfo, updateThresholdMilliseconds)).toBeInstanceOf(
Ok,
);
expect(syncEngine.getContactInfoForPeerId(peerId.toString())?.contactInfo).toEqual(newerContactInfo);
});
});
Expand Down
108 changes: 85 additions & 23 deletions apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ const log = logger.child({
component: "SyncEngine",
});

type NonNegativeInteger<T extends number> = `${T}` extends `-${string}` | `${string}.${string}` ? never : T;

interface SyncEvents {
/** Emit an event when diff starts */
syncStart: () => void;
Expand Down Expand Up @@ -493,6 +495,18 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}

public async stop() {
await this.stopSync();
await this._trie.stop();

this._started = false;
this.curSync.interruptSync = false;
log.info("Sync engine stopped");
}

public async stopSync() {
if (!this.isSyncing()) {
return true;
}
// Interrupt any ongoing sync
this.curSync.interruptSync = true;

Expand All @@ -502,13 +516,9 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
await sleepWhile(() => this.syncTrieQSize > 0, SYNC_INTERRUPT_TIMEOUT);
} catch (e) {
log.error({ err: e }, "Interrupting sync timed out");
return false;
}

await this._trie.stop();

this._started = false;
this.curSync.interruptSync = false;
log.info("Sync engine stopped");
return true;
}

public getBadPeerIds(): string[] {
Expand Down Expand Up @@ -539,27 +549,39 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
return this.currentHubPeerContacts.values();
}

public addContactInfoForPeerId(peerId: PeerId, contactInfo: ContactInfoContentBody) {
public addContactInfoForPeerId(
peerId: PeerId,
contactInfo: ContactInfoContentBody,
updateThresholdMilliseconds: NonNegativeInteger<number>,
) {
const existingPeerInfo = this.getContactInfoForPeerId(peerId.toString());
if (existingPeerInfo && contactInfo.timestamp <= existingPeerInfo.contactInfo.timestamp) {
return err(new HubError("bad_request.duplicate", "peer already exists"));
}
log.info(
{
peerInfo: contactInfo,
theirMessages: contactInfo.count,
peerNetwork: contactInfo.network,
peerVersion: contactInfo.hubVersion,
peerAppVersion: contactInfo.appVersion,
connectedPeers: this.getPeerCount(),
peerId: peerId.toString(),
isNew: !!existingPeerInfo,
gossipDelay: (Date.now() - contactInfo.timestamp) / 1000,
},
"Updated Peer ContactInfo",
);
this.currentHubPeerContacts.set(peerId.toString(), { peerId, contactInfo });
return ok(undefined);
const previousTimestamp = existingPeerInfo ? existingPeerInfo.contactInfo.timestamp : -Infinity;
const elapsed = Date.now() - previousTimestamp;

// only update if contact info was updated more than ${updateThresholdMilliseconds} ago
if (elapsed > updateThresholdMilliseconds) {
log.info(
{
peerInfo: contactInfo,
theirMessages: contactInfo.count,
peerNetwork: contactInfo.network,
peerVersion: contactInfo.hubVersion,
peerAppVersion: contactInfo.appVersion,
connectedPeers: this.getPeerCount(),
peerId: peerId.toString(),
isNew: !!existingPeerInfo,
gossipDelay: (Date.now() - contactInfo.timestamp) / 1000,
},
"Updated Peer ContactInfo",
);
this.currentHubPeerContacts.set(peerId.toString(), { peerId, contactInfo });
return ok(undefined);
} else {
return err(new HubError("bad_request.duplicate", "recent contact update found for peer"));
}
}

public removeContactInfoForPeerId(peerId: string) {
Expand Down Expand Up @@ -795,6 +817,46 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}
}

public async forceSyncWithPeer(peerId: string) {
if (this.isSyncing()) {
return err(new HubError("bad_request", "Already syncing"));
}

const contactInfo = this.getContactInfoForPeerId(peerId);
if (!contactInfo) {
return err(new HubError("bad_request", "Peer not found"));
}

const rpcClient = await this._hub.getRPCClientForPeer(contactInfo.peerId, contactInfo.contactInfo);
if (!rpcClient) {
return err(new HubError("bad_request", "Unreachable peer"));
}

log.info({ peerId }, "Force sync: Starting sync");

const peerStateResult = await rpcClient.getSyncSnapshotByPrefix(
TrieNodePrefix.create({ prefix: new Uint8Array() }),
new Metadata(),
rpcDeadline(),
);

if (peerStateResult.isErr()) {
return err(peerStateResult.error);
}

const syncStatus = await this.syncStatus(peerId, peerStateResult.value);
if (syncStatus.isErr()) {
return err(syncStatus.error);
}

// Ignore sync status because we always want to sync, we return it to the clients can get visibility into the peer's state
// Intentionally not available here, so the grpc call can succeed immediately
this.performSync(peerId, rpcClient, false).then((result) => {
log.info({ result }, "Force sync: complete");
});
return ok(syncStatus.value);
}

public async syncStatus(peerId: string, theirSnapshot: TrieSnapshot): HubAsyncResult<SyncStatus> {
const ourSnapshotResult = await this.getSnapshot(theirSnapshot.prefix);

Expand Down
24 changes: 24 additions & 0 deletions apps/hubble/src/rpc/httpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
ValidationResponse,
base58ToBytes,
bytesToBase58,
SyncStatusResponse,
} from "@farcaster/hub-nodejs";
import { Metadata, ServerUnaryCall } from "@grpc/grpc-js";
import fastify from "fastify";
Expand Down Expand Up @@ -266,6 +267,29 @@ export class HttpAPIServer {
this.grpcImpl.getCurrentPeers(call, handleResponse(reply, ContactInfoResponse));
});

//================stopSync================
// @doc-tag: /stopSync
this.app.post("/v1/stopSync", (request, reply) => {
const call = getCallObject("stopSync", {}, request);
this.grpcImpl.stopSync(call, handleResponse(reply, SyncStatusResponse));
});

//================syncStatus================
// @doc-tag: /syncStatus?peer_id=...
this.app.get<{ Querystring: { peer_id: string } }>("/v1/syncStatus", (request, reply) => {
const { peer_id } = request.query;
const call = getCallObject("getSyncStatus", { peerId: peer_id }, request);
this.grpcImpl.getSyncStatus(call, handleResponse(reply, SyncStatusResponse));
});

//================forceSync================
// @doc-tag: /forceSync?peer_id=...
this.app.post<{ Querystring: { peer_id: string } }>("/v1/forceSync", (request, reply) => {
const { peer_id } = request.query;
const call = getCallObject("forceSync", { peerId: peer_id }, request);
this.grpcImpl.forceSync(call, handleResponse(reply, SyncStatusResponse));
});

//================Casts================
// @doc-tag: /castById?fid=...&hash=...
this.app.get<{ Querystring: { fid: string; hash: string } }>("/v1/castById", (request, reply) => {
Expand Down
Loading

0 comments on commit 30e1f95

Please sign in to comment.