Skip to content

Commit

Permalink
integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Dec 16, 2024
1 parent d2b2778 commit 9b88a86
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 138 deletions.
8 changes: 6 additions & 2 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ public final class RealtimeClientV2: Sendable {
}

private func onClose(code: Int?, reason: String?) {
// TODO: implement
options.logger?.debug(
"WebSocket closed. Code: \(code?.description ?? "<none>"), Reason: \(reason ?? "<none>")")

reconnect()
}

private func reconnect() {
Expand Down Expand Up @@ -302,7 +305,8 @@ public final class RealtimeClientV2: Sendable {

switch event {
case .binary:
fatalError("Unsupported binary event")
self.options.logger?.error("Unsupported binary event received.")
break
case .text(let text):
let data = Data(text.utf8)
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
Expand Down
284 changes: 148 additions & 136 deletions Tests/IntegrationTests/RealtimeIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,31 @@

import ConcurrencyExtras
import CustomDump
import Helpers
import InlineSnapshotTesting
import PostgREST
import Supabase
import TestHelpers
import XCTest

@testable import Realtime

struct TestLogger: SupabaseLogger {
func log(message: SupabaseLogMessage) {
print(message.description)
}
}

final class RealtimeIntegrationTests: XCTestCase {

static let reconnectDelay: TimeInterval = 1

let realtime = RealtimeClientV2(
url: URL(string: "\(DotEnv.SUPABASE_URL)/realtime/v1")!,
options: RealtimeClientOptions(
headers: ["apikey": DotEnv.SUPABASE_ANON_KEY]
headers: ["apikey": DotEnv.SUPABASE_ANON_KEY],
reconnectDelay: reconnectDelay,
logger: TestLogger()
)
)

Expand All @@ -35,23 +48,26 @@ final class RealtimeIntegrationTests: XCTestCase {
}
}

func testBroadcast() async throws {
let expectation = expectation(description: "receivedBroadcastMessages")
expectation.expectedFulfillmentCount = 3
func testDisconnectByUser_shouldNotReconnect() async {
await realtime.connect()
XCTAssertEqual(realtime.status, .connected)

realtime.disconnect()

/// Wait for the reconnection delay
try? await Task.sleep(
nanoseconds: NSEC_PER_SEC * UInt64(Self.reconnectDelay) + 1)

XCTAssertEqual(realtime.status, .disconnected)
}

func testBroadcast() async throws {
let channel = realtime.channel("integration") {
$0.broadcast.receiveOwnBroadcasts = true
}

let receivedMessages = LockIsolated<[JSONObject]>([])

Task {
for await message in channel.broadcastStream(event: "test") {
receivedMessages.withValue {
$0.append(message)
}
expectation.fulfill()
}
let receivedMessagesTask = Task {
await channel.broadcastStream(event: "test").prefix(3).collect()
}

await Task.yield()
Expand All @@ -66,35 +82,38 @@ final class RealtimeIntegrationTests: XCTestCase {
try await channel.broadcast(event: "test", message: Message(value: 2))
try await channel.broadcast(event: "test", message: ["value": 3, "another_value": 42])

await fulfillment(of: [expectation], timeout: 0.5)
let receivedMessages = try await withTimeout(interval: 5) {
await receivedMessagesTask.value
}

expectNoDifference(
receivedMessages.value,
assertInlineSnapshot(of: receivedMessages, as: .json) {
"""
[
[
"event": "test",
"payload": [
"value": 1
],
"type": "broadcast",
],
[
"event": "test",
"payload": [
"value": 2
],
"type": "broadcast",
],
[
"event": "test",
"payload": [
"value": 3,
"another_value": 42,
],
"type": "broadcast",
],
{
"event" : "test",
"payload" : {
"value" : 1
},
"type" : "broadcast"
},
{
"event" : "test",
"payload" : {
"value" : 2
},
"type" : "broadcast"
},
{
"event" : "test",
"payload" : {
"another_value" : 42,
"value" : 3
},
"type" : "broadcast"
}
]
)
"""
}

await channel.unsubscribe()
}
Expand All @@ -118,18 +137,8 @@ final class RealtimeIntegrationTests: XCTestCase {
$0.broadcast.receiveOwnBroadcasts = true
}

let expectation = expectation(description: "presenceChange")
expectation.expectedFulfillmentCount = 4

let receivedPresenceChanges = LockIsolated<[any PresenceAction]>([])

Task {
for await presence in channel.presenceChange() {
receivedPresenceChanges.withValue {
$0.append(presence)
}
expectation.fulfill()
}
let receivedPresenceChangesTask = Task {
await channel.presenceChange().prefix(4).collect()
}

await Task.yield()
Expand All @@ -145,10 +154,12 @@ final class RealtimeIntegrationTests: XCTestCase {

await channel.untrack()

await fulfillment(of: [expectation], timeout: 0.5)
let receivedPresenceChanges = try await withTimeout(interval: 5) {
await receivedPresenceChangesTask.value
}

let joins = try receivedPresenceChanges.value.map { try $0.decodeJoins(as: UserState.self) }
let leaves = try receivedPresenceChanges.value.map { try $0.decodeLeaves(as: UserState.self) }
let joins = try receivedPresenceChanges.map { try $0.decodeJoins(as: UserState.self) }
let leaves = try receivedPresenceChanges.map { try $0.decodeLeaves(as: UserState.self) }
expectNoDifference(
joins,
[
Expand All @@ -172,86 +183,87 @@ final class RealtimeIntegrationTests: XCTestCase {
await channel.unsubscribe()
}

// FIXME: Test getting stuck
// func testPostgresChanges() async throws {
// let channel = realtime.channel("db-changes")
//
// let receivedInsertActions = Task {
// await channel.postgresChange(InsertAction.self, schema: "public").prefix(1).collect()
// }
//
// let receivedUpdateActions = Task {
// await channel.postgresChange(UpdateAction.self, schema: "public").prefix(1).collect()
// }
//
// let receivedDeleteActions = Task {
// await channel.postgresChange(DeleteAction.self, schema: "public").prefix(1).collect()
// }
//
// let receivedAnyActionsTask = Task {
// await channel.postgresChange(AnyAction.self, schema: "public").prefix(3).collect()
// }
//
// await Task.yield()
// await channel.subscribe()
//
// struct Entry: Codable, Equatable {
// let key: String
// let value: AnyJSON
// }
//
// let key = try await (
// db.from("key_value_storage")
// .insert(["key": AnyJSON.string(UUID().uuidString), "value": "value1"]).select().single()
// .execute().value as Entry
// ).key
// try await db.from("key_value_storage").update(["value": "value2"]).eq("key", value: key)
// .execute()
// try await db.from("key_value_storage").delete().eq("key", value: key).execute()
//
// let insertedEntries = try await receivedInsertActions.value.map {
// try $0.decodeRecord(
// as: Entry.self,
// decoder: JSONDecoder()
// )
// }
// let updatedEntries = try await receivedUpdateActions.value.map {
// try $0.decodeRecord(
// as: Entry.self,
// decoder: JSONDecoder()
// )
// }
// let deletedEntryIds = await receivedDeleteActions.value.compactMap {
// $0.oldRecord["key"]?.stringValue
// }
//
// expectNoDifference(insertedEntries, [Entry(key: key, value: "value1")])
// expectNoDifference(updatedEntries, [Entry(key: key, value: "value2")])
// expectNoDifference(deletedEntryIds, [key])
//
// let receivedAnyActions = await receivedAnyActionsTask.value
// XCTAssertEqual(receivedAnyActions.count, 3)
//
// if case let .insert(action) = receivedAnyActions[0] {
// let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder())
// expectNoDifference(record, Entry(key: key, value: "value1"))
// } else {
// XCTFail("Expected a `AnyAction.insert` on `receivedAnyActions[0]`")
// }
//
// if case let .update(action) = receivedAnyActions[1] {
// let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder())
// expectNoDifference(record, Entry(key: key, value: "value2"))
// } else {
// XCTFail("Expected a `AnyAction.update` on `receivedAnyActions[1]`")
// }
//
// if case let .delete(action) = receivedAnyActions[2] {
// expectNoDifference(key, action.oldRecord["key"]?.stringValue)
// } else {
// XCTFail("Expected a `AnyAction.delete` on `receivedAnyActions[2]`")
// }
//
// await channel.unsubscribe()
// }
func testPostgresChanges() async throws {
let channel = realtime.channel("db-changes")

let receivedInsertActions = Task {
await channel.postgresChange(InsertAction.self, schema: "public").prefix(1).collect()
}

let receivedUpdateActions = Task {
await channel.postgresChange(UpdateAction.self, schema: "public").prefix(1).collect()
}

let receivedDeleteActions = Task {
await channel.postgresChange(DeleteAction.self, schema: "public").prefix(1).collect()
}

let receivedAnyActionsTask = Task {
await channel.postgresChange(AnyAction.self, schema: "public").prefix(3).collect()
}

await Task.yield()
await channel.subscribe()

struct Entry: Codable, Equatable {
let key: String
let value: AnyJSON
}

// Wait until a system event for makind sure DB change listeners are set before making DB changes.
_ = await channel.system().first(where: { _ in true })

let key = try await
(db.from("key_value_storage")
.insert(["key": AnyJSON.string(UUID().uuidString), "value": "value1"]).select().single()
.execute().value as Entry).key
try await db.from("key_value_storage").update(["value": "value2"]).eq("key", value: key)
.execute()
try await db.from("key_value_storage").delete().eq("key", value: key).execute()

let insertedEntries = try await receivedInsertActions.value.map {
try $0.decodeRecord(
as: Entry.self,
decoder: JSONDecoder()
)
}
let updatedEntries = try await receivedUpdateActions.value.map {
try $0.decodeRecord(
as: Entry.self,
decoder: JSONDecoder()
)
}
let deletedEntryIds = await receivedDeleteActions.value.compactMap {
$0.oldRecord["key"]?.stringValue
}

expectNoDifference(insertedEntries, [Entry(key: key, value: "value1")])
expectNoDifference(updatedEntries, [Entry(key: key, value: "value2")])
expectNoDifference(deletedEntryIds, [key])

let receivedAnyActions = await receivedAnyActionsTask.value
XCTAssertEqual(receivedAnyActions.count, 3)

if case let .insert(action) = receivedAnyActions[0] {
let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder())
expectNoDifference(record, Entry(key: key, value: "value1"))
} else {
XCTFail("Expected a `AnyAction.insert` on `receivedAnyActions[0]`")
}

if case let .update(action) = receivedAnyActions[1] {
let record = try action.decodeRecord(as: Entry.self, decoder: JSONDecoder())
expectNoDifference(record, Entry(key: key, value: "value2"))
} else {
XCTFail("Expected a `AnyAction.update` on `receivedAnyActions[1]`")
}

if case let .delete(action) = receivedAnyActions[2] {
expectNoDifference(key, action.oldRecord["key"]?.stringValue)
} else {
XCTFail("Expected a `AnyAction.delete` on `receivedAnyActions[2]`")
}

await channel.unsubscribe()
}
}

0 comments on commit 9b88a86

Please sign in to comment.