Skip to content

Commit

Permalink
feat: add automatic reconnection attempts every 5s
Browse files Browse the repository at this point in the history
  • Loading branch information
lars-berger committed Sep 23, 2024
1 parent d81c8c2 commit b22f5cb
Showing 1 changed file with 132 additions and 58 deletions.
190 changes: 132 additions & 58 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,19 @@ import {
} from './types';

export interface WmClientOptions {
/** IPC server port to connect to. Defaults to `6123`. */
/**
* IPC server port to connect to.
*
* Defaults to `6123`.
*/
port?: number;

/**
* Reconnection interval in milliseconds.
*
* Defaults to `5000` (5 seconds).
*/
reconnectInterval?: number;
}

/** Unregisters a callback. */
Expand All @@ -42,12 +53,19 @@ export type SubscribeCallback<T extends WmEventType> = (

export class WmClient {
private readonly DEFAULT_PORT = 6123;
private readonly DEFAULT_RECONNECT_INTERVAL = 5000;

/** Websocket connection to IPC server. */
private _socket: WebSocket | null = null;
/**
* Promise that resolves to `WebSocket` instance if connected.
*
* Prevents duplicate connections.
*/
private _socketPromise: Promise<WebSocket> | null = null;

/** Promise used to prevent duplicate connections. */
private _createSocketPromise: Promise<WebSocket> | null = null;
/**
* Whether the connection was closed via {@link closeConnection}.
*/
private _isManuallyClosed = false;

private _onMessageCallbacks: MessageCallback[] = [];
private _onConnectCallbacks: ConnectCallback[] = [];
Expand All @@ -64,20 +82,26 @@ export class WmClient {

/**
* Gets all monitors. {@link Monitor}
*
* @throws If connection to IPC server fails.
*/
async queryMonitors(): Promise<MonitorsResponse> {
return this._sendAndWaitReply<MonitorsResponse>('query monitors');
}

/**
* Gets all active workspaces. {@link Workspace}
*
* @throws If connection to IPC server fails.
*/
async queryWorkspaces(): Promise<WorkspacesResponse> {
return this._sendAndWaitReply<WorkspacesResponse>('query workspaces');
}

/**
* Gets all managed windows. {@link Window}
*
* @throws If connection to IPC server fails.
*/
async queryWindows(): Promise<WindowsResponse> {
return this._sendAndWaitReply<WindowsResponse>('query windows');
Expand All @@ -86,13 +110,17 @@ export class WmClient {
/**
* Gets the currently focused container. This can either be a
* {@link Window} or a {@link Workspace} without any descendant windows.
*
* @throws If connection to IPC server fails.
*/
async queryFocused(): Promise<FocusedResponse> {
return this._sendAndWaitReply<FocusedResponse>('query focused');
}

/**
* Gets the active binding modes.
*
* @throws If connection to IPC server fails.
*/
async queryBindingModes(): Promise<BindingModesResponse> {
return this._sendAndWaitReply<BindingModesResponse>(
Expand All @@ -102,6 +130,8 @@ export class WmClient {

/**
* Gets metadata about the running GlazeWM application.
*
* @throws If connection to IPC server fails.
*/
async queryAppMetadata(): Promise<AppMetadataResponse> {
return this._sendAndWaitReply<AppMetadataResponse>(
Expand All @@ -111,6 +141,8 @@ export class WmClient {

/**
* Gets the tiling direction of the focused container.
*
* @throws If connection to IPC server fails.
*/
async queryTilingDirection(): Promise<TilingDirectionResponse> {
return this._sendAndWaitReply<TilingDirectionResponse>(
Expand All @@ -124,7 +156,7 @@ export class WmClient {
* @param command WM command to run (e.g. `"focus --workspace 1"`).
* @param subjectContainerId (optional) ID of container to use as subject.
* If not provided, this defaults to the currently focused container.
* @throws If command fails.
* @throws If the command errors or connection to IPC server fails.
*/
async runCommand(
command: string,
Expand All @@ -140,37 +172,35 @@ export class WmClient {
/**
* Establishes websocket connection.
*
* @throws If connection attempt fails.
* @throws If connection to IPC server fails.
*/
async connect(): Promise<void> {
if (!this._socket) {
const socketPromise =
this._createSocketPromise ??
(this._createSocketPromise = this._createSocket());

this._socket = await socketPromise;
}

this._isManuallyClosed = false;
this._socketPromise ??= this._createSocket();
await this._waitForConnection();
}

/**
* Closes the websocket connection.
*/
closeConnection(): void {
this._socket?.close();
async closeConnection(): Promise<void> {
this._isManuallyClosed = true;
(await this._socketPromise)?.close();
}

/**
* Registers a callback for a GlazeWM event type.
*
* Persists the subscription across reconnections.
*
* @example
* ```typescript
* const unlisten = await client.subscribe(
* WmEventType.FOCUS_CHANGED,
* (event: FocusChangedEvent) => { ... }
* });
* ```
* @throws If *initial* connection to IPC server fails.
*/
async subscribe<T extends WmEventType>(
event: T,
Expand All @@ -182,24 +212,24 @@ export class WmClient {
/**
* Registers a callback for multiple GlazeWM event types.
*
* Persists the subscription across reconnections.
*
* @example
* ```typescript
* const unlisten = await client.subscribeMany(
* [WmEventType.WORSPACE_ACTIVATED, WmEventType.WORSPACE_DEACTIVATED],
* (event: WorkspaceActivatedEvent | WorkspaceDeactivatedEvent) => { ... }
* );
* ```
* @throws If *initial* connection to IPC server fails.
*/
async subscribeMany<T extends WmEventType[]>(
events: T,
callback: SubscribeCallback<T[number]>,
): Promise<UnlistenFn> {
const { subscriptionId } =
await this._sendAndWaitReply<SubscribeResponse>(
`sub --events ${events.join(' ')}`,
);
let subscriptionId = await this._sendSubscribe(events);

const unlisten = this.onMessage(e => {
const unlistenMessage = this.onMessage(e => {
const serverMessage: ServerMessage<WmEventData> = JSON.parse(
e.data as string,
);
Expand All @@ -213,10 +243,15 @@ export class WmClient {
}
});

return async () => {
unlisten();
const unlistenConnect = this.onConnect(async e => {
this._sendUnsubscribe(subscriptionId);
subscriptionId = await this._sendSubscribe(events);
});

await this._sendAndWaitReply<void>(`unsub --id ${subscriptionId}`);
return async () => {
unlistenMessage();
unlistenConnect();
await this._sendUnsubscribe(subscriptionId);
};
}

Expand All @@ -233,7 +268,7 @@ export class WmClient {
}

/**
* Registers a callback for when the websocket connects.
* Registers a callback for when the websocket connects or reconnects.
*
* @example
* ```typescript
Expand Down Expand Up @@ -273,42 +308,47 @@ export class WmClient {
* Sends an IPC message and waits for a reply.
*
* @private
* @throws If message is invalid or IPC server is unable to handle the
* message.
* @throws If message is invalid or connection to IPC server fails.
*/
private async _sendAndWaitReply<T>(message: string): Promise<T> {
let unlisten: UnlistenFn;
if (this._isManuallyClosed) {
throw new Error(
'Websocket connection was closed via `closeConnection`.',
);
}

await this.connect();
const socket = await this._socketPromise!;

// Resolve when a reply comes in for the client message.
return new Promise<T>(async (resolve, reject) => {
try {
await this.connect();
this._socket!.send(message);
socket.send(message, error => {
if (error) {
reject(error);
}
});

unlisten = this.onMessage(e => {
const serverMessage: ServerMessage<T> = JSON.parse(
e.data as string,
);
const unlisten = this.onMessage(e => {
const serverMessage: ServerMessage<T> = JSON.parse(
e.data as string,
);

// Whether the incoming message is a reply to the client message.
const isReplyMessage =
serverMessage.messageType === 'client_response' &&
serverMessage.clientMessage === message;
// Whether the incoming message is a reply to the client message.
const isReplyMessage =
serverMessage.messageType === 'client_response' &&
serverMessage.clientMessage === message;

if (isReplyMessage && serverMessage.error) {
reject(
`Server reply to message '${message}' has error: ${serverMessage.error}`,
);
}
if (isReplyMessage) {
unlisten();

if (isReplyMessage) {
if (serverMessage.error) {
reject(serverMessage.error);
} else {
resolve(serverMessage.data as T);
}
});
} catch (err) {
reject(err);
}
}).finally(() => unlisten());
}
});
});
}

/**
Expand Down Expand Up @@ -364,28 +404,41 @@ export class WmClient {
socket.onerror = e =>
this._onErrorCallbacks.forEach(callback => callback(e));

socket.onclose = e =>
socket.onclose = e => {
this._onDisconnectCallbacks.forEach(callback => callback(e));

// Attempt to reconnect if not manually closed.
if (!this._isManuallyClosed) {
setTimeout(
() => this._socketPromise === this._createSocket(),
this._options?.reconnectInterval ??
this.DEFAULT_RECONNECT_INTERVAL,
);
}
};

return socket;
}

/**
* Waits for the websocket connection to be established.
*
* @private
* @throws On disconnect or close.
*/
private async _waitForConnection(): Promise<WebSocket> {
const socket = await this._socketPromise;

if (
!this._socket ||
this._socket.readyState === this._socket.CLOSED ||
this._socket.readyState === this._socket.CLOSING
!socket ||
socket.readyState === socket.CLOSED ||
socket.readyState === socket.CLOSING
) {
throw new Error('Websocket connection is closed.');
}

if (this._socket.readyState === this._socket.OPEN) {
return this._socket;
if (socket.readyState === socket.OPEN) {
return socket;
}

return new Promise<WebSocket>(async (resolve, reject) => {
Expand All @@ -396,7 +449,7 @@ export class WmClient {

const unlistenConnect = this.onConnect(() => {
cleanup();
resolve(this._socket!);
resolve(socket);
});

const unlistenDisconnect = this.onDisconnect(() => {
Expand All @@ -405,4 +458,25 @@ export class WmClient {
});
});
}

/**
* @private
* @throws If connection to IPC server fails.
*/
private async _sendSubscribe(events: WmEventType[]): Promise<string> {
const { subscriptionId } =
await this._sendAndWaitReply<SubscribeResponse>(
`sub --events ${events.join(' ')}`,
);

return subscriptionId;
}

/**
* @private
* @throws If connection to IPC server fails.
*/
private async _sendUnsubscribe(subscriptionId: string): Promise<void> {
await this._sendAndWaitReply<void>(`unsub --id ${subscriptionId}`);
}
}

0 comments on commit b22f5cb

Please sign in to comment.