Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.end() of stream #506

Open
Knowzzz opened this issue Jan 5, 2025 · 1 comment
Open

.end() of stream #506

Knowzzz opened this issue Jan 5, 2025 · 1 comment

Comments

@Knowzzz
Copy link

Knowzzz commented Jan 5, 2025

I think there's a problem when stopping the stream, when the stream object is in a class. I have a class with 2 methods:

  • one to start the stream and receive data,
  • the other to end the stream

the problem is that when I .end() the stream, my log in my method is displayed, so the method has executed correctly, but I'm still receiving data and I can't end the stream.


import Client, {
	CommitmentLevel,
	type SubscribeUpdate,
	type SubscribeRequest,
} from "@triton-one/yellowstone-grpc";
import bs58 from "bs58";

const processBuffers = (obj: any): any =>
	!obj
		? obj
		: Buffer.isBuffer(obj) || obj instanceof Uint8Array
			? bs58.encode(obj)
			: Array.isArray(obj)
				? obj.map(processBuffers)
				: typeof obj === "object"
					? Object.fromEntries(
							Object.entries(obj).map(([k, v]) => [k, processBuffers(v)]),
						)
					: obj;

class TritonClient {
	private client: Client;
	private stream: any;

	constructor() {
		this.client = new Client("http:com:4001", "102c6d3c", {
			"grpc.max_receive_message_length": 128 * 1024 * 1024,
		});
	}

	public async startStream() {
		this.stream = await this.client.subscribe();
		this.setupStreamListeners();

		await this.write({
			slots: {},
			accounts: {},
			accountsDataSlice: [],
			transactions: {},
			blocks: {
				allRaydiumTxs: {
					accountInclude: ["675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"],
				},
			},
			blocksMeta: {},
			entry: {},
			commitment: CommitmentLevel.PROCESSED,
			transactionsStatus: {},
		});

		setInterval(
			() =>
				this.write({
					ping: { id: 1 },
					accounts: {},
					accountsDataSlice: [],
					transactions: {},
					blocks: {},
					blocksMeta: {},
					entry: {},
					slots: {},
					transactionsStatus: {},
				}).catch(console.error),
			30000,
		);
	}

	private setupStreamListeners() {
		let previousTimestamp = Date.now();

		this.stream.on("data", (data: SubscribeUpdate) => {
			try {
				const block = data.block;
				if (!block || !block.transactions) {
					return;
				}

				const currentTimestamp = Date.now();
				const delay = currentTimestamp - previousTimestamp;
				previousTimestamp = currentTimestamp;

				console.log(
					`Block timestamp: ${Number(block.blockTime?.timestamp) * 1000}, Current timestamp: ${currentTimestamp}`,
				);

				console.log(
					`New block received : Slot ${block.slot}, delay since last block : ${delay} ms`,
				);
			} catch (error) {
				console.error("Error Triton :", { error });
			}
		});

		this.stream.on("error", (e) => {
			console.error("Stream error:", e);
			this.endStream();
		});
		this.stream.on("end", () => console.log("Stream ended"));
		this.stream.on("close", () => console.log("Stream closed"));
	}

	private async write(req: SubscribeRequest) {
		return new Promise<void>((resolve, reject) =>
			this.stream.write(req, (err: any) => (err ? reject(err) : resolve())),
		);
	}

	public endStream() {
		if (this.stream) {
			this.stream.end();
			console.log("Stream ended by user");
		}
	}
}

const tritonClient = new TritonClient();
tritonClient.startStream().catch(console.error);

setTimeout(() => tritonClient.endStream(), 10000);

Here is an example of the code
Thank you in advance for your reply.

@fs5252
Copy link

fs5252 commented Jan 6, 2025

please use .cancel()and.destory() instead of .end()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants