Skip to content

Commit

Permalink
test: add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Mar 14, 2024
1 parent 966d095 commit 95c154f
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 143 deletions.
25 changes: 16 additions & 9 deletions Examples/SlackClone/AppView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,33 @@ final class AppViewModel {
@MainActor
struct AppView: View {
@Bindable var model: AppViewModel
let log = LogStore.shared

@State var logPresented = false

@ViewBuilder
var body: some View {
if model.session != nil {
NavigationSplitView {
ChannelListView(channel: $model.selectedChannel)
.toolbar {
ToolbarItem {
Button("Log") {
logPresented = true
}
}
}
} detail: {
if let channel = model.selectedChannel {
MessagesView(channel: channel).id(channel.id)
}
}
.overlay(alignment: .bottom) {
LabeledContent(
"Connection Status",
value: model.realtimeConnectionStatus?.description ?? "Unknown"
)
.padding()
.background(.regularMaterial)
.clipShape(Capsule())
.padding()
.sheet(isPresented: $logPresented) {
List {
ForEach(0 ..< log.messages.count, id: \.self) { i in
Text(log.messages[i].description)
}
}
}
} else {
AuthView()
Expand Down
8 changes: 7 additions & 1 deletion Examples/SlackClone/Logger.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ extension Logger {
static let main = Self(subsystem: "com.supabase.SlackClone", category: "app")
}

final class SupabaseLoggerImpl: SupabaseLogger, @unchecked Sendable {
@Observable
final class LogStore: SupabaseLogger {
private let lock = NSLock()
private var loggers: [String: Logger] = [:]

static let shared = LogStore()

var messages: [SupabaseLogMessage] = []

func log(message: SupabaseLogMessage) {
messages.append(message)
lock.withLock {
if loggers[message.system] == nil {
loggers[message.system] = Logger(
Expand Down
4 changes: 2 additions & 2 deletions Examples/SlackClone/Supabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ let decoder: JSONDecoder = {
}()

let supabase = SupabaseClient(
supabaseURL: URL(string: "http://192.168.0.4:54321")!,
supabaseURL: URL(string: "http://192.168.0.6:54321")!,
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
options: SupabaseClientOptions(
db: .init(encoder: encoder, decoder: decoder),
global: SupabaseClientOptions.GlobalOptions(logger: SupabaseLoggerImpl())
global: SupabaseClientOptions.GlobalOptions(logger: LogStore.shared)
)
)
157 changes: 77 additions & 80 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,19 @@ public actor RealtimeClientV2 {
}
}

let config: Configuration
let ws: any WebSocketClient

var accessToken: String?
var ref = 0
var pendingHeartbeatRef: Int?

var heartbeatTask: Task<Void, Never>?
var messageTask: Task<Void, Never>?
var inFlightConnectionTask: Task<Void, Never>?
var connectionTask: Task<Void, Never>?

public private(set) var subscriptions: [String: RealtimeChannelV2] = [:]

let config: Configuration
let ws: any WebSocketClient

private let statusEventEmitter = EventEmitter<Status>(initialEvent: .disconnected)

public var statusChange: AsyncStream<Status> {
Expand All @@ -93,22 +94,8 @@ public actor RealtimeClientV2 {
statusEventEmitter.attach(listener)
}

deinit {
heartbeatTask?.cancel()
messageTask?.cancel()
subscriptions = [:]
}

public init(config: Configuration) {
let sessionConfiguration = URLSessionConfiguration.default
sessionConfiguration.httpAdditionalHeaders = config.headers
let ws = DefaultWebSocketClient(
realtimeURL: config.realtimeWebSocketURL,
configuration: sessionConfiguration,
logger: config.logger
)

self.init(config: config, ws: ws)
self.init(config: config, ws: WebSocket(config: config))
}

init(config: Configuration, ws: any WebSocketClient) {
Expand All @@ -122,68 +109,86 @@ public actor RealtimeClientV2 {
}
}

public func connect() async {
guard status != .connected else {
return
}
deinit {
heartbeatTask?.cancel()
messageTask?.cancel()
subscriptions = [:]
}

public func connect() async {
await connect(reconnect: false)
}

func connect(reconnect: Bool) async {
if let inFlightConnectionTask {
return await inFlightConnectionTask.value
}
if status == .disconnected {
connectionTask = Task {
if reconnect {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay))

inFlightConnectionTask = Task { [self] in
defer { inFlightConnectionTask = nil }

if reconnect {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.reconnectDelay))
if Task.isCancelled {
config.logger?.debug("Reconnect cancelled, returning")
return
}
}

if Task.isCancelled {
config.logger?.debug("reconnect cancelled, returning")
if status == .connected {
config.logger?.debug("WebsSocket already connected")
return
}
}

if status == .connected {
config.logger?.debug("Websocket already connected")
return
}
status = .connecting

status = .connecting

Task {
for await connectionStatus in ws.connect() {
if Task.isCancelled {
break
}

switch connectionStatus {
case .open:
status = .connected
config.logger?.debug("Connected to realtime WebSocket")
listenForMessages()
startHeartbeating()
if reconnect {
await rejoinChannels()
}

case .close:
config.logger?.debug("WebSocket connection closed. Trying again in \(config.reconnectDelay) seconds.")
disconnect()
await connect(reconnect: true)

case .complete(let error):
config.logger?.error(
"WebSocket connection error: \(error?.localizedDescription ?? "<none>")"
)
disconnect()
case .connected:
await onConnected(reconnect: reconnect)

case .disconnected:
await onDisconnected()

case let .error(error):
await onError(error)
}
}
}
}

_ = await statusChange.first { @Sendable in $0 == .connected }
_ = await statusChange.first { @Sendable in $0 == .connected }
}

private func onConnected(reconnect: Bool) async {
status = .connected
config.logger?.debug("Connected to realtime WebSocket")
listenForMessages()
startHeartbeating()
if reconnect {
await rejoinChannels()
}
}

await inFlightConnectionTask?.value
private func onDisconnected() async {
config.logger?
.debug(
"WebSocket disconnected. Trying again in \(config.reconnectDelay)"
)
await reconnect()
}

private func onError(_ error: (any Error)?) async {
config.logger?
.debug(
"WebSocket error \(error?.localizedDescription ?? "<none>"). Trying again in \(config.reconnectDelay)"
)
await reconnect()
}

private func reconnect() async {
disconnect()
await connect(reconnect: true)
}

public func channel(
Expand Down Expand Up @@ -222,14 +227,8 @@ public actor RealtimeClientV2 {
}

private func rejoinChannels() async {
await withTaskGroup(of: Void.self) { group in
for channel in subscriptions.values {
_ = group.addTaskUnlessCancelled {
await channel.subscribe()
}

await group.waitForAll()
}
for channel in subscriptions.values {
await channel.subscribe()
}
}

Expand All @@ -249,32 +248,29 @@ public actor RealtimeClientV2 {
config.logger?.debug(
"Error while listening for messages. Trying again in \(config.reconnectDelay) \(error)"
)
await disconnect()
await connect(reconnect: true)
await reconnect()
}
}
}

private func startHeartbeating() {
heartbeatTask = Task { [weak self] in
guard let self else { return }

heartbeatTask = Task { [weak self, config] in
while !Task.isCancelled {
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(config.heartbeatInterval))
if Task.isCancelled {
break
}
await sendHeartbeat()
await self?.sendHeartbeat()
}
}
}

private func sendHeartbeat() async {
if pendingHeartbeatRef != nil {
pendingHeartbeatRef = nil
config.logger?.debug("Heartbeat timeout. Trying to reconnect in \(config.reconnectDelay)")
disconnect()
await connect(reconnect: true)
config.logger?.debug("Heartbeat timeout")

await reconnect()
return
}

Expand All @@ -296,7 +292,8 @@ public actor RealtimeClientV2 {
ref = 0
messageTask?.cancel()
heartbeatTask?.cancel()
ws.cancel()
connectionTask?.cancel()
ws.disconnect()
status = .disconnected
}

Expand Down
Loading

0 comments on commit 95c154f

Please sign in to comment.