Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Empty Bearer Token + Upstream changes #85

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 62 additions & 47 deletions lib/src/socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ class PhoenixSocket {
/// endpoint is the full url to which you wish to connect
/// e.g. `ws://localhost:4000/websocket/socket`
PhoenixSocket(
/// The URL of the Phoenix server.
String endpoint, {
/// The options used when initiating and maintaining the
/// websocket connection.
PhoenixSocketOptions? socketOptions,

/// The factory to use to create the WebSocketChannel.
WebSocketChannel Function(Uri uri)? webSocketChannelFactory,
}) : _endpoint = endpoint,
/// The URL of the Phoenix server.
String endpoint, {
/// The options used when initiating and maintaining the
/// websocket connection.
PhoenixSocketOptions? socketOptions,

/// The factory to use to create the WebSocketChannel.
WebSocketChannel Function(Uri uri)? webSocketChannelFactory,
}) : _endpoint = endpoint,
_socketState = SocketState.unknown,
_webSocketChannelFactory = webSocketChannelFactory {
_options = socketOptions ?? PhoenixSocketOptions();
Expand Down Expand Up @@ -85,9 +85,9 @@ class PhoenixSocket {
final Map<String, Stream<Message>> _topicStreams = {};

final BehaviorSubject<PhoenixSocketEvent> _stateStreamController =
BehaviorSubject();
BehaviorSubject();
final StreamController<String> _receiveStreamController =
StreamController.broadcast();
StreamController.broadcast();
final String _endpoint;
final StreamController<Message> _topicMessages = StreamController();
final WebSocketChannel Function(Uri uri)? _webSocketChannelFactory;
Expand Down Expand Up @@ -187,49 +187,64 @@ class PhoenixSocket {
}

_mountPoint = await _buildMountPoint(_endpoint, _options);
_logger.finest(() => 'Attempting to connect to $_mountPoint');

try {
_ws = _webSocketChannelFactory != null
? _webSocketChannelFactory!(_mountPoint)
: WebSocketChannel.connect(_mountPoint);

_ws!.stream
.where(_shouldPipeMessage)
.listen(_onSocketData, cancelOnError: true)
..onError(_onSocketError)
..onDone(_onSocketClosed);
} catch (error, stacktrace) {
_onSocketError(error, stacktrace);
}
// workaround to check the existing bearer token
final token = _mountPoint.queryParameters["token"];

_reconnectAttempts++;
_socketState = SocketState.connecting;
if(token != null && token.length > 1) {
_logger.finest(() => 'Attempting to connect to $_mountPoint');

try {
// Wait for the WebSocket to be ready before continuing. In case of a
// failure to connect, the future will complete with an error and will be
// caught.
await _ws!.ready;

_socketState = SocketState.connected;

_logger.finest('Waiting for initial heartbeat roundtrip');
if (await _sendHeartbeat(ignorePreviousHeartbeat: true)) {
_stateStreamController.add(PhoenixSocketOpenEvent());
_logger.info('Socket open');
completer.complete(this);
} else {
throw PhoenixException();
try {
_ws = _webSocketChannelFactory != null
? _webSocketChannelFactory!(_mountPoint)
: WebSocketChannel.connect(_mountPoint);

_ws!.stream
.where(_shouldPipeMessage)
.listen(_onSocketData, cancelOnError: true)
..onError(_onSocketError)
..onDone(_onSocketClosed);
} catch (error, stacktrace) {
_onSocketError(error, stacktrace);
}
} catch (err, stackTrace) {
_logger.severe('Raised Exception', err, stackTrace);

_reconnectAttempts++;
_socketState = SocketState.connecting;

try {
// Wait for the WebSocket to be ready before continuing. In case of a
// failure to connect, the future will complete with an error and will be
// caught.
await _ws!.ready;

_socketState = SocketState.connected;

_logger.finest('Waiting for initial heartbeat roundtrip');
if (await _sendHeartbeat(ignorePreviousHeartbeat: true)) {
_stateStreamController.add(PhoenixSocketOpenEvent());
_logger.info('Socket open');
completer.complete(this);
} else {
throw PhoenixException();
} // else
} catch (err, stackTrace) {
_logger.severe('Raised Exception', err, stackTrace);

_ws = null;
_socketState = SocketState.closed;

completer.complete(_delayedReconnect());
} // catch
} // if
else {
// without a bearer token we don't do anything and start the retry loop
_logger.severe('Invalid bearer token: "$token"');
_stateStreamController.add(PhoenixSocketErrorEvent(error: "Invalid bearer token", stacktrace: null));
_ws = null;
_socketState = SocketState.closed;

_reconnectAttempts++;
completer.complete(_delayedReconnect());
}
} // else
}

/// Attempts to make a WebSocket connection to the Phoenix backend.
Expand Down Expand Up @@ -565,7 +580,7 @@ class PhoenixSocket {
return;
} else {
_logger.info(
() => 'Socket closed with reason ${ev.reason} and code ${ev.code}',
() => 'Socket closed with reason ${ev.reason} and code ${ev.code}',
);
_triggerChannelExceptions(exc);
}
Expand Down
Loading