Skip to content

Commit

Permalink
Node: Add Binary support for stream commands, part 1 (valkey-io#2200)
Browse files Browse the repository at this point in the history
* Node: Add Binary support for stream commands, part 1

Signed-off-by: TJ Zhang <[email protected]>
Co-authored-by: TJ Zhang <[email protected]>
  • Loading branch information
tjzhang-BQ and TJ Zhang authored Aug 29, 2024
1 parent 9565313 commit 8d482be
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112))
* Node: Added XGROUP SETID command ([#2135]((https://github.com/valkey-io/valkey-glide/pull/2135))
* Node: Added binary variant to string commands ([#2183](https://github.com/valkey-io/valkey-glide/pull/2183))
* Node: Added binary variant to stream commands ([#2200](https://github.com/valkey-io/valkey-glide/pull/2200))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
79 changes: 42 additions & 37 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4731,14 +4731,18 @@ export class BaseClient {
* @param key - The key of the stream.
* @param values - field-value pairs to be added to the entry.
* @param options - options detailing how to add to the stream.
* @param options - (Optional) See {@link StreamAddOptions} and {@link DecoderOption}.
* @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
*/
public async xadd(
key: string,
values: [string, string][],
options?: StreamAddOptions,
): Promise<string | null> {
return this.createWritePromise(createXAdd(key, values, options));
key: GlideString,
values: [GlideString, GlideString][],
options?: StreamAddOptions & DecoderOption,
): Promise<GlideString | null> {
return this.createWritePromise(
createXAdd(key, values, options),
options,
);
}

/**
Expand All @@ -4757,7 +4761,7 @@ export class BaseClient {
* // Output is 2 since the stream marked 2 entries as deleted.
* ```
*/
public async xdel(key: string, ids: string[]): Promise<number> {
public async xdel(key: GlideString, ids: GlideString[]): Promise<number> {
return this.createWritePromise(createXDel(key, ids));
}

Expand Down Expand Up @@ -5024,7 +5028,7 @@ export class BaseClient {
* @param consumer - The group consumer.
* @param minIdleTime - The minimum idle time for the message to be claimed.
* @param ids - An array of entry ids.
* @param options - (Optional) Stream claim options {@link StreamClaimOptions}.
* @param options - (Optional) See {@link StreamClaimOptions} and {@link DecoderOption}.
* @returns A `Record` of message entries that are claimed by the consumer.
*
* @example
Expand All @@ -5038,13 +5042,14 @@ export class BaseClient {
* ```
*/
public async xclaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
ids: string[],
options?: StreamClaimOptions,
ids: GlideString[],
options?: StreamClaimOptions & DecoderOption,
): Promise<Record<string, [string, string][]>> {
// TODO: convert Record return type to Object array
return this.createWritePromise(
createXClaim(key, group, consumer, minIdleTime, ids, options),
);
Expand Down Expand Up @@ -5093,13 +5098,14 @@ export class BaseClient {
* ```
*/
public async xautoclaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
start: string,
start: GlideString,
count?: number,
): Promise<[string, Record<string, [string, string][]>, string[]?]> {
// TODO: convert Record return type to Object array
return this.createWritePromise(
createXAutoClaim(key, group, consumer, minIdleTime, start, count),
);
Expand Down Expand Up @@ -5218,13 +5224,14 @@ export class BaseClient {
* ```
*/
public async xgroupCreate(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
options?: StreamGroupOptions,
): Promise<string> {
): Promise<"OK"> {
return this.createWritePromise(
createXGroupCreate(key, groupName, id, options),
{ decoder: Decoder.String },
);
}

Expand All @@ -5244,8 +5251,8 @@ export class BaseClient {
* ```
*/
public async xgroupDestroy(
key: string,
groupName: string,
key: GlideString,
groupName: GlideString,
): Promise<boolean> {
return this.createWritePromise(createXGroupDestroy(key, groupName));
}
Expand Down Expand Up @@ -5340,9 +5347,9 @@ export class BaseClient {
* ```
*/
public async xgroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): Promise<boolean> {
return this.createWritePromise(
createXGroupCreateConsumer(key, groupName, consumerName),
Expand All @@ -5366,9 +5373,9 @@ export class BaseClient {
* ```
*/
public async xgroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): Promise<number> {
return this.createWritePromise(
createXGroupDelConsumer(key, groupName, consumerName),
Expand Down Expand Up @@ -5406,9 +5413,9 @@ export class BaseClient {
* ```
*/
public async xack(
key: string,
group: string,
ids: string[],
key: GlideString,
group: GlideString,
ids: GlideString[],
): Promise<number> {
return this.createWritePromise(createXAck(key, group, ids));
}
Expand All @@ -5424,7 +5431,6 @@ export class BaseClient {
* group.
* @param entriesRead - (Optional) A value representing the number of stream entries already read by the group.
* This option can only be specified if you are using Valkey version 7.0.0 or above.
* @param decoder - (Optional) {@link Decoder} type which defines how to handle the response. If not set, the default decoder from the client config will be used.
* @returns `"OK"`.
*
* * @example
Expand All @@ -5433,16 +5439,15 @@ export class BaseClient {
* ```
*/
public async xgroupSetId(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
entriesRead?: number,
decoder?: Decoder,
): Promise<"OK"> {
return this.createWritePromise(
createXGroupSetid(key, groupName, id, entriesRead),
{
decoder: decoder,
decoder: Decoder.String,
},
);
}
Expand Down
62 changes: 31 additions & 31 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2052,7 +2052,7 @@ export type StreamAddOptions = {
trim?: StreamTrimOptions;
};

function addTrimOptions(options: StreamTrimOptions, args: string[]) {
function addTrimOptions(options: StreamTrimOptions, args: GlideString[]) {
if (options.method === "maxlen") {
args.push("MAXLEN");
} else if (options.method === "minid") {
Expand Down Expand Up @@ -2081,8 +2081,8 @@ function addTrimOptions(options: StreamTrimOptions, args: string[]) {
* @internal
*/
export function createXAdd(
key: string,
values: [string, string][],
key: GlideString,
values: [GlideString, GlideString][],
options?: StreamAddOptions,
): command_request.Command {
const args = [key];
Expand Down Expand Up @@ -2113,8 +2113,8 @@ export function createXAdd(
* @internal
*/
export function createXDel(
key: string,
ids: string[],
key: GlideString,
ids: GlideString[],
): command_request.Command {
return createCommand(RequestType.XDel, [key, ...ids]);
}
Expand Down Expand Up @@ -2173,9 +2173,9 @@ export function createXRevRange(
* @internal
*/
export function createXGroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupCreateConsumer, [
key,
Expand All @@ -2188,9 +2188,9 @@ export function createXGroupCreateConsumer(
* @internal
*/
export function createXGroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
key: GlideString,
groupName: GlideString,
consumerName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupDelConsumer, [
key,
Expand Down Expand Up @@ -2714,11 +2714,11 @@ export type StreamClaimOptions = {

/** @internal */
export function createXClaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
ids: string[],
ids: GlideString[],
options?: StreamClaimOptions,
justId?: boolean,
): command_request.Command {
Expand All @@ -2740,11 +2740,11 @@ export function createXClaim(

/** @internal */
export function createXAutoClaim(
key: string,
group: string,
consumer: string,
key: GlideString,
group: GlideString,
consumer: GlideString,
minIdleTime: number,
start: string,
start: GlideString,
count?: number,
justId?: boolean,
): command_request.Command {
Expand Down Expand Up @@ -2784,12 +2784,12 @@ export type StreamGroupOptions = {
* @internal
*/
export function createXGroupCreate(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
options?: StreamGroupOptions,
): command_request.Command {
const args: string[] = [key, groupName, id];
const args: GlideString[] = [key, groupName, id];

if (options) {
if (options.mkStream) {
Expand All @@ -2809,8 +2809,8 @@ export function createXGroupCreate(
* @internal
*/
export function createXGroupDestroy(
key: string,
groupName: string,
key: GlideString,
groupName: GlideString,
): command_request.Command {
return createCommand(RequestType.XGroupDestroy, [key, groupName]);
}
Expand Down Expand Up @@ -3975,9 +3975,9 @@ export function createGetEx(
* @internal
*/
export function createXAck(
key: string,
group: string,
ids: string[],
key: GlideString,
group: GlideString,
ids: GlideString[],
): command_request.Command {
return createCommand(RequestType.XAck, [key, group, ...ids]);
}
Expand All @@ -3986,9 +3986,9 @@ export function createXAck(
* @internal
*/
export function createXGroupSetid(
key: string,
groupName: string,
id: string,
key: GlideString,
groupName: GlideString,
id: GlideString,
entriesRead?: number,
): command_request.Command {
const args = [key, groupName, id];
Expand Down
Loading

0 comments on commit 8d482be

Please sign in to comment.