From 3aa6b4031b47b2d13b2730f1a1831a1d9fc5b8e8 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 30 Oct 2023 07:04:17 -0300 Subject: [PATCH] Realtime (#126) * Bring SwiftPhoenixClient to Realtime * Sync postgrest changes between client and server * Add track, untrack and send methods to channel * Start adding transformer and implementing trigger method * Refactoring Example project * Adding channel filter * Fix bindings sync between server and client * Set access token * Rollback Examples * Format Realtime package * Clean up * Handle error when subscribing to channel * Remove unused files from example project * Remove Realtime tests --- .../xcshareddata/xcschemes/Realtime.xcscheme | 66 ++ Examples/Examples.xcodeproj/project.pbxproj | 165 +++++ Examples/Examples/AddTodoListView.swift | 3 +- Examples/Examples/AuthView.swift | 3 +- Examples/Examples/MFAFlow.swift | 12 +- .../AccentColor.colorset/Contents.json | 11 + .../AppIcon.appiconset/Contents.json | 58 ++ .../Assets.xcassets/Contents.json | 6 + Examples/RealtimeSample/ContentView.swift | 123 ++++ .../Preview Assets.xcassets/Contents.json | 6 + .../RealtimeSample.entitlements | 12 + .../RealtimeSample/RealtimeSampleApp.swift | 29 + Package.swift | 7 +- Sources/Realtime/Defaults.swift | 201 +---- Sources/Realtime/Delegated.swift | 8 +- Sources/Realtime/HeartbeatTimer.swift | 41 +- Sources/Realtime/Message.swift | 25 +- ...Transport.swift => PhoenixTransport.swift} | 132 ++-- Sources/Realtime/Presence.swift | 64 +- Sources/Realtime/Push.swift | 44 +- .../{Channel.swift => RealtimeChannel.swift} | 691 ++++++++++++------ Sources/Realtime/RealtimeClient.swift | 246 ++++--- Sources/Realtime/RealtimeError.swift | 16 + Sources/Realtime/SynchronizedArray.swift | 33 - Tests/RealtimeTests/ChannelTopicTests.swift | 19 - Tests/RealtimeTests/RealtimeTests.swift | 245 +++---- 26 files changed, 1396 insertions(+), 870 deletions(-) create mode 100644 .swiftpm/xcode/xcshareddata/xcschemes/Realtime.xcscheme create mode 100644 Examples/RealtimeSample/Assets.xcassets/AccentColor.colorset/Contents.json create mode 100644 Examples/RealtimeSample/Assets.xcassets/AppIcon.appiconset/Contents.json create mode 100644 Examples/RealtimeSample/Assets.xcassets/Contents.json create mode 100644 Examples/RealtimeSample/ContentView.swift create mode 100644 Examples/RealtimeSample/Preview Content/Preview Assets.xcassets/Contents.json create mode 100644 Examples/RealtimeSample/RealtimeSample.entitlements create mode 100644 Examples/RealtimeSample/RealtimeSampleApp.swift rename Sources/Realtime/{Transport.swift => PhoenixTransport.swift} (70%) rename Sources/Realtime/{Channel.swift => RealtimeChannel.swift} (54%) create mode 100644 Sources/Realtime/RealtimeError.swift delete mode 100644 Sources/Realtime/SynchronizedArray.swift delete mode 100644 Tests/RealtimeTests/ChannelTopicTests.swift diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/Realtime.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/Realtime.xcscheme new file mode 100644 index 00000000..b4487cb4 --- /dev/null +++ b/.swiftpm/xcode/xcshareddata/xcschemes/Realtime.xcscheme @@ -0,0 +1,66 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/Examples/Examples.xcodeproj/project.pbxproj b/Examples/Examples.xcodeproj/project.pbxproj index ab6a3112..887c2636 100644 --- a/Examples/Examples.xcodeproj/project.pbxproj +++ b/Examples/Examples.xcodeproj/project.pbxproj @@ -15,6 +15,11 @@ 79018AE42AE3F185006EA669 /* Routes.swift in Sources */ = {isa = PBXBuildFile; fileRef = 79018AE32AE3F185006EA669 /* Routes.swift */; }; 79018AE62AE3F1E4006EA669 /* Auth.swift in Sources */ = {isa = PBXBuildFile; fileRef = 79018AE52AE3F1E4006EA669 /* Auth.swift */; }; 79018AE82AE3F1F3006EA669 /* SignUpUseCase.swift in Sources */ = {isa = PBXBuildFile; fileRef = 79018AE72AE3F1F3006EA669 /* SignUpUseCase.swift */; }; + 790308E92AEE7B4D003C4A98 /* RealtimeSampleApp.swift in Sources */ = {isa = PBXBuildFile; fileRef = 790308E82AEE7B4D003C4A98 /* RealtimeSampleApp.swift */; }; + 790308EB2AEE7B4D003C4A98 /* ContentView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 790308EA2AEE7B4D003C4A98 /* ContentView.swift */; }; + 790308ED2AEE7B4E003C4A98 /* Assets.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = 790308EC2AEE7B4E003C4A98 /* Assets.xcassets */; }; + 790308F02AEE7B4E003C4A98 /* Preview Assets.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = 790308EF2AEE7B4E003C4A98 /* Preview Assets.xcassets */; }; + 790308F62AEE7B5B003C4A98 /* Realtime in Frameworks */ = {isa = PBXBuildFile; productRef = 790308F52AEE7B5B003C4A98 /* Realtime */; }; 793895CA2954ABFF0044F2B8 /* ExamplesApp.swift in Sources */ = {isa = PBXBuildFile; fileRef = 793895C92954ABFF0044F2B8 /* ExamplesApp.swift */; }; 793895CC2954ABFF0044F2B8 /* RootView.swift in Sources */ = {isa = PBXBuildFile; fileRef = 793895CB2954ABFF0044F2B8 /* RootView.swift */; }; 793895CE2954AC000044F2B8 /* Assets.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = 793895CD2954AC000044F2B8 /* Assets.xcassets */; }; @@ -66,6 +71,12 @@ 79018AE32AE3F185006EA669 /* Routes.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Routes.swift; sourceTree = ""; }; 79018AE52AE3F1E4006EA669 /* Auth.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Auth.swift; sourceTree = ""; }; 79018AE72AE3F1F3006EA669 /* SignUpUseCase.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SignUpUseCase.swift; sourceTree = ""; }; + 790308E62AEE7B4D003C4A98 /* RealtimeSample.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = RealtimeSample.app; sourceTree = BUILT_PRODUCTS_DIR; }; + 790308E82AEE7B4D003C4A98 /* RealtimeSampleApp.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RealtimeSampleApp.swift; sourceTree = ""; }; + 790308EA2AEE7B4D003C4A98 /* ContentView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ContentView.swift; sourceTree = ""; }; + 790308EC2AEE7B4E003C4A98 /* Assets.xcassets */ = {isa = PBXFileReference; lastKnownFileType = folder.assetcatalog; path = Assets.xcassets; sourceTree = ""; }; + 790308EF2AEE7B4E003C4A98 /* Preview Assets.xcassets */ = {isa = PBXFileReference; lastKnownFileType = folder.assetcatalog; path = "Preview Assets.xcassets"; sourceTree = ""; }; + 790308F12AEE7B4E003C4A98 /* RealtimeSample.entitlements */ = {isa = PBXFileReference; lastKnownFileType = text.plist.entitlements; path = RealtimeSample.entitlements; sourceTree = ""; }; 793895C62954ABFF0044F2B8 /* Examples.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = Examples.app; sourceTree = BUILT_PRODUCTS_DIR; }; 793895C92954ABFF0044F2B8 /* ExamplesApp.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ExamplesApp.swift; sourceTree = ""; }; 793895CB2954ABFF0044F2B8 /* RootView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RootView.swift; sourceTree = ""; }; @@ -107,6 +118,14 @@ /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ + 790308E32AEE7B4D003C4A98 /* Frameworks */ = { + isa = PBXFrameworksBuildPhase; + buildActionMask = 2147483647; + files = ( + 790308F62AEE7B5B003C4A98 /* Realtime in Frameworks */, + ); + runOnlyForDeploymentPostprocessing = 0; + }; 793895C32954ABFF0044F2B8 /* Frameworks */ = { isa = PBXFrameworksBuildPhase; buildActionMask = 2147483647; @@ -214,11 +233,32 @@ path = Routes; sourceTree = ""; }; + 790308E72AEE7B4D003C4A98 /* RealtimeSample */ = { + isa = PBXGroup; + children = ( + 790308E82AEE7B4D003C4A98 /* RealtimeSampleApp.swift */, + 790308EA2AEE7B4D003C4A98 /* ContentView.swift */, + 790308EC2AEE7B4E003C4A98 /* Assets.xcassets */, + 790308F12AEE7B4E003C4A98 /* RealtimeSample.entitlements */, + 790308EE2AEE7B4E003C4A98 /* Preview Content */, + ); + path = RealtimeSample; + sourceTree = ""; + }; + 790308EE2AEE7B4E003C4A98 /* Preview Content */ = { + isa = PBXGroup; + children = ( + 790308EF2AEE7B4E003C4A98 /* Preview Assets.xcassets */, + ); + path = "Preview Content"; + sourceTree = ""; + }; 793895BD2954ABFF0044F2B8 = { isa = PBXGroup; children = ( 793895C82954ABFF0044F2B8 /* Examples */, 79C591DA2AE0880F0088A9C8 /* ProductSample */, + 790308E72AEE7B4D003C4A98 /* RealtimeSample */, 793895C72954ABFF0044F2B8 /* Products */, 7956405A2954AC3E0088A06F /* Frameworks */, ); @@ -229,6 +269,7 @@ children = ( 793895C62954ABFF0044F2B8 /* Examples.app */, 79C591D92AE0880F0088A9C8 /* ProductSample.app */, + 790308E62AEE7B4D003C4A98 /* RealtimeSample.app */, ); name = Products; sourceTree = ""; @@ -329,6 +370,26 @@ /* End PBXGroup section */ /* Begin PBXNativeTarget section */ + 790308E52AEE7B4D003C4A98 /* RealtimeSample */ = { + isa = PBXNativeTarget; + buildConfigurationList = 790308F22AEE7B4E003C4A98 /* Build configuration list for PBXNativeTarget "RealtimeSample" */; + buildPhases = ( + 790308E22AEE7B4D003C4A98 /* Sources */, + 790308E32AEE7B4D003C4A98 /* Frameworks */, + 790308E42AEE7B4D003C4A98 /* Resources */, + ); + buildRules = ( + ); + dependencies = ( + ); + name = RealtimeSample; + packageProductDependencies = ( + 790308F52AEE7B5B003C4A98 /* Realtime */, + ); + productName = RealtimeSample; + productReference = 790308E62AEE7B4D003C4A98 /* RealtimeSample.app */; + productType = "com.apple.product-type.application"; + }; 793895C52954ABFF0044F2B8 /* Examples */ = { isa = PBXNativeTarget; buildConfigurationList = 793895D52954AC000044F2B8 /* Build configuration list for PBXNativeTarget "Examples" */; @@ -382,6 +443,9 @@ LastSwiftUpdateCheck = 1500; LastUpgradeCheck = 1500; TargetAttributes = { + 790308E52AEE7B4D003C4A98 = { + CreatedOnToolsVersion = 15.0.1; + }; 793895C52954ABFF0044F2B8 = { CreatedOnToolsVersion = 14.1; }; @@ -410,11 +474,21 @@ targets = ( 793895C52954ABFF0044F2B8 /* Examples */, 79C591D82AE0880F0088A9C8 /* ProductSample */, + 790308E52AEE7B4D003C4A98 /* RealtimeSample */, ); }; /* End PBXProject section */ /* Begin PBXResourcesBuildPhase section */ + 790308E42AEE7B4D003C4A98 /* Resources */ = { + isa = PBXResourcesBuildPhase; + buildActionMask = 2147483647; + files = ( + 790308F02AEE7B4E003C4A98 /* Preview Assets.xcassets in Resources */, + 790308ED2AEE7B4E003C4A98 /* Assets.xcassets in Resources */, + ); + runOnlyForDeploymentPostprocessing = 0; + }; 793895C42954ABFF0044F2B8 /* Resources */ = { isa = PBXResourcesBuildPhase; buildActionMask = 2147483647; @@ -437,6 +511,15 @@ /* End PBXResourcesBuildPhase section */ /* Begin PBXSourcesBuildPhase section */ + 790308E22AEE7B4D003C4A98 /* Sources */ = { + isa = PBXSourcesBuildPhase; + buildActionMask = 2147483647; + files = ( + 790308EB2AEE7B4D003C4A98 /* ContentView.swift in Sources */, + 790308E92AEE7B4D003C4A98 /* RealtimeSampleApp.swift in Sources */, + ); + runOnlyForDeploymentPostprocessing = 0; + }; 793895C22954ABFF0044F2B8 /* Sources */ = { isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; @@ -491,6 +574,75 @@ /* End PBXSourcesBuildPhase section */ /* Begin XCBuildConfiguration section */ + 790308F32AEE7B4E003C4A98 /* Debug */ = { + isa = XCBuildConfiguration; + buildSettings = { + ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon; + ASSETCATALOG_COMPILER_GENERATE_SWIFT_ASSET_SYMBOL_EXTENSIONS = YES; + ASSETCATALOG_COMPILER_GLOBAL_ACCENT_COLOR_NAME = AccentColor; + CODE_SIGN_ENTITLEMENTS = RealtimeSample/RealtimeSample.entitlements; + CODE_SIGN_STYLE = Automatic; + COMBINE_HIDPI_IMAGES = YES; + CURRENT_PROJECT_VERSION = 1; + DEVELOPMENT_ASSET_PATHS = "\"RealtimeSample/Preview Content\""; + DEVELOPMENT_TEAM = ELTTE7K8TT; + ENABLE_HARDENED_RUNTIME = YES; + ENABLE_PREVIEWS = YES; + GENERATE_INFOPLIST_FILE = YES; + INFOPLIST_KEY_NSHumanReadableCopyright = ""; + LD_RUNPATH_SEARCH_PATHS = ( + "$(inherited)", + "@executable_path/../Frameworks", + ); + LOCALIZATION_PREFERS_STRING_CATALOGS = YES; + MACOSX_DEPLOYMENT_TARGET = 14.0; + MARKETING_VERSION = 1.0; + PRODUCT_BUNDLE_IDENTIFIER = dev.grds.RealtimeSample; + PRODUCT_NAME = "$(TARGET_NAME)"; + SDKROOT = macosx; + SUPPORTED_PLATFORMS = "iphoneos iphonesimulator macosx"; + SUPPORTS_MACCATALYST = NO; + SWIFT_ACTIVE_COMPILATION_CONDITIONS = "DEBUG $(inherited)"; + SWIFT_EMIT_LOC_STRINGS = YES; + SWIFT_VERSION = 5.0; + TARGETED_DEVICE_FAMILY = 1; + }; + name = Debug; + }; + 790308F42AEE7B4E003C4A98 /* Release */ = { + isa = XCBuildConfiguration; + buildSettings = { + ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon; + ASSETCATALOG_COMPILER_GENERATE_SWIFT_ASSET_SYMBOL_EXTENSIONS = YES; + ASSETCATALOG_COMPILER_GLOBAL_ACCENT_COLOR_NAME = AccentColor; + CODE_SIGN_ENTITLEMENTS = RealtimeSample/RealtimeSample.entitlements; + CODE_SIGN_STYLE = Automatic; + COMBINE_HIDPI_IMAGES = YES; + CURRENT_PROJECT_VERSION = 1; + DEVELOPMENT_ASSET_PATHS = "\"RealtimeSample/Preview Content\""; + DEVELOPMENT_TEAM = ELTTE7K8TT; + ENABLE_HARDENED_RUNTIME = YES; + ENABLE_PREVIEWS = YES; + GENERATE_INFOPLIST_FILE = YES; + INFOPLIST_KEY_NSHumanReadableCopyright = ""; + LD_RUNPATH_SEARCH_PATHS = ( + "$(inherited)", + "@executable_path/../Frameworks", + ); + LOCALIZATION_PREFERS_STRING_CATALOGS = YES; + MACOSX_DEPLOYMENT_TARGET = 14.0; + MARKETING_VERSION = 1.0; + PRODUCT_BUNDLE_IDENTIFIER = dev.grds.RealtimeSample; + PRODUCT_NAME = "$(TARGET_NAME)"; + SDKROOT = macosx; + SUPPORTED_PLATFORMS = "iphoneos iphonesimulator macosx"; + SUPPORTS_MACCATALYST = NO; + SWIFT_EMIT_LOC_STRINGS = YES; + SWIFT_VERSION = 5.0; + TARGETED_DEVICE_FAMILY = 1; + }; + name = Release; + }; 793895D32954AC000044F2B8 /* Debug */ = { isa = XCBuildConfiguration; buildSettings = { @@ -713,6 +865,15 @@ /* End XCBuildConfiguration section */ /* Begin XCConfigurationList section */ + 790308F22AEE7B4E003C4A98 /* Build configuration list for PBXNativeTarget "RealtimeSample" */ = { + isa = XCConfigurationList; + buildConfigurations = ( + 790308F32AEE7B4E003C4A98 /* Debug */, + 790308F42AEE7B4E003C4A98 /* Release */, + ); + defaultConfigurationIsVisible = 0; + defaultConfigurationName = Release; + }; 793895C12954ABFF0044F2B8 /* Build configuration list for PBXProject "Examples" */ = { isa = XCConfigurationList; buildConfigurations = ( @@ -770,6 +931,10 @@ /* End XCRemoteSwiftPackageReference section */ /* Begin XCSwiftPackageProductDependency section */ + 790308F52AEE7B5B003C4A98 /* Realtime */ = { + isa = XCSwiftPackageProductDependency; + productName = Realtime; + }; 7956406C2955B3500088A06F /* SwiftUINavigation */ = { isa = XCSwiftPackageProductDependency; package = 7956406B2955B3500088A06F /* XCRemoteSwiftPackageReference "swiftui-navigation" */; diff --git a/Examples/Examples/AddTodoListView.swift b/Examples/Examples/AddTodoListView.swift index d4eb5630..f6033149 100644 --- a/Examples/Examples/AddTodoListView.swift +++ b/Examples/Examples/AddTodoListView.swift @@ -42,7 +42,8 @@ struct AddTodoListView_Previews: PreviewProvider { description: "", isComplete: false, ownerID: UUID() - )) + ) + ) ) { _ in } } diff --git a/Examples/Examples/AuthView.swift b/Examples/Examples/AuthView.swift index 5e6d8537..2809efd6 100644 --- a/Examples/Examples/AuthView.swift +++ b/Examples/Examples/AuthView.swift @@ -86,7 +86,8 @@ struct AuthView: View { try await supabase.auth.signIn(email: email, password: password) case .signUp: try await supabase.auth.signUp( - email: email, password: password, redirectTo: URL(string: "com.supabase.Examples://")!) + email: email, password: password, redirectTo: URL(string: "com.supabase.Examples://")! + ) } } catch { withAnimation { diff --git a/Examples/Examples/MFAFlow.swift b/Examples/Examples/MFAFlow.swift index c89d88b0..e7982c07 100644 --- a/Examples/Examples/MFAFlow.swift +++ b/Examples/Examples/MFAFlow.swift @@ -5,8 +5,8 @@ // Created by Guilherme Souza on 27/10/23. // -import SVGView import Supabase +import SVGView import SwiftUI enum MFAStatus { @@ -104,7 +104,8 @@ struct MFAEnrollView: View { Task { do { try await supabase.auth.mfa.challengeAndVerify( - params: MFAChallengeAndVerifyParams(factorId: enrollResponse!.id, code: verificationCode)) + params: MFAChallengeAndVerifyParams(factorId: enrollResponse!.id, code: verificationCode) + ) } catch { self.error = error } @@ -158,7 +159,8 @@ struct MFAVerifyView: View { } try await supabase.auth.mfa.challengeAndVerify( - params: MFAChallengeAndVerifyParams(factorId: totpFactor.id, code: verificationCode)) + params: MFAChallengeAndVerifyParams(factorId: totpFactor.id, code: verificationCode) + ) } catch { self.error = error } @@ -190,9 +192,7 @@ struct MFAVerifiedView: View { for factor in factorsToRemove { try await supabase.auth.mfa.unenroll(params: MFAUnenrollParams(factorId: factor.id)) } - } catch { - - } + } catch {} } } } diff --git a/Examples/RealtimeSample/Assets.xcassets/AccentColor.colorset/Contents.json b/Examples/RealtimeSample/Assets.xcassets/AccentColor.colorset/Contents.json new file mode 100644 index 00000000..eb878970 --- /dev/null +++ b/Examples/RealtimeSample/Assets.xcassets/AccentColor.colorset/Contents.json @@ -0,0 +1,11 @@ +{ + "colors" : [ + { + "idiom" : "universal" + } + ], + "info" : { + "author" : "xcode", + "version" : 1 + } +} diff --git a/Examples/RealtimeSample/Assets.xcassets/AppIcon.appiconset/Contents.json b/Examples/RealtimeSample/Assets.xcassets/AppIcon.appiconset/Contents.json new file mode 100644 index 00000000..3f00db43 --- /dev/null +++ b/Examples/RealtimeSample/Assets.xcassets/AppIcon.appiconset/Contents.json @@ -0,0 +1,58 @@ +{ + "images" : [ + { + "idiom" : "mac", + "scale" : "1x", + "size" : "16x16" + }, + { + "idiom" : "mac", + "scale" : "2x", + "size" : "16x16" + }, + { + "idiom" : "mac", + "scale" : "1x", + "size" : "32x32" + }, + { + "idiom" : "mac", + "scale" : "2x", + "size" : "32x32" + }, + { + "idiom" : "mac", + "scale" : "1x", + "size" : "128x128" + }, + { + "idiom" : "mac", + "scale" : "2x", + "size" : "128x128" + }, + { + "idiom" : "mac", + "scale" : "1x", + "size" : "256x256" + }, + { + "idiom" : "mac", + "scale" : "2x", + "size" : "256x256" + }, + { + "idiom" : "mac", + "scale" : "1x", + "size" : "512x512" + }, + { + "idiom" : "mac", + "scale" : "2x", + "size" : "512x512" + } + ], + "info" : { + "author" : "xcode", + "version" : 1 + } +} diff --git a/Examples/RealtimeSample/Assets.xcassets/Contents.json b/Examples/RealtimeSample/Assets.xcassets/Contents.json new file mode 100644 index 00000000..73c00596 --- /dev/null +++ b/Examples/RealtimeSample/Assets.xcassets/Contents.json @@ -0,0 +1,6 @@ +{ + "info" : { + "author" : "xcode", + "version" : 1 + } +} diff --git a/Examples/RealtimeSample/ContentView.swift b/Examples/RealtimeSample/ContentView.swift new file mode 100644 index 00000000..45f88b06 --- /dev/null +++ b/Examples/RealtimeSample/ContentView.swift @@ -0,0 +1,123 @@ +// +// ContentView.swift +// RealtimeSample +// +// Created by Guilherme Souza on 29/10/23. +// + +import Realtime +import SwiftUI + +struct ContentView: View { + @State var inserts: [Message] = [] + @State var updates: [Message] = [] + @State var deletes: [Message] = [] + + @State var socketStatus: String? + @State var channelStatus: String? + + @State var publicSchema: RealtimeChannel? + + var body: some View { + List { + Section("INSERTS") { + ForEach(Array(zip(inserts.indices, inserts)), id: \.0) { _, message in + Text(message.stringfiedPayload()) + } + } + + Section("UPDATES") { + ForEach(Array(zip(updates.indices, updates)), id: \.0) { _, message in + Text(message.stringfiedPayload()) + } + } + + Section("DELETES") { + ForEach(Array(zip(deletes.indices, deletes)), id: \.0) { _, message in + Text(message.stringfiedPayload()) + } + } + } + .overlay(alignment: .bottomTrailing) { + VStack(alignment: .leading) { + Toggle( + "Toggle Subscription", + isOn: Binding(get: { publicSchema?.isJoined == true }, set: { _ in toggleSubscription() }) + ) + Text("Socket: \(socketStatus ?? "")") + Text("Channel: \(channelStatus ?? "")") + } + .padding() + .background(.regularMaterial) + .padding() + } + .onAppear { + createSubscription() + + socket.connect() + socket.onOpen { + socketStatus = "OPEN" + } + socket.onClose { + socketStatus = "CLOSE" + } + socket.onError { error, _ in + socketStatus = "ERROR: \(error.localizedDescription)" + } + } + } + + func createSubscription() { + publicSchema = socket.channel("public") + .on("postgres_changes", filter: ChannelFilter(event: "INSERT", schema: "public")) { + inserts.append($0) + } + .on("postgres_changes", filter: ChannelFilter(event: "UPDATE", schema: "public")) { + updates.append($0) + } + .on("postgres_changes", filter: ChannelFilter(event: "DELETE", schema: "public")) { + deletes.append($0) + } + + publicSchema?.onError { _ in channelStatus = "ERROR" } + publicSchema?.onClose { _ in channelStatus = "Closed gracefully" } + publicSchema? + .subscribe { state, _ in + switch state { + case .subscribed: + channelStatus = "OK" + case .closed: + channelStatus = "CLOSED" + case .timedOut: + channelStatus = "Timed out" + case .channelError: + channelStatus = "ERROR" + } + } + } + + func toggleSubscription() { + if publicSchema?.isJoined == true { + publicSchema?.unsubscribe() + } else { + createSubscription() + } + } +} + +extension Message { + func stringfiedPayload() -> String { + do { + let data = try JSONSerialization.data( + withJSONObject: payload, options: [.prettyPrinted, .sortedKeys] + ) + return String(data: data, encoding: .utf8) ?? "" + } catch { + return "" + } + } +} + +#Preview { + ContentView() +} diff --git a/Examples/RealtimeSample/Preview Content/Preview Assets.xcassets/Contents.json b/Examples/RealtimeSample/Preview Content/Preview Assets.xcassets/Contents.json new file mode 100644 index 00000000..73c00596 --- /dev/null +++ b/Examples/RealtimeSample/Preview Content/Preview Assets.xcassets/Contents.json @@ -0,0 +1,6 @@ +{ + "info" : { + "author" : "xcode", + "version" : 1 + } +} diff --git a/Examples/RealtimeSample/RealtimeSample.entitlements b/Examples/RealtimeSample/RealtimeSample.entitlements new file mode 100644 index 00000000..625af03d --- /dev/null +++ b/Examples/RealtimeSample/RealtimeSample.entitlements @@ -0,0 +1,12 @@ + + + + + com.apple.security.app-sandbox + + com.apple.security.files.user-selected.read-only + + com.apple.security.network.client + + + diff --git a/Examples/RealtimeSample/RealtimeSampleApp.swift b/Examples/RealtimeSample/RealtimeSampleApp.swift new file mode 100644 index 00000000..c69e19eb --- /dev/null +++ b/Examples/RealtimeSample/RealtimeSampleApp.swift @@ -0,0 +1,29 @@ +// +// RealtimeSampleApp.swift +// RealtimeSample +// +// Created by Guilherme Souza on 29/10/23. +// + +import Realtime +import SwiftUI + +@main +struct RealtimeSampleApp: App { + var body: some Scene { + WindowGroup { + ContentView() + } + } +} + +let socket: RealtimeClient = { + let client = RealtimeClient( + "https://PROJECT_ID.supabase.co/realtime/v1", + params: [ + "apikey": "SUPABASE_ANON_KEY" + ] + ) + client.logger = { print($0) } + return client +}() diff --git a/Package.swift b/Package.swift index 2534ae83..78f1f025 100644 --- a/Package.swift +++ b/Package.swift @@ -21,7 +21,8 @@ let package = Package( .library(name: "Storage", targets: ["Storage"]), .library( name: "Supabase", - targets: ["Supabase", "Functions", "PostgREST", "GoTrue", "Realtime", "Storage"]), + targets: ["Supabase", "Functions", "PostgREST", "GoTrue", "Realtime", "Storage"] + ), ], dependencies: [ .package(url: "https://github.com/kishikawakatsumi/KeychainAccess", from: "4.2.2"), @@ -59,7 +60,7 @@ let package = Package( exclude: ["__Snapshots__"] ), .testTarget(name: "PostgRESTIntegrationTests", dependencies: ["PostgREST"]), - .target(name: "Realtime"), + .target(name: "Realtime", dependencies: ["_Helpers"]), .testTarget(name: "RealtimeTests", dependencies: ["Realtime"]), .target(name: "Storage", dependencies: ["_Helpers"]), .testTarget(name: "StorageTests", dependencies: ["Storage"]), @@ -79,6 +80,6 @@ let package = Package( for target in package.targets where !target.isTest { target.swiftSettings = [ - .enableUpcomingFeature("StrictConcurrency=complete") + .enableUpcomingFeature("StrictConcurrency=complete"), ] } diff --git a/Sources/Realtime/Defaults.swift b/Sources/Realtime/Defaults.swift index bfc4e55d..b6f3e6c9 100644 --- a/Sources/Realtime/Defaults.swift +++ b/Sources/Realtime/Defaults.swift @@ -28,7 +28,8 @@ public enum Defaults { /// Default interval to send heartbeats on public static let heartbeatInterval: TimeInterval = 30.0 - /// Default maximum amount of time which the system may delay heartbeat events in order to minimize power usage + /// Default maximum amount of time which the system may delay heartbeat events in order to + /// minimize power usage public static let heartbeatLeeway: DispatchTimeInterval = .milliseconds(10) /// Default reconnect algorithm for the socket @@ -43,28 +44,20 @@ public enum Defaults { public static let vsn = "2.0.0" - /// Default encoder - public static let encoder: JSONEncoder = JSONEncoder() - /// Default encode function, utilizing JSONSerialization.data public static let encode: (Any) -> Data = { json in - assert(JSONSerialization.isValidJSONObject(json), "Invalid JSON object") - return - try! JSONSerialization + try! JSONSerialization .data( withJSONObject: json, options: JSONSerialization.WritingOptions() ) } - /// Default decoder - public static let decoder: JSONDecoder = JSONDecoder() - /// Default decode function, utilizing JSONSerialization.jsonObject public static let decode: (Data) -> Any? = { data in guard let json = - try? JSONSerialization + try? JSONSerialization .jsonObject( with: data, options: JSONSerialization.ReadingOptions() @@ -74,7 +67,8 @@ public enum Defaults { } public static let heartbeatQueue: DispatchQueue = .init( - label: "com.phoenix.socket.heartbeat") + label: "com.phoenix.socket.heartbeat" + ) } /// Represents the multiple states that a Channel can be in @@ -88,174 +82,23 @@ public enum ChannelState: String { } /// Represents the different events that can be sent through -/// a channel regarding a Channel's lifecycle or -/// that can be registered to be notified of. -public enum ChannelEvent: RawRepresentable { - public enum Presence: String { - case state - case diff - } - - case heartbeat - case join - case leave - case reply - case error - case close - - case all - case insert - case update - case delete - - case channelReply(String) - - case broadcast - - case presence - case presenceState - case presenceDiff - - public var rawValue: String { - switch self { - case .heartbeat: return "heartbeat" - case .join: return "phx_join" - case .leave: return "phx_leave" - case .reply: return "phx_reply" - case .error: return "phx_error" - case .close: return "phx_close" - - case .all: return "*" - case .insert: return "insert" - case .update: return "update" - case .delete: return "delete" - - case let .channelReply(reference): return "chan_reply_\(reference)" - - case .broadcast: return "broadcast" - - case .presence: return "presence" - case .presenceState: return "presence_state" - case .presenceDiff: return "presence_diff" - } - } - - public init?(rawValue: String) { - switch rawValue.lowercased() { - case "heartbeat": self = .heartbeat - case "phx_join": self = .join - case "phx_leave": self = .leave - case "phx_reply": self = .reply - case "phx_error": self = .error - case "phx_close": self = .close - case "*": self = .all - case "insert": self = .insert - case "update": self = .update - case "delete": self = .delete - case "broadcast": self = .broadcast - case "presence": self = .presence - case "presence_state": self = .presenceState - case "presence_diff": self = .presenceDiff - default: return nil - } - } - - var isLifecyleEvent: Bool { - switch self { - case .join, .leave, .reply, .error, .close: return true +/// a channel regarding a Channel's lifecycle. +public enum ChannelEvent { + public static let heartbeat = "heartbeat" + public static let join = "phx_join" + public static let leave = "phx_leave" + public static let reply = "phx_reply" + public static let error = "phx_error" + public static let close = "phx_close" + public static let accessToken = "access_token" + public static let postgresChanges = "postgres_changes" + public static let broadcast = "broadcast" + public static let presence = "presence" + + static func isLifecyleEvent(_ event: String) -> Bool { + switch event { + case join, leave, reply, error, close: return true default: return false } } } - -/// Represents the different topic a channel can subscribe to. -public enum ChannelTopic: RawRepresentable, Equatable { - case all - case schema(_ schema: String) - case table(_ table: String, schema: String) - case column(_ column: String, value: String, table: String, schema: String) - - case heartbeat - - public var rawValue: String { - switch self { - case .all: return "realtime:*" - case let .schema(name): return "realtime:\(name)" - case let .table(tableName, schema): return "realtime:\(schema):\(tableName)" - case let .column(columnName, value, table, schema): - return "realtime:\(schema):\(table):\(columnName)=eq.\(value)" - case .heartbeat: return "phoenix" - } - } - - public init?(rawValue: String) { - if rawValue == "realtime:*" || rawValue == "*" { - self = .all - } else if rawValue == "phoenix" { - self = .heartbeat - } else { - let parts = rawValue.replacingOccurrences(of: "realtime:", with: "").split(separator: ":") - switch parts.count { - case 1: - self = .schema(String(parts[0])) - case 2: - self = .table(String(parts[1]), schema: String(parts[0])) - case 3: - let condition = parts[2].split(separator: "=") - if condition.count == 2, - condition[1].hasPrefix("eq.") - { - self = .column( - String(condition[0]), value: String(condition[1].dropFirst(3)), table: String(parts[1]), - schema: String(parts[0]) - ) - } else { - return nil - } - default: - return nil - } - } - } -} - -/// Represents the broadcast and presence options for a channel. -public struct ChannelOptions { - /// Used to track presence payload across clients. Must be unique per client. If `nil`, the server will generate one. - var presenceKey: String? - /// Enables the client to receieve their own`broadcast` messages - var broadcastSelf: Bool - /// Instructs the server to acknoledge the client's `broadcast` messages - var broadcastAcknowledge: Bool - - public init( - presenceKey: String? = nil, broadcastSelf: Bool = false, broadcastAcknowledge: Bool = false - ) { - self.presenceKey = presenceKey - self.broadcastSelf = broadcastSelf - self.broadcastAcknowledge = broadcastAcknowledge - } - - /// Parameters used to configure the channel - var params: [String: [String: Any]] { - [ - "config": [ - "presence": [ - "key": presenceKey ?? "" - ], - "broadcast": [ - "ack": broadcastAcknowledge, - "self": broadcastSelf, - ], - ] - ] - } - -} - -/// Represents the different status of a push -public enum PushStatus: String { - case ok - case error - case timeout -} diff --git a/Sources/Realtime/Delegated.swift b/Sources/Realtime/Delegated.swift index a3388d10..6e548914 100644 --- a/Sources/Realtime/Delegated.swift +++ b/Sources/Realtime/Delegated.swift @@ -32,7 +32,7 @@ public struct Delegated { with callback: @escaping (Target, Input) -> Output ) { self.callback = { [weak target] input in - guard let target = target else { + guard let target else { return nil } return callback(target, input) @@ -40,11 +40,11 @@ public struct Delegated { } public func call(_ input: Input) -> Output? { - return callback?(input) + callback?(input) } public var isDelegateSet: Bool { - return callback != nil + callback != nil } } @@ -85,7 +85,7 @@ extension Delegated where Input == Void { extension Delegated where Input == Void { public func call() -> Output? { - return call(()) + call(()) } } diff --git a/Sources/Realtime/HeartbeatTimer.swift b/Sources/Realtime/HeartbeatTimer.swift index d8de6c52..28200826 100644 --- a/Sources/Realtime/HeartbeatTimer.swift +++ b/Sources/Realtime/HeartbeatTimer.swift @@ -20,10 +20,13 @@ import Foundation -/// Heartbeat Timer class which manages the lifecycle of the underlying -/// timer which triggers when a heartbeat should be fired. This heartbeat -/// runs on it's own Queue so that it does not interfere with the main -/// queue but guarantees thread safety. +/** + Heartbeat Timer class which manages the lifecycle of the underlying + timer which triggers when a heartbeat should be fired. This heartbeat + runs on it's own Queue so that it does not interfere with the main + queue but guarantees thread safety. + */ + class HeartbeatTimer { // ---------------------------------------------------------------------- @@ -53,13 +56,13 @@ class HeartbeatTimer { private var temporaryEventHandler: (() -> Void)? /** - Create a new HeartbeatTimer + Create a new HeartbeatTimer - - Parameters: - - timeInterval: Interval to fire the timer. Repeats - - queue: Queue to schedule the timer on - - leeway: The maximum amount of time which the system may delay the delivery of the timer events - */ + - Parameters: + - timeInterval: Interval to fire the timer. Repeats + - queue: Queue to schedule the timer on + - leeway: The maximum amount of time which the system may delay the delivery of the timer events + */ init( timeInterval: TimeInterval, queue: DispatchQueue = Defaults.heartbeatQueue, leeway: DispatchTimeInterval = Defaults.heartbeatLeeway @@ -70,10 +73,10 @@ class HeartbeatTimer { } /** - Create a new HeartbeatTimer + Create a new HeartbeatTimer - - Parameter timeInterval: Interval to fire the timer. Repeats - */ + - Parameter timeInterval: Interval to fire the timer. Repeats + */ convenience init(timeInterval: TimeInterval) { self.init(timeInterval: timeInterval, queue: Defaults.heartbeatQueue) } @@ -109,17 +112,17 @@ class HeartbeatTimer { } /** - True if the Timer exists and has not been cancelled. False otherwise - */ + True if the Timer exists and has not been cancelled. False otherwise + */ var isValid: Bool { guard let timer = temporaryTimer else { return false } return !timer.isCancelled } /** - Calls the Timer's event handler immediately. This method - is primarily used in tests (not ideal) - */ + Calls the Timer's event handler immediately. This method + is primarily used in tests (not ideal) + */ func fire() { guard isValid else { return } temporaryEventHandler?() @@ -128,6 +131,6 @@ class HeartbeatTimer { extension HeartbeatTimer: Equatable { static func == (lhs: HeartbeatTimer, rhs: HeartbeatTimer) -> Bool { - return lhs.uuid == rhs.uuid + lhs.uuid == rhs.uuid } } diff --git a/Sources/Realtime/Message.swift b/Sources/Realtime/Message.swift index 50472032..5fb934cd 100644 --- a/Sources/Realtime/Message.swift +++ b/Sources/Realtime/Message.swift @@ -21,22 +21,22 @@ import Foundation /// Data that is received from the Server. -public class Message { +public struct Message { /// Reference number. Empty if missing public let ref: String /// Join Reference number - internal let joinRef: String? + let joinRef: String? /// Message topic - public let topic: ChannelTopic + public let topic: String /// Message event - public let event: ChannelEvent + public let event: String /// The raw payload from the Message, including a nested response from /// phx_reply events. It is recommended to use `payload` instead. - internal let rawPayload: Payload + let rawPayload: Payload /// Message payload public var payload: Payload { @@ -50,16 +50,13 @@ public class Message { /// message.payload["status"] /// ``` public var status: PushStatus? { - guard let status = rawPayload["status"] as? String else { - return nil - } - return PushStatus(rawValue: status) + (rawPayload["status"] as? String).flatMap(PushStatus.init(rawValue:)) } init( ref: String = "", - topic: ChannelTopic = .all, - event: ChannelEvent = .all, + topic: String = "", + event: String = "", payload: Payload = [:], joinRef: String? = nil ) { @@ -75,9 +72,9 @@ public class Message { joinRef = json[0] as? String ref = json[1] as? String ?? "" - if let topic = (json[2] as? String).flatMap(ChannelTopic.init(rawValue:)), - let event = (json[3] as? String).flatMap(ChannelEvent.init(rawValue:)), - let payload = json[4] as? Payload + if let topic = json[2] as? String, + let event = json[3] as? String, + let payload = json[4] as? Payload { self.topic = topic self.event = event diff --git a/Sources/Realtime/Transport.swift b/Sources/Realtime/PhoenixTransport.swift similarity index 70% rename from Sources/Realtime/Transport.swift rename to Sources/Realtime/PhoenixTransport.swift index 92f73641..8867e8d3 100644 --- a/Sources/Realtime/Transport.swift +++ b/Sources/Realtime/PhoenixTransport.swift @@ -25,34 +25,39 @@ import Foundation // MARK: - Transport Protocol // ---------------------------------------------------------------------- -/// Defines a `Socket`'s Transport layer. +/** + Defines a `Socket`'s Transport layer. + */ // sourcery: AutoMockable -public protocol Transport { +public protocol PhoenixTransport { /// The current `ReadyState` of the `Transport` layer - var readyState: TransportReadyState { get } + var readyState: PhoenixTransportReadyState { get } /// Delegate for the `Transport` layer - var delegate: TransportDelegate? { get set } + var delegate: PhoenixTransportDelegate? { get set } /** - Connect to the server - */ - func connect() + Connect to the server + + - Parameters: + - headers: Headers to include in the URLRequests when opening the Websocket connection. Can be empty [:] + */ + func connect(with headers: [String: Any]) /** - Disconnect from the server. + Disconnect from the server. - - Parameters: - - code: Status code as defined by Section 7.4 of RFC 6455. - - reason: Reason why the connection is closing. Optional. - */ + - Parameters: + - code: Status code as defined by Section 7.4 of RFC 6455. + - reason: Reason why the connection is closing. Optional. + */ func disconnect(code: Int, reason: String?) /** - Sends a message to the server. + Sends a message to the server. - - Parameter data: Data to send. - */ + - Parameter data: Data to send. + */ func send(data: Data) } @@ -62,36 +67,36 @@ public protocol Transport { // ---------------------------------------------------------------------- /// Delegate to receive notifications of events that occur in the `Transport` layer -public protocol TransportDelegate { +public protocol PhoenixTransportDelegate { /** - Notified when the `Transport` opens. + Notified when the `Transport` opens. - - Parameter response: Response from the server indicating that the WebSocket handshake was successful and the connection has been upgraded to webSockets - */ + - Parameter response: Response from the server indicating that the WebSocket handshake was successful and the connection has been upgraded to webSockets + */ func onOpen(response: URLResponse?) /** - Notified when the `Transport` receives an error. + Notified when the `Transport` receives an error. - - Parameter error: Client-side error from the underlying `Transport` implementation - - Parameter response: Response from the server, if any, that occurred with the Error + - Parameter error: Client-side error from the underlying `Transport` implementation + - Parameter response: Response from the server, if any, that occurred with the Error - */ + */ func onError(error: Error, response: URLResponse?) /** - Notified when the `Transport` receives a message from the server. + Notified when the `Transport` receives a message from the server. - - Parameter message: Message received from the server - */ + - Parameter message: Message received from the server + */ func onMessage(message: String) /** - Notified when the `Transport` closes. + Notified when the `Transport` closes. - - Parameter code: Code that was sent when the `Transport` closed - - Parameter reason: A concise human-readable prose explanation for the closure - */ + - Parameter code: Code that was sent when the `Transport` closed + - Parameter reason: A concise human-readable prose explanation for the closure + */ func onClose(code: Int, reason: String?) } @@ -101,7 +106,7 @@ public protocol TransportDelegate { // ---------------------------------------------------------------------- /// Available `ReadyState`s of a `Transport` layer. -public enum TransportReadyState { +public enum PhoenixTransportReadyState { /// The `Transport` is opening a connection to the server. case connecting @@ -128,12 +133,12 @@ public enum TransportReadyState { /// `Transport` implementations. Or you can create your own implementation using /// your own WebSocket library or implementation. @available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, *) -open class URLSessionTransport: NSObject, Transport, URLSessionWebSocketDelegate { +open class URLSessionTransport: NSObject, PhoenixTransport, URLSessionWebSocketDelegate { /// The URL to connect to - internal let url: URL + let url: URL /// The URLSession configuration - internal let configuration: URLSessionConfiguration + let configuration: URLSessionConfiguration /// The underling URLSession. Assigned during `connect()` private var session: URLSession? = nil @@ -142,33 +147,33 @@ open class URLSessionTransport: NSObject, Transport, URLSessionWebSocketDelegate private var task: URLSessionWebSocketTask? = nil /** - Initializes a `Transport` layer built using URLSession's WebSocket + Initializes a `Transport` layer built using URLSession's WebSocket - Example: + Example: - ```swift - let url = URL("wss://example.com/socket") - let transport: Transport = URLSessionTransport(url: url) - ``` + ```swift + let url = URL("wss://example.com/socket") + let transport: Transport = URLSessionTransport(url: url) + ``` - Using a custom `URLSessionConfiguration` + Using a custom `URLSessionConfiguration` - ```swift - let url = URL("wss://example.com/socket") - let configuration = URLSessionConfiguration.default - let transport: Transport = URLSessionTransport(url: url, configuration: configuration) - ``` + ```swift + let url = URL("wss://example.com/socket") + let configuration = URLSessionConfiguration.default + let transport: Transport = URLSessionTransport(url: url, configuration: configuration) + ``` - - parameter url: URL to connect to - - parameter configuration: Provide your own URLSessionConfiguration. Uses `.default` if none provided - */ + - parameter url: URL to connect to + - parameter configuration: Provide your own URLSessionConfiguration. Uses `.default` if none provided + */ public init(url: URL, configuration: URLSessionConfiguration = .default) { // URLSession requires that the endpoint be "wss" instead of "https". let endpoint = url.absoluteString let wsEndpoint = endpoint - .replacingOccurrences(of: "http://", with: "ws://") - .replacingOccurrences(of: "https://", with: "wss://") + .replacingOccurrences(of: "http://", with: "ws://") + .replacingOccurrences(of: "https://", with: "wss://") // Force unwrapping should be safe here since a valid URL came in and we just // replaced the protocol. @@ -180,16 +185,23 @@ open class URLSessionTransport: NSObject, Transport, URLSessionWebSocketDelegate // MARK: - Transport - public var readyState: TransportReadyState = .closed - public var delegate: TransportDelegate? = nil + public var readyState: PhoenixTransportReadyState = .closed + public var delegate: PhoenixTransportDelegate? = nil - open func connect() { + public func connect(with headers: [String: Any]) { // Set the transport state as connecting readyState = .connecting // Create the session and websocket task session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil) - task = session?.webSocketTask(with: url) + var request = URLRequest(url: url) + + headers.forEach { (key: String, value: Any) in + guard let value = value as? String else { return } + request.addValue(value, forHTTPHeaderField: key) + } + + task = session?.webSocketTask(with: request) // Start the task task?.resume() @@ -197,11 +209,11 @@ open class URLSessionTransport: NSObject, Transport, URLSessionWebSocketDelegate open func disconnect(code: Int, reason: String?) { /* - TODO: - 1. Provide a "strict" mode that fails if an invalid close code is given - 2. If strict mode is disabled, default to CloseCode.invalid - 3. Provide default .normalClosure function - */ + TODO: + 1. Provide a "strict" mode that fails if an invalid close code is given + 2. If strict mode is disabled, default to CloseCode.invalid + 3. Provide default .normalClosure function + */ guard let closeCode = URLSessionWebSocketTask.CloseCode(rawValue: code) else { fatalError("Could not create a CloseCode with invalid code: [\(code)].") } diff --git a/Sources/Realtime/Presence.swift b/Sources/Realtime/Presence.swift index 67bf4c84..82e08508 100644 --- a/Sources/Realtime/Presence.swift +++ b/Sources/Realtime/Presence.swift @@ -103,16 +103,16 @@ public final class Presence { /// let options = Options(events: [.state: "my_state", .diff: "my_diff"]) /// let presence = Presence(channel, opts: options) public struct Options { - let events: [Events: ChannelEvent] + let events: [Events: String] /// Default set of Options used when creating Presence. Uses the /// phoenix events "presence_state" and "presence_diff" public static let defaults = Options(events: [ - .state: .presenceState, - .diff: .presenceDiff, + .state: "presence_state", + .diff: "presence_diff", ]) - public init(events: [Events: ChannelEvent]) { + public init(events: [Events: String]) { self.events = events } } @@ -163,7 +163,7 @@ public final class Presence { // ---------------------------------------------------------------------- /// The channel the Presence belongs to - weak var channel: Channel? + weak var channel: RealtimeChannel? /// Caller to callback hooks var caller: Caller @@ -184,7 +184,7 @@ public final class Presence { /// Callback to be informed of joins public var onJoin: OnJoin { - get { return caller.onJoin } + get { caller.onJoin } set { caller.onJoin = newValue } } @@ -195,7 +195,7 @@ public final class Presence { /// Callback to be informed of leaves public var onLeave: OnLeave { - get { return caller.onLeave } + get { caller.onLeave } set { caller.onLeave = newValue } } @@ -206,7 +206,7 @@ public final class Presence { /// Callback to be informed of synces public var onSync: OnSync { - get { return caller.onSync } + get { caller.onSync } set { caller.onSync = newValue } } @@ -215,19 +215,19 @@ public final class Presence { onSync = callback } - public init(channel: Channel, opts: Options = Options.defaults) { + public init(channel: RealtimeChannel, opts: Options = Options.defaults) { state = [:] pendingDiffs = [] self.channel = channel joinRef = nil caller = Caller() - guard // Do not subscribe to events if they were not provided + guard // Do not subscribe to events if they were not provided let stateEvent = opts.events[.state], let diffEvent = opts.events[.diff] else { return } - self.channel?.delegateOn(stateEvent, to: self) { (self, message) in + self.channel?.delegateOn(stateEvent, filter: ChannelFilter(), to: self) { (self, message) in guard let newState = message.rawPayload as? State else { return } self.joinRef = self.channel?.joinRef @@ -251,7 +251,7 @@ public final class Presence { self.caller.onSync() } - self.channel?.delegateOn(diffEvent, to: self) { (self, message) in + self.channel?.delegateOn(diffEvent, filter: ChannelFilter(), to: self) { (self, message) in guard let diff = message.rawPayload as? Diff else { return } if self.isPendingSyncState { self.pendingDiffs.append(diff) @@ -269,17 +269,17 @@ public final class Presence { /// Returns the array of presences, with deault selected metadata. public func list() -> [Map] { - return list(by: { _, pres in pres }) + list(by: { _, pres in pres }) } /// Returns the array of presences, with selected metadata public func list(by transformer: (String, Map) -> T) -> [T] { - return Presence.listBy(state, transformer: transformer) + Presence.listBy(state, transformer: transformer) } /// Filter the Presence state with a given function public func filter(by filter: ((String, Map) -> Bool)?) -> State { - return Presence.filter(state, by: filter) + Presence.filter(state, by: filter) } // ---------------------------------------------------------------------- @@ -406,38 +406,6 @@ public final class Presence { _ presences: State, transformer: (String, Map) -> T ) -> [T] { - return presences.map(transformer) + presences.map(transformer) } } - -extension Presence.Map { - - /// Decodes the presence metadata to an array of the specified type. - /// - parameter type: The type to decode to. - /// - parameter decoder: The decoder to use. - /// - returns: The decoded values. - /// - throws: Any error that occurs during decoding. - public func decode( - to type: T.Type = T.self, decoder: JSONDecoder = Defaults.decoder - ) throws -> [T] { - let metas: [Presence.Meta] = self["metas"]! - let data = try JSONSerialization.data(withJSONObject: metas) - return try decoder.decode([T].self, from: data) - } - -} - -extension Presence.State { - - /// Decodes the presence metadata to a dictionary of arrays of the specified type. - /// - parameter type: The type to decode to. - /// - parameter decoder: The decoder to use. - /// - returns: The dictionary of decoded values. - /// - throws: Any error that occurs during decoding. - public func decode( - to type: T.Type = T.self, decoder: JSONDecoder = Defaults.decoder - ) throws -> [String: [T]] { - return try mapValues { try $0.decode(decoder: decoder) } - } - -} diff --git a/Sources/Realtime/Push.swift b/Sources/Realtime/Push.swift index 0eb5e8bc..df038a9a 100644 --- a/Sources/Realtime/Push.swift +++ b/Sources/Realtime/Push.swift @@ -23,10 +23,10 @@ import Foundation /// Represnts pushing data to a `Channel` through the `Socket` public class Push { /// The channel sending the Push - public weak var channel: Channel? + public weak var channel: RealtimeChannel? - /// The event, for example `ChannelEvent.join` - public let event: ChannelEvent + /// The event, for example `phx_join` + public let event: String /// The payload, for example ["user_id": "abc123"] public var payload: Payload @@ -53,7 +53,7 @@ public class Push { var ref: String? /// The event that is associated with the reference ID of the Push - var refEvent: ChannelEvent? + var refEvent: String? /// Initializes a Push /// @@ -62,8 +62,8 @@ public class Push { /// - parameter payload: Optional. The Payload to send, e.g. ["user_id": "abc123"] /// - parameter timeout: Optional. The push timeout. Default is 10.0s init( - channel: Channel, - event: ChannelEvent, + channel: RealtimeChannel, + event: String, payload: Payload = [:], timeout: TimeInterval = Defaults.timeoutInterval ) { @@ -94,7 +94,7 @@ public class Push { startTimeout() sent = true channel?.socket?.push( - topic: channel?.topic ?? .all, + topic: channel?.topic ?? "", event: event, payload: payload, ref: ref, @@ -158,9 +158,9 @@ public class Push { /// Shared behavior between `receive` calls @discardableResult - internal func receive(_ status: PushStatus, delegated: Delegated) -> Push { + func receive(_ status: PushStatus, delegated: Delegated) -> Push { // If the message has already been received, pass it to the callback immediately - if hasReceived(status: status), let receivedMessage = receivedMessage { + if hasReceived(status: status), let receivedMessage { delegated.call(receivedMessage) } @@ -176,7 +176,7 @@ public class Push { } /// Resets the Push as it was after it was first tnitialized. - internal func reset() { + func reset() { cancelRefEvent() ref = nil refEvent = nil @@ -194,38 +194,38 @@ public class Push { /// Reverses the result on channel.on(ChannelEvent, callback) that spawned the Push private func cancelRefEvent() { - guard let refEvent = refEvent else { return } + guard let refEvent else { return } channel?.off(refEvent) } /// Cancel any ongoing Timeout Timer - internal func cancelTimeout() { + func cancelTimeout() { timeoutWorkItem?.cancel() timeoutWorkItem = nil } /// Starts the Timer which will trigger a timeout after a specific _timeout_ /// time, in milliseconds, is reached. - internal func startTimeout() { + func startTimeout() { // Cancel any existing timeout before starting a new one if let safeWorkItem = timeoutWorkItem, !safeWorkItem.isCancelled { cancelTimeout() } guard - let channel = channel, + let channel, let socket = channel.socket else { return } let ref = socket.makeRef() - let refEvent = ChannelEvent.channelReply(ref) + let refEvent = channel.replyEventName(ref) self.ref = ref self.refEvent = refEvent /// If a response is received before the Timer triggers, cancel timer - /// and match the recevied event to it's corresponding hook - channel.delegateOn(refEvent, to: self) { (self, message) in + /// and match the received event to it's corresponding hook + channel.delegateOn(refEvent, filter: ChannelFilter(), to: self) { (self, message) in self.cancelRefEvent() self.cancelTimeout() self.receivedMessage = message @@ -248,17 +248,17 @@ public class Push { /// /// - parameter status: Status to check /// - return: True if given status has been received by the Push. - internal func hasReceived(status: PushStatus) -> Bool { - return receivedMessage?.status == status + func hasReceived(status: PushStatus) -> Bool { + receivedMessage?.status == status } /// Triggers an event to be sent though the Channel - internal func trigger(_ status: PushStatus, payload: Payload) { + func trigger(_ status: PushStatus, payload: Payload) { /// If there is no ref event, then there is nothing to trigger on the channel - guard let refEvent = refEvent else { return } + guard let refEvent else { return } var mutPayload = payload - mutPayload["status"] = status + mutPayload["status"] = status.rawValue channel?.trigger(event: refEvent, payload: mutPayload) } diff --git a/Sources/Realtime/Channel.swift b/Sources/Realtime/RealtimeChannel.swift similarity index 54% rename from Sources/Realtime/Channel.swift rename to Sources/Realtime/RealtimeChannel.swift index c22a5215..ff21bf37 100644 --- a/Sources/Realtime/Channel.swift +++ b/Sources/Realtime/RealtimeChannel.swift @@ -20,23 +20,108 @@ import Foundation import Swift +@_spi(Internal) import _Helpers /// Container class of bindings to the channel struct Binding { - // The event that the Binding is bound to - let event: ChannelEvent - - // The reference number of the Binding - let ref: Int + let type: String + let filter: [String: String] // The callback to be triggered let callback: Delegated + + let id: String? +} + +public struct ChannelFilter { + public let event: String? + public let schema: String? + public let table: String? + public let filter: String? + + public init( + event: String? = nil, schema: String? = nil, table: String? = nil, filter: String? = nil + ) { + self.event = event + self.schema = schema + self.table = table + self.filter = filter + } + + var asDictionary: [String: String] { + [ + "event": event, + "schema": schema, + "table": table, + "filter": filter, + ].compactMapValues { $0 } + } +} + +public enum ChannelResponse { + case ok, timedOut, error +} + +public enum RealtimeListenTypes: String { + case postgresChanges = "postgres_changes" + case broadcast + case presence +} + +/// Represents the broadcast and presence options for a channel. +public struct RealtimeChannelOptions { + /// Used to track presence payload across clients. Must be unique per client. If `nil`, the server + /// will generate one. + var presenceKey: String? + /// Enables the client to receive their own`broadcast` messages + var broadcastSelf: Bool + /// Instructs the server to acknowledge the client's `broadcast` messages + var broadcastAcknowledge: Bool + + public init( + presenceKey: String? = nil, + broadcastSelf: Bool = false, + broadcastAcknowledge: Bool = false + ) { + self.presenceKey = presenceKey + self.broadcastSelf = broadcastSelf + self.broadcastAcknowledge = broadcastAcknowledge + } + + /// Parameters used to configure the channel + var params: [String: [String: Any]] { + [ + "config": [ + "presence": [ + "key": presenceKey ?? "", + ], + "broadcast": [ + "ack": broadcastAcknowledge, + "self": broadcastSelf, + ], + ], + ] + } +} + +/// Represents the different status of a push +public enum PushStatus: String { + case ok + case error + case timeout +} + +public enum RealtimeSubscribeStates { + case subscribed + case timedOut + case closed + case channelError } /// -/// Represents a Channel which is bound to a topic +/// Represents a RealtimeChannel which is bound to a topic /// -/// A Channel can bind to multiple events on a given topic and +/// A RealtimeChannel can bind to multiple events on a given topic and /// be informed when those events occur within a topic. /// /// ### Example: @@ -49,33 +134,34 @@ struct Binding { /// .receive("timeout") { payload in print("Networking issue...", payload) } /// /// channel.join() -/// .receive("ok") { payload in print("Channel Joined", payload) } +/// .receive("ok") { payload in print("RealtimeChannel Joined", payload) } /// .receive("error") { payload in print("Failed ot join", payload) } /// .receive("timeout") { payload in print("Networking issue...", payload) } /// -public class Channel { - /// The topic of the Channel. e.g. "rooms:friends" - public let topic: ChannelTopic +public class RealtimeChannel { + /// The topic of the RealtimeChannel. e.g. "rooms:friends" + public let topic: String /// The params sent when joining the channel public var params: Payload { - didSet { self.joinPush.payload = params } + didSet { joinPush.payload = params } } + public private(set) lazy var presence = Presence(channel: self) + /// The Socket that the channel belongs to weak var socket: RealtimeClient? - /// Current state of the Channel + var subTopic: String + + /// Current state of the RealtimeChannel var state: ChannelState /// Collection of event bindings - var syncBindingsDel: SynchronizedArray + let bindings: LockIsolated<[String: [Binding]]> - /// Tracks event binding ref counters - var bindingRef: Int - - /// Timout when attempting to join a Channel + /// Timeout when attempting to join a RealtimeChannel var timeout: TimeInterval /// Set to true once the channel calls .join() @@ -84,7 +170,7 @@ public class Channel { /// Push to send when the channel calls .join() var joinPush: Push! - /// Buffer of Pushes that will be sent once the Channel's socket connects + /// Buffer of Pushes that will be sent once the RealtimeChannel's socket connects var pushBuffer: [Push] /// Timer to attempt to rejoin @@ -93,26 +179,18 @@ public class Channel { /// Refs of stateChange hooks var stateChangeRefs: [String] - /// Initialize a Channel - /// - parameter topic: Topic of the Channel - /// - parameter options: Optional. Options to configure channel broadcast and presence. Leave nil for postgres channel. - /// - parameter socket: Socket that the channel is a part of - convenience init(topic: ChannelTopic, options: ChannelOptions? = nil, socket: RealtimeClient) { - self.init(topic: topic, params: options?.params ?? [:], socket: socket) - } - - /// Initialize a Channel + /// Initialize a RealtimeChannel /// - /// - parameter topic: Topic of the Channel + /// - parameter topic: Topic of the RealtimeChannel /// - parameter params: Optional. Parameters to send when joining. /// - parameter socket: Socket that the channel is a part of - init(topic: ChannelTopic, params: [String: Any], socket: RealtimeClient) { + init(topic: String, params: [String: Any] = [:], socket: RealtimeClient) { state = ChannelState.closed self.topic = topic + subTopic = topic.replacingOccurrences(of: "realtime:", with: "") self.params = params self.socket = socket - syncBindingsDel = SynchronizedArray() - bindingRef = 0 + bindings = LockIsolated([:]) timeout = socket.timeout joinedOnce = false pushBuffer = [] @@ -158,7 +236,7 @@ public class Channel { /// Handle when a response is received after join() joinPush.delegateReceive(.ok, to: self) { (self, _) in - // Mark the Channel as joined + // Mark the RealtimeChannel as joined self.state = ChannelState.joined // Reset the timer, preventing it from attempting to join again @@ -169,7 +247,7 @@ public class Channel { self.pushBuffer = [] } - // Perform if Channel errors while attempting to joi + // Perform if RealtimeChannel errors while attempting to joi joinPush.delegateReceive(.error, to: self) { (self, _) in self.state = .errored if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() } @@ -190,14 +268,14 @@ public class Channel { ) leavePush.send() - // Mark the Channel as in an error and attempt to rejoin if socket is connected + // Mark the RealtimeChannel as in an error and attempt to rejoin if socket is connected self.state = ChannelState.errored self.joinPush.reset() if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() } } - /// Perfom when the Channel has been closed + /// Perfom when the RealtimeChannel has been closed delegateOnClose(to: self) { (self, _) in // Reset any timer that may be on-going self.rejoinTimer.reset() @@ -212,7 +290,7 @@ public class Channel { self.socket?.remove(self) } - /// Perfom when the Channel errors + /// Perfom when the RealtimeChannel errors delegateOnError(to: self) { (self, message) in // Log that the channel received an error self.socket?.logItems( @@ -237,10 +315,10 @@ public class Channel { } // Perform when the join reply is received - delegateOn(ChannelEvent.reply, to: self) { (self, message) in + delegateOn(ChannelEvent.reply, filter: ChannelFilter(), to: self) { (self, message) in // Trigger bindings self.trigger( - event: ChannelEvent.channelReply(message.ref), + event: self.replyEventName(message.ref), payload: message.rawPayload, ref: message.ref, joinRef: message.joinRef @@ -263,63 +341,185 @@ public class Channel { /// Joins the channel /// - /// - parameter timeout: Optional. Defaults to Channel's timeout + /// - parameter timeout: Optional. Defaults to RealtimeChannel's timeout /// - return: Push event @discardableResult - public func join(timeout: TimeInterval? = nil) -> Push { + public func subscribe( + callback: ((RealtimeSubscribeStates, Error?) -> Void)? = nil, + timeout: TimeInterval? = nil + ) -> RealtimeChannel { guard !joinedOnce else { fatalError( "tried to join multiple times. 'join' " - + "can only be called a single time per channel instance") + + "can only be called a single time per channel instance" + ) } - // Join the Channel - if let safeTimeout = timeout { self.timeout = safeTimeout } + onError { message in + let values = message.payload.values.map { "\($0) " } + let error = RealtimeError(values.isEmpty ? "error" : values.joined(separator: ", ")) + callback?(.channelError, error) + } + + onClose { _ in + callback?(.closed, nil) + } + + // Join the RealtimeChannel + if let safeTimeout = timeout { + self.timeout = safeTimeout + } + + let broadcast = params["config", as: [String: Any].self]?["broadcast"] + let presence = params["config", as: [String: Any].self]?["presence"] + + var accessTokenPayload: Payload = [:] + var config: Payload = [ + "postgres_changes": bindings.value["postgres_changes"]?.map(\.filter) ?? [], + ] + + config["broadcast"] = broadcast + config["presence"] = presence + + if let accessToken = socket?.accessToken { + accessTokenPayload["access_token"] = accessToken + } + + params["config"] = config joinedOnce = true rejoin() - return joinPush + + joinPush + .delegateReceive(.ok, to: self) { (self, message) in + if self.socket?.accessToken != nil { + self.socket?.setAuth(self.socket?.accessToken) + } + + guard let serverPostgresFilters = message.payload["postgres_changes"] as? [[String: Any]] + else { + callback?(.subscribed, nil) + return + } + + let clientPostgresBindings = self.bindings.value["postgres_changes"] ?? [] + let bindingsCount = clientPostgresBindings.count + var newPostgresBindings: [Binding] = [] + + for i in 0 ..< bindingsCount { + let clientPostgresBinding = clientPostgresBindings[i] + + let event = clientPostgresBinding.filter["event"] + let schema = clientPostgresBinding.filter["schema"] + let table = clientPostgresBinding.filter["table"] + let filter = clientPostgresBinding.filter["filter"] + + let serverPostgresFilter = serverPostgresFilters[i] + + if serverPostgresFilter["event", as: String.self] == event, + serverPostgresFilter["schema", as: String.self] == schema, + serverPostgresFilter["table", as: String.self] == table, + serverPostgresFilter["filter", as: String.self] == filter + { + newPostgresBindings.append( + Binding( + type: clientPostgresBinding.type, + filter: clientPostgresBinding.filter, + callback: clientPostgresBinding.callback, + id: serverPostgresFilter["id", as: Int.self].flatMap(String.init) + ) + ) + } else { + self.unsubscribe() + callback?( + .channelError, + RealtimeError("Mismatch between client and server bindings for postgres changes.") + ) + return + } + } + + self.bindings.withValue { + $0["postgres_changes"] = newPostgresBindings + } + callback?(.subscribed, nil) + } + .delegateReceive(.error, to: self) { _, message in + let values = message.payload.values.map { "\($0) " } + let error = RealtimeError(values.isEmpty ? "error" : values.joined(separator: ", ")) + callback?(.channelError, error) + } + .delegateReceive(.timeout, to: self) { _, _ in + callback?(.timedOut, nil) + } + + return self + } + + public func presenceState() -> Presence.State { + presence.state + } + + public func track(payload: Payload, opts: Payload = [:]) async -> ChannelResponse { + await send( + type: .presence, + payload: [ + "event": "track", + "payload": payload, + ], + opts: opts + ) + } + + public func untrack(opts: Payload = [:]) async -> ChannelResponse { + await send( + type: .presence, + payload: ["event": "untrack"], + opts: opts + ) } - /// Hook into when the Channel is closed. Does not handle retain cycles. + /// Hook into when the RealtimeChannel is closed. Does not handle retain cycles. /// Use `delegateOnClose(to:)` for automatic handling of retain cycles. /// /// Example: /// /// let channel = socket.channel("topic") /// channel.onClose() { [weak self] message in - /// self?.print("Channel \(message.topic) has closed" + /// self?.print("RealtimeChannel \(message.topic) has closed" /// } /// - /// - parameter callback: Called when the Channel closes + /// - parameter callback: Called when the RealtimeChannel closes /// - return: Ref counter of the subscription. See `func off()` @discardableResult - public func onClose(_ callback: @escaping ((Message) -> Void)) -> Int { - return on(ChannelEvent.close, callback: callback) + public func onClose(_ callback: @escaping ((Message) -> Void)) -> RealtimeChannel { + on(ChannelEvent.close, filter: ChannelFilter(), callback: callback) } - /// Hook into when the Channel is closed. Automatically handles retain + /// Hook into when the RealtimeChannel is closed. Automatically handles retain /// cycles. Use `onClose()` to handle yourself. /// /// Example: /// /// let channel = socket.channel("topic") /// channel.delegateOnClose(to: self) { (self, message) in - /// self.print("Channel \(message.topic) has closed" + /// self.print("RealtimeChannel \(message.topic) has closed" /// } /// /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Channel closes + /// - parameter callback: Called when the RealtimeChannel closes /// - return: Ref counter of the subscription. See `func off()` @discardableResult public func delegateOnClose( to owner: Target, callback: @escaping ((Target, Message) -> Void) - ) -> Int { - return delegateOn(ChannelEvent.close, to: owner, callback: callback) + ) -> RealtimeChannel { + delegateOn( + ChannelEvent.close, filter: ChannelFilter(), to: owner, callback: callback + ) } - /// Hook into when the Channel receives an Error. Does not handle retain + /// Hook into when the RealtimeChannel receives an Error. Does not handle retain /// cycles. Use `delegateOnError(to:)` for automatic handling of retain /// cycles. /// @@ -327,35 +527,37 @@ public class Channel { /// /// let channel = socket.channel("topic") /// channel.onError() { [weak self] (message) in - /// self?.print("Channel \(message.topic) has errored" + /// self?.print("RealtimeChannel \(message.topic) has errored" /// } /// - /// - parameter callback: Called when the Channel closes + /// - parameter callback: Called when the RealtimeChannel closes /// - return: Ref counter of the subscription. See `func off()` @discardableResult - public func onError(_ callback: @escaping ((_ message: Message) -> Void)) -> Int { - return on(ChannelEvent.error, callback: callback) + public func onError(_ callback: @escaping ((_ message: Message) -> Void)) -> RealtimeChannel { + on(ChannelEvent.error, filter: ChannelFilter(), callback: callback) } - /// Hook into when the Channel receives an Error. Automatically handles + /// Hook into when the RealtimeChannel receives an Error. Automatically handles /// retain cycles. Use `onError()` to handle yourself. /// /// Example: /// /// let channel = socket.channel("topic") /// channel.delegateOnError(to: self) { (self, message) in - /// self.print("Channel \(message.topic) has closed" + /// self.print("RealtimeChannel \(message.topic) has closed" /// } /// /// - parameter owner: Class registering the callback. Usually `self` - /// - parameter callback: Called when the Channel closes + /// - parameter callback: Called when the RealtimeChannel closes /// - return: Ref counter of the subscription. See `func off()` @discardableResult public func delegateOnError( to owner: Target, callback: @escaping ((Target, Message) -> Void) - ) -> Int { - return delegateOn(ChannelEvent.error, to: owner, callback: callback) + ) -> RealtimeChannel { + delegateOn( + ChannelEvent.error, filter: ChannelFilter(), to: owner, callback: callback + ) } /// Subscribes on channel events. Does not handle retain cycles. Use @@ -382,11 +584,15 @@ public class Channel { /// - parameter callback: Called with the event's message /// - return: Ref counter of the subscription. See `func off()` @discardableResult - public func on(_ event: ChannelEvent, callback: @escaping ((Message) -> Void)) -> Int { + public func on( + _ event: String, + filter: ChannelFilter, + callback: @escaping ((Message) -> Void) + ) -> RealtimeChannel { var delegated = Delegated() delegated.manuallyDelegate(with: callback) - return on(event, delegated: delegated) + return on(event, filter: filter, delegated: delegated) } /// Subscribes on channel events. Automatically handles retain cycles. Use @@ -415,24 +621,29 @@ public class Channel { /// - return: Ref counter of the subscription. See `func off()` @discardableResult public func delegateOn( - _ event: ChannelEvent, + _ event: String, + filter: ChannelFilter, to owner: Target, callback: @escaping ((Target, Message) -> Void) - ) -> Int { + ) -> RealtimeChannel { var delegated = Delegated() delegated.delegate(to: owner, with: callback) - return on(event, delegated: delegated) + return on(event, filter: filter, delegated: delegated) } /// Shared method between `on` and `manualOn` @discardableResult - private func on(_ event: ChannelEvent, delegated: Delegated) -> Int { - let ref = bindingRef - bindingRef = ref + 1 + private func on( + _ type: String, filter: ChannelFilter, delegated: Delegated + ) -> RealtimeChannel { + bindings.withValue { + $0[type.lowercased(), default: []].append( + Binding(type: type.lowercased(), filter: filter.asDictionary, callback: delegated, id: nil) + ) + } - syncBindingsDel.append(Binding(event: event, ref: ref, callback: delegated)) - return ref + return self } /// Unsubscribes from a channel event. If a `ref` is given, only the exact @@ -453,14 +664,16 @@ public class Channel { /// "event" and nothing is printed if the channel receives "other_event". /// /// - parameter event: Event to unsubscribe from - /// - paramter ref: Ref counter returned when subscribing. Can be omitted - public func off(_ event: ChannelEvent, ref: Int? = nil) { - syncBindingsDel.removeAll { bind -> Bool in - bind.event == event && (ref == nil || ref == bind.ref) + /// - parameter ref: Ref counter returned when subscribing. Can be omitted + public func off(_ type: String, filter: [String: String] = [:]) { + bindings.withValue { + $0[type.lowercased()] = $0[type.lowercased(), default: []].filter { bind in + !(bind.type.lowercased() == type.lowercased() && bind.filter == filter) + } } } - /// Push a payload to the Channel + /// Push a payload to the RealtimeChannel /// /// Example: /// @@ -473,7 +686,7 @@ public class Channel { /// - parameter timeout: Optional timeout @discardableResult public func push( - _ event: ChannelEvent, + _ event: String, payload: Payload, timeout: TimeInterval = Defaults.timeoutInterval ) -> Push { @@ -499,6 +712,80 @@ public class Channel { return pushEvent } + public func send( + type: RealtimeListenTypes, + event: String? = nil, + payload: Payload, + opts: Payload = [:] + ) async -> ChannelResponse { + var payload = payload + payload["type"] = type.rawValue + if let event { + payload["event"] = event + } + + if !canPush, type == .broadcast { + var headers = socket?.headers ?? [:] + headers["Content-Type"] = "application/json" + headers["apikey"] = socket?.accessToken + + let body = [ + "messages": [ + "topic": subTopic, + "payload": payload, + "event": event as Any, + ], + ] + + do { + let request = try Request( + path: "", + method: "POST", + headers: headers.mapValues { "\($0)" }, + body: JSONSerialization.data(withJSONObject: body) + ) + let urlRequest = try request.urlRequest(withBaseURL: broadcastEndpointURL) + let (_, response) = try await URLSession.shared.data(for: urlRequest) + guard let httpResponse = response as? HTTPURLResponse else { + return .error + } + if 200 ..< 300 ~= httpResponse.statusCode { + return .ok + } + + return .error + } catch { + return .error + } + } else { + return await withCheckedContinuation { continuation in + let push = self.push( + type.rawValue, payload: payload, + timeout: (opts["timeout"] as? TimeInterval) ?? self.timeout + ) + + if let type = payload["type"] as? String, type == "broadcast", + let config = self.params["config"] as? [String: Any], + let broadcast = config["broadcast"] as? [String: Any] + { + let ack = broadcast["ack"] as? Bool + if ack == nil || ack == false { + continuation.resume(returning: .ok) + return + } + } + + push + .receive(.ok) { _ in + continuation.resume(returning: .ok) + } + .receive(.timeout) { _ in + continuation.resume(returning: .timedOut) + } + } + } + } + /// Leaves the channel /// /// Unsubscribes from server events, and instructs channel to terminate on @@ -516,7 +803,7 @@ public class Channel { /// - parameter timeout: Optional timeout /// - return: Push that can add receive hooks @discardableResult - public func leave(timeout: TimeInterval = Defaults.timeoutInterval) -> Push { + public func unsubscribe(timeout: TimeInterval = Defaults.timeoutInterval) -> Push { // If attempting a rejoin during a leave, then reset, cancelling the rejoin rejoinTimer.reset() @@ -546,8 +833,10 @@ public class Channel { .receive(.timeout, delegated: onCloseDelegate) leavePush.send() - // If the Channel cannot send push events, trigger a success locally - if !canPush { leavePush.trigger(.ok, payload: [:]) } + // If the RealtimeChannel cannot send push events, trigger a success locally + if !canPush { + leavePush.trigger(.ok, payload: [:]) + } // Return the push so it can be bound to return leavePush @@ -569,15 +858,15 @@ public class Channel { // MARK: - Internal // ---------------------------------------------------------------------- - /// Checks if an event received by the Socket belongs to this Channel + /// Checks if an event received by the Socket belongs to this RealtimeChannel func isMember(_ message: Message) -> Bool { - // Return false if the message's topic does not match the Channel's topic + // Return false if the message's topic does not match the RealtimeChannel's topic guard message.topic == topic else { return false } guard let safeJoinRef = message.joinRef, safeJoinRef != joinRef, - message.event.isLifecyleEvent + ChannelEvent.isLifecyleEvent(message.event) else { return true } socket?.logItems( @@ -587,7 +876,7 @@ public class Channel { return false } - /// Sends the payload to join the Channel + /// Sends the payload to join the RealtimeChannel func sendJoin(_ timeout: TimeInterval) { state = ChannelState.joining joinPush.resend(timeout) @@ -610,11 +899,52 @@ public class Channel { /// /// - parameter message: Message to pass to the event bindings func trigger(_ message: Message) { + let typeLower = message.event.lowercased() + + let events = Set([ + ChannelEvent.close, + ChannelEvent.error, + ChannelEvent.leave, + ChannelEvent.join, + ]) + + if message.ref != message.joinRef, events.contains(typeLower) { + return + } + let handledMessage = onMessage(message) - syncBindingsDel - .filter { $0.event == message.event } - .forEach { $0.callback.call(handledMessage) } + let bindings: [Binding] + + if ["insert", "update", "delete"].contains(typeLower) { + bindings = self.bindings.value["postgres_changes", default: []].filter { bind in + bind.filter["event"] == "*" || bind.filter["event"] == typeLower + } + } else { + bindings = self.bindings.value[typeLower, default: []].filter { bind in + if ["broadcast", "presence", "postgres_changes"].contains(typeLower) { + let bindEvent = bind.filter["event"]?.lowercased() + + if let bindId = bind.id.flatMap(Int.init) { + let ids = message.payload["ids", as: [Int].self] ?? [] + return ids.contains(bindId) + && ( + bindEvent == "*" + || bindEvent + == message.payload["data", as: [String: Any].self]?["type", as: String.self]? + .lowercased() + ) + } + + return bindEvent == "*" + || bindEvent == message.payload["event", as: String.self]?.lowercased() + } + + return bind.type.lowercased() == typeLower + } + } + + bindings.forEach { $0.callback.call(handledMessage) } } /// Triggers an event to the correct event bindings created by @@ -625,7 +955,7 @@ public class Channel { /// - parameter ref: Ref of the event. Defaults to empty /// - parameter joinRef: Ref of the join event. Defaults to nil func trigger( - event: ChannelEvent, + event: String, payload: Payload = [:], ref: String = "", joinRef: String? = nil @@ -640,15 +970,33 @@ public class Channel { trigger(message) } + /// - parameter ref: The ref of the event push + /// - return: The event name of the reply + func replyEventName(_ ref: String) -> String { + "chan_reply_\(ref)" + } + /// The Ref send during the join message. var joinRef: String? { - return joinPush.ref + joinPush.ref } - /// - return: True if the Channel can push messages, meaning the socket + /// - return: True if the RealtimeChannel can push messages, meaning the socket /// is connected and the channel is joined var canPush: Bool { - return socket?.isConnected == true && isJoined + socket?.isConnected == true && isJoined + } + + var broadcastEndpointURL: URL { + var url = socket?.endPoint ?? "" + url = url.replacingOccurrences(of: "^ws", with: "http", options: .regularExpression, range: nil) + url = url.replacingOccurrences( + of: "(/socket/websocket|/socket|/websocket)/?$", with: "", options: .regularExpression, + range: nil + ) + url = + "\(url.replacingOccurrences(of: "/+$", with: "", options: .regularExpression, range: nil))/api/broadcast" + return URL(string: url)! } } @@ -657,166 +1005,35 @@ public class Channel { // MARK: - Public API // ---------------------------------------------------------------------- -extension Channel { - /// - return: True if the Channel has been closed +extension RealtimeChannel { + /// - return: True if the RealtimeChannel has been closed public var isClosed: Bool { - return state == .closed + state == .closed } - /// - return: True if the Channel experienced an error + /// - return: True if the RealtimeChannel experienced an error public var isErrored: Bool { - return state == .errored + state == .errored } /// - return: True if the channel has joined public var isJoined: Bool { - return state == .joined + state == .joined } /// - return: True if the channel has requested to join public var isJoining: Bool { - return state == .joining + state == .joining } /// - return: True if the channel has requested to leave public var isLeaving: Bool { - return state == .leaving + state == .leaving } } -// ---------------------------------------------------------------------- -// MARK: - Codable Payload - -// ---------------------------------------------------------------------- - -extension Payload { - - /// Initializes a payload from a given value - /// - parameter value: The value to encode - /// - parameter encoder: The encoder to use to encode the payload - /// - throws: Throws an error if the payload cannot be encoded - init(_ value: T, encoder: JSONEncoder = Defaults.encoder) throws { - let data = try encoder.encode(value) - self = try JSONSerialization.jsonObject(with: data, options: .allowFragments) as! Payload - } - - /// Decodes the payload to a given type - /// - parameter type: The type to decode to - /// - parameter decoder: The decoder to use to decode the payload - /// - returns: The decoded payload - /// - throws: Throws an error if the payload cannot be decoded - public func decode( - to type: T.Type = T.self, decoder: JSONDecoder = Defaults.decoder - ) throws -> T { - let data = try JSONSerialization.data(withJSONObject: self) - return try decoder.decode(type, from: data) - } - -} - -// ---------------------------------------------------------------------- - -// MARK: - Broadcast API - -// ---------------------------------------------------------------------- - -/// Represents the payload of a broadcast message -public struct BroadcastPayload { - public let type: String - public let event: String - public let payload: Payload -} - -extension Channel { - /// Broadcasts the payload to all other members of the channel - /// - parameter event: The event to broadcast - /// - parameter payload: The payload to broadcast - @discardableResult - public func broadcast(event: String, payload: Payload) -> Push { - self.push( - .broadcast, - payload: [ - "type": "broadcast", - "event": event, - "payload": payload, - ]) - } - - /// Broadcasts the encodable payload to all other members of the channel - /// - parameter event: The event to broadcast - /// - parameter payload: The payload to broadcast - /// - parameter encoder: The encoder to use to encode the payload - /// - throws: Throws an error if the payload cannot be encoded - @discardableResult - public func broadcast(event: String, payload: Encodable, encoder: JSONEncoder = Defaults.encoder) - throws -> Push - { - self.broadcast(event: event, payload: try Payload(payload)) - } - - /// Subscribes to broadcast events. Does not handle retain cycles. - /// - /// Example: - /// - /// let ref = channel.onBroadcast { [weak self] (message,broadcast) in - /// print(broadcast.event, broadcast.payload) - /// } - /// channel.off(.broadcast, ref1) - /// - /// Subscription returns a ref counter, which can be used later to - /// unsubscribe the exact event listener - /// - parameter callback: Called with the broadcast payload - /// - returns: Ref counter of the subscription. See `func off()` - @discardableResult - public func onBroadcast(callback: @escaping (Message, BroadcastPayload) -> Void) -> Int { - self.on( - .broadcast, - callback: { message in - let payload = BroadcastPayload( - type: message.payload["type"] as! String, event: message.payload["event"] as! String, - payload: message.payload["payload"] as! Payload) - callback(message, payload) - }) - } - -} -// ---------------------------------------------------------------------- - -// MARK: - Presence API - -// ---------------------------------------------------------------------- - -extension Channel { - /// Share presence state, available to all channel members via sync - /// - parameter payload: The payload to broadcast - @discardableResult - public func track(payload: Payload) -> Push { - self.push( - .presence, - payload: [ - "type": "presence", - "event": "track", - "payload": payload, - ]) - } - - /// Share presence state, available to all channel members via sync - /// - parameter payload: The payload to broadcast - /// - parameter encoder: The encoder to use to encode the payload - /// - throws: Throws an error if the payload cannot be encoded - @discardableResult - public func track(payload: Encodable, encoder: JSONEncoder = Defaults.encoder) throws -> Push { - self.track(payload: try Payload(payload)) - } - - /// Remove presence state for given channel - @discardableResult - public func untrack() -> Push { - self.push( - .presence, - payload: [ - "type": "presence", - "event": "untrack", - ]) +extension [String: Any] { + subscript(_ key: Key, as _: T.Type) -> T? { + self[key] as? T } } diff --git a/Sources/Realtime/RealtimeClient.swift b/Sources/Realtime/RealtimeClient.swift index eb9ab62a..a559c3bc 100644 --- a/Sources/Realtime/RealtimeClient.swift +++ b/Sources/Realtime/RealtimeClient.swift @@ -19,6 +19,7 @@ // THE SOFTWARE. import Foundation +@_spi(Internal) import _Helpers public enum SocketError: Error { case abnormalClosureError @@ -32,26 +33,27 @@ public typealias PayloadClosure = () -> Payload? /// Struct that gathers callbacks assigned to the Socket struct StateChangeCallbacks { - var open: [(ref: String, callback: Delegated)] = [] - var close: [(ref: String, callback: Delegated<(Int, String?), Void>)] = [] - var error: [(ref: String, callback: Delegated<(Error, URLResponse?), Void>)] = [] - var message: [(ref: String, callback: Delegated)] = [] + var open: LockIsolated<[(ref: String, callback: Delegated)]> = .init([]) + var close: LockIsolated<[(ref: String, callback: Delegated<(Int, String?), Void>)]> = .init([]) + var error: LockIsolated<[(ref: String, callback: Delegated<(Error, URLResponse?), Void>)]> = + .init([]) + var message: LockIsolated<[(ref: String, callback: Delegated)]> = .init([]) } /// ## Socket Connection /// A single connection is established to the server and /// channels are multiplexed over the connection. -/// Connect to the server using the `Socket` class: +/// Connect to the server using the `RealtimeClient` class: /// /// ```swift -/// let socket = new Socket("/socket", paramsClosure: { ["userToken": "123" ] }) +/// let socket = new RealtimeClient("/socket", paramsClosure: { ["userToken": "123" ] }) /// socket.connect() /// ``` /// -/// The `Socket` constructor takes the mount point of the socket, +/// The `RealtimeClient` constructor takes the mount point of the socket, /// the authentication params, as well as options that can be found in /// the Socket docs, such as configuring the heartbeat. -public class RealtimeClient: TransportDelegate { +public class RealtimeClient: PhoenixTransportDelegate { // ---------------------------------------------------------------------- // MARK: - Public Attributes @@ -70,7 +72,7 @@ public class RealtimeClient: TransportDelegate { /// If the `Socket` was created with static params, then those will be /// returned every time. public var params: Payload? { - return paramsClosure?() + paramsClosure?() } /// The optional params closure used to get params when connecting. Must @@ -79,7 +81,7 @@ public class RealtimeClient: TransportDelegate { /// The WebSocket transport. Default behavior is to provide a /// URLSessionWebsocketTask. See README for alternatives. - private let transport: (URL) -> Transport + private let transport: (URL) -> PhoenixTransport /// Phoenix serializer version, defaults to "2.0.0" public let vsn: String @@ -93,10 +95,14 @@ public class RealtimeClient: TransportDelegate { /// Timeout to use when opening connections public var timeout: TimeInterval = Defaults.timeoutInterval + /// Custom headers to be added to the socket connection request + public var headers: [String: Any] = [:] + /// Interval between sending a heartbeat public var heartbeatInterval: TimeInterval = Defaults.heartbeatInterval - /// The maximum amount of time which the system may delay heartbeats in order to optimize power usage + /// The maximum amount of time which the system may delay heartbeats in order to optimize power + /// usage public var heartbeatLeeway: DispatchTimeInterval = Defaults.heartbeatLeeway /// Interval between socket reconnect attempts, in seconds @@ -136,14 +142,14 @@ public class RealtimeClient: TransportDelegate { var stateChangeCallbacks: StateChangeCallbacks = .init() /// Collection on channels created for the Socket - public internal(set) var channels: [Channel] = [] + public internal(set) var channels: [RealtimeChannel] = [] /// Buffers messages that need to be sent once the socket has connected. It is an array /// of tuples, with the ref of the message to send and the callback that will send the message. var sendBuffer: [(ref: String?, callback: () throws -> Void)] = [] /// Ref counter for messages - var ref: UInt64 = .min // 0 (max: 18,446,744,073,709,551,615) + var ref: UInt64 = .min // 0 (max: 18,446,744,073,709,551,615) /// Timer that triggers sending new Heartbeat messages var heartbeatTimer: HeartbeatTimer? @@ -158,7 +164,9 @@ public class RealtimeClient: TransportDelegate { var closeStatus: CloseStatus = .unknown /// The connection to the server - var connection: Transport? = nil + var connection: PhoenixTransport? = nil + + var accessToken: String? // ---------------------------------------------------------------------- @@ -195,7 +203,7 @@ public class RealtimeClient: TransportDelegate { public init( endPoint: String, - transport: @escaping ((URL) -> Transport), + transport: @escaping ((URL) -> PhoenixTransport), paramsClosure: PayloadClosure? = nil, vsn: String = Defaults.vsn ) { @@ -203,6 +211,12 @@ public class RealtimeClient: TransportDelegate { self.paramsClosure = paramsClosure self.endPoint = endPoint self.vsn = vsn + let params = paramsClosure?() + if let jwt = (params?["Authorization"] as? String)?.split(separator: " ").last { + accessToken = String(jwt) + } else { + accessToken = params?["apikey"] as? String + } endPointUrl = RealtimeClient.buildEndpointUrl( endpoint: endPoint, paramsClosure: paramsClosure, @@ -242,12 +256,28 @@ public class RealtimeClient: TransportDelegate { /// - return: True if the socket is connected public var isConnected: Bool { - return connectionState == .open + connectionState == .open } /// - return: The state of the connect. [.connecting, .open, .closing, .closed] - public var connectionState: TransportReadyState { - return connection?.readyState ?? .closed + public var connectionState: PhoenixTransportReadyState { + connection?.readyState ?? .closed + } + + /// Sets the JWT access token used for channel subscription authorization and Realtime RLS. + /// - Parameter token: A JWT string. + public func setAuth(_ token: String?) { + accessToken = token + + for channel in channels { + if token != nil { + channel.params["user_token"] = token + } + + if channel.joinedOnce, channel.isJoined { + channel.push(ChannelEvent.accessToken, payload: ["access_token": token as Any]) + } + } } /// Connects the Socket. The params passed to the Socket on initialization @@ -278,7 +308,7 @@ public class RealtimeClient: TransportDelegate { // self.connection?.enabledSSLCipherSuites = enabledSSLCipherSuites // #endif - connection?.connect() + connection?.connect(with: headers) } /// Disconnects the socket @@ -298,19 +328,19 @@ public class RealtimeClient: TransportDelegate { teardown(code: code, reason: reason, callback: callback) } - internal func teardown( + func teardown( code: CloseCode = CloseCode.normal, reason: String? = nil, callback: (() -> Void)? = nil ) { connection?.delegate = nil connection?.disconnect(code: code.rawValue, reason: reason) connection = nil - // The socket connection has been torndown, heartbeats are not needed + // The socket connection has been turndown, heartbeats are not needed heartbeatTimer?.stop() // Since the connection's delegate was nil'd out, inform all state // callbacks that the connection has closed - stateChangeCallbacks.close.forEach { $0.callback.call((code.rawValue, reason)) } + stateChangeCallbacks.close.value.forEach { $0.callback.call((code.rawValue, reason)) } callback?() } @@ -332,7 +362,7 @@ public class RealtimeClient: TransportDelegate { /// - parameter callback: Called when the Socket is opened @discardableResult public func onOpen(callback: @escaping () -> Void) -> String { - return onOpen { _ in callback() } + onOpen { _ in callback() } } /// Registers callbacks for connection open events. Does not handle retain @@ -350,7 +380,9 @@ public class RealtimeClient: TransportDelegate { var delegated = Delegated() delegated.manuallyDelegate(with: callback) - return append(callback: delegated, to: &stateChangeCallbacks.open) + return stateChangeCallbacks.open.withValue { + self.append(callback: delegated, to: &$0) + } } /// Registers callbacks for connection open events. Automatically handles @@ -369,7 +401,7 @@ public class RealtimeClient: TransportDelegate { to owner: T, callback: @escaping ((T) -> Void) ) -> String { - return delegateOnOpen(to: owner) { owner, _ in callback(owner) } + delegateOnOpen(to: owner) { owner, _ in callback(owner) } } /// Registers callbacks for connection open events. Automatically handles @@ -391,7 +423,9 @@ public class RealtimeClient: TransportDelegate { var delegated = Delegated() delegated.delegate(to: owner, with: callback) - return append(callback: delegated, to: &stateChangeCallbacks.open) + return stateChangeCallbacks.open.withValue { + self.append(callback: delegated, to: &$0) + } } /// Registers callbacks for connection close events. Does not handle retain @@ -406,7 +440,7 @@ public class RealtimeClient: TransportDelegate { /// - parameter callback: Called when the Socket is closed @discardableResult public func onClose(callback: @escaping () -> Void) -> String { - return onClose { _, _ in callback() } + onClose { _, _ in callback() } } /// Registers callbacks for connection close events. Does not handle retain @@ -424,7 +458,9 @@ public class RealtimeClient: TransportDelegate { var delegated = Delegated<(Int, String?), Void>() delegated.manuallyDelegate(with: callback) - return append(callback: delegated, to: &stateChangeCallbacks.close) + return stateChangeCallbacks.close.withValue { + self.append(callback: delegated, to: &$0) + } } /// Registers callbacks for connection close events. Automatically handles @@ -443,7 +479,7 @@ public class RealtimeClient: TransportDelegate { to owner: T, callback: @escaping ((T) -> Void) ) -> String { - return delegateOnClose(to: owner) { owner, _ in callback(owner) } + delegateOnClose(to: owner) { owner, _ in callback(owner) } } /// Registers callbacks for connection close events. Automatically handles @@ -465,7 +501,9 @@ public class RealtimeClient: TransportDelegate { var delegated = Delegated<(Int, String?), Void>() delegated.delegate(to: owner, with: callback) - return append(callback: delegated, to: &stateChangeCallbacks.close) + return stateChangeCallbacks.close.withValue { + self.append(callback: delegated, to: &$0) + } } /// Registers callbacks for connection error events. Does not handle retain @@ -483,7 +521,9 @@ public class RealtimeClient: TransportDelegate { var delegated = Delegated<(Error, URLResponse?), Void>() delegated.manuallyDelegate(with: callback) - return append(callback: delegated, to: &stateChangeCallbacks.error) + return stateChangeCallbacks.error.withValue { + self.append(callback: delegated, to: &$0) + } } /// Registers callbacks for connection error events. Automatically handles @@ -505,7 +545,9 @@ public class RealtimeClient: TransportDelegate { var delegated = Delegated<(Error, URLResponse?), Void>() delegated.delegate(to: owner, with: callback) - return append(callback: delegated, to: &stateChangeCallbacks.error) + return stateChangeCallbacks.error.withValue { + self.append(callback: delegated, to: &$0) + } } /// Registers callbacks for connection message events. Does not handle @@ -524,7 +566,9 @@ public class RealtimeClient: TransportDelegate { var delegated = Delegated() delegated.manuallyDelegate(with: callback) - return append(callback: delegated, to: &stateChangeCallbacks.message) + return stateChangeCallbacks.message.withValue { + append(callback: delegated, to: &$0) + } } /// Registers callbacks for connection message events. Automatically handles @@ -546,10 +590,14 @@ public class RealtimeClient: TransportDelegate { var delegated = Delegated() delegated.delegate(to: owner, with: callback) - return append(callback: delegated, to: &stateChangeCallbacks.message) + return stateChangeCallbacks.message.withValue { + self.append(callback: delegated, to: &$0) + } } - private func append(callback: T, to array: inout [(ref: String, callback: T)]) -> String { + private func append(callback: T, to array: inout [(ref: String, callback: T)]) + -> String + { let ref = makeRef() array.append((ref, callback)) return ref @@ -559,33 +607,16 @@ public class RealtimeClient: TransportDelegate { /// call this method when you are finished when the Socket in order to release /// any references held by the socket. public func releaseCallbacks() { - stateChangeCallbacks.open.removeAll() - stateChangeCallbacks.close.removeAll() - stateChangeCallbacks.error.removeAll() - stateChangeCallbacks.message.removeAll() + stateChangeCallbacks.open.setValue([]) + stateChangeCallbacks.close.setValue([]) + stateChangeCallbacks.error.setValue([]) + stateChangeCallbacks.message.setValue([]) } // ---------------------------------------------------------------------- // MARK: - Channel Initialization - // ---------------------------------------------------------------------- - /// Initialize a new Channel - /// - /// Example: - /// - /// let channel = socket.channel("rooms", options: ChannelOptions(presenceKey: "user123")) - /// - /// - parameter topic: Topic of the channel - /// - parameter options: Optional. Options to configure channel broadcast and presence. Leave nil for postgres channel. - /// - return: A new channel - public func channel( - _ topic: ChannelTopic, - options: ChannelOptions? = nil - ) -> Channel { - let channel = Channel(topic: topic, options: options, socket: self) - channels.append(channel) - return channel - } + // ---------------------------------------------------------------------- /// Initialize a new Channel /// @@ -596,12 +627,13 @@ public class RealtimeClient: TransportDelegate { /// - parameter topic: Topic of the channel /// - parameter params: Optional. Parameters for the channel /// - return: A new channel - @available(*, deprecated, renamed: "channel(_:options:)") public func channel( - _ topic: ChannelTopic, - params: [String: Any] - ) -> Channel { - let channel = Channel(topic: topic, params: params, socket: self) + _ topic: String, + params: RealtimeChannelOptions = .init() + ) -> RealtimeChannel { + let channel = RealtimeChannel( + topic: "realtime:\(topic)", params: params.params, socket: self + ) channels.append(channel) return channel @@ -617,7 +649,7 @@ public class RealtimeClient: TransportDelegate { /// socket.remove(channel) /// /// - parameter channel: Channel to remove - public func remove(_ channel: Channel) { + public func remove(_ channel: RealtimeChannel) { off(channel.stateChangeRefs) channels.removeAll(where: { $0.joinRef == channel.joinRef }) } @@ -627,17 +659,25 @@ public class RealtimeClient: TransportDelegate { /// /// - Parameter refs: List of refs returned by calls to `onOpen`, `onClose`, etc public func off(_ refs: [String]) { - stateChangeCallbacks.open = stateChangeCallbacks.open.filter { - !refs.contains($0.ref) + stateChangeCallbacks.open.withValue { + $0 = $0.filter { + !refs.contains($0.ref) + } } - stateChangeCallbacks.close = stateChangeCallbacks.close.filter { - !refs.contains($0.ref) + stateChangeCallbacks.close.withValue { + $0 = $0.filter { + !refs.contains($0.ref) + } } - stateChangeCallbacks.error = stateChangeCallbacks.error.filter { - !refs.contains($0.ref) + stateChangeCallbacks.error.withValue { + $0 = $0.filter { + !refs.contains($0.ref) + } } - stateChangeCallbacks.message = stateChangeCallbacks.message.filter { - !refs.contains($0.ref) + stateChangeCallbacks.message.withValue { + $0 = $0.filter { + !refs.contains($0.ref) + } } } @@ -655,15 +695,16 @@ public class RealtimeClient: TransportDelegate { /// - parameter payload: /// - parameter ref: Optional. Defaults to nil /// - parameter joinRef: Optional. Defaults to nil - internal func push( - topic: ChannelTopic, - event: ChannelEvent, + func push( + topic: String, + event: String, payload: Payload, ref: String? = nil, joinRef: String? = nil ) { - let callback: (() throws -> Void) = { - let body: [Any?] = [joinRef, ref, topic.rawValue, event.rawValue, payload] + let callback: (() throws -> Void) = { [weak self] in + guard let self else { return } + let body: [Any?] = [joinRef, ref, topic, event, payload] let data = self.encode(body) self.logItems("push", "Sending \(String(data: data, encoding: String.Encoding.utf8) ?? "")") @@ -700,7 +741,7 @@ public class RealtimeClient: TransportDelegate { // ---------------------------------------------------------------------- /// Called when the underlying Websocket connects to it's host - internal func onConnectionOpen(response: URLResponse?) { + func onConnectionOpen(response: URLResponse?) { logItems("transport", "Connected to \(endPoint)") // Reset the close status now that the socket has been connected @@ -716,10 +757,10 @@ public class RealtimeClient: TransportDelegate { resetHeartbeat() // Inform all onOpen callbacks that the Socket has opened - stateChangeCallbacks.open.forEach { $0.callback.call(response) } + stateChangeCallbacks.open.value.forEach { $0.callback.call(response) } } - internal func onConnectionClosed(code: Int, reason: String?) { + func onConnectionClosed(code: Int, reason: String?) { logItems("transport", "close") // Send an error to all channels @@ -734,20 +775,20 @@ public class RealtimeClient: TransportDelegate { reconnectTimer.scheduleTimeout() } - stateChangeCallbacks.close.forEach { $0.callback.call((code, reason)) } + stateChangeCallbacks.close.value.forEach { $0.callback.call((code, reason)) } } - internal func onConnectionError(_ error: Error, response: URLResponse?) { + func onConnectionError(_ error: Error, response: URLResponse?) { logItems("transport", error, response ?? "") // Send an error to all channels triggerChannelError() // Inform any state callbacks of the error - stateChangeCallbacks.error.forEach { $0.callback.call((error, response)) } + stateChangeCallbacks.error.value.forEach { $0.callback.call((error, response)) } } - internal func onConnectionMessage(_ rawMessage: String) { + func onConnectionMessage(_ rawMessage: String) { logItems("receive ", rawMessage) guard @@ -762,7 +803,7 @@ public class RealtimeClient: TransportDelegate { // Clear heartbeat ref, preventing a heartbeat timeout disconnect if message.ref == pendingHeartbeatRef { pendingHeartbeatRef = nil } - if message.event == .close { + if message.event == "phx_close" { print("Close Event Received") } @@ -772,11 +813,11 @@ public class RealtimeClient: TransportDelegate { .forEach { $0.trigger(message) } // Inform all onMessage callbacks of the message - stateChangeCallbacks.message.forEach { $0.callback.call(message) } + stateChangeCallbacks.message.value.forEach { $0.callback.call(message) } } /// Triggers an error event to all of the connected Channels - internal func triggerChannelError() { + func triggerChannelError() { channels.forEach { channel in // Only trigger a channel error if it is in an "opened" state if !(channel.isErrored || channel.isLeaving || channel.isClosed) { @@ -786,19 +827,19 @@ public class RealtimeClient: TransportDelegate { } /// Send all messages that were buffered before the socket opened - internal func flushSendBuffer() { - guard isConnected && sendBuffer.count > 0 else { return } + func flushSendBuffer() { + guard isConnected, sendBuffer.count > 0 else { return } sendBuffer.forEach { try? $0.callback() } sendBuffer = [] } /// Removes an item from the sendBuffer with the matching ref - internal func removeFromSendBuffer(ref: String) { + func removeFromSendBuffer(ref: String) { sendBuffer = sendBuffer.filter { $0.ref != ref } } /// Builds a fully qualified socket `URL` from `endPoint` and `params`. - internal static func buildEndpointUrl( + static func buildEndpointUrl( endpoint: String, paramsClosure params: PayloadClosure?, vsn: String ) -> URL { guard @@ -824,7 +865,8 @@ public class RealtimeClient: TransportDelegate { urlComponents.queryItems?.append( contentsOf: params.map { URLQueryItem(name: $0.key, value: String(describing: $0.value)) - }) + } + ) } guard let qualifiedUrl = urlComponents.url @@ -833,13 +875,13 @@ public class RealtimeClient: TransportDelegate { } // Leaves any channel that is open that has a duplicate topic - internal func leaveOpenTopic(topic: ChannelTopic) { + func leaveOpenTopic(topic: String) { guard let dupe = channels.first(where: { $0.topic == topic && ($0.isJoined || $0.isJoining) }) else { return } logItems("transport", "leaving duplicate topic: [\(topic)]") - dupe.leave() + dupe.unsubscribe() } // ---------------------------------------------------------------------- @@ -847,7 +889,7 @@ public class RealtimeClient: TransportDelegate { // MARK: - Heartbeat // ---------------------------------------------------------------------- - internal func resetHeartbeat() { + func resetHeartbeat() { // Clear anything related to the heartbeat pendingHeartbeatRef = nil heartbeatTimer?.stop() @@ -886,24 +928,24 @@ public class RealtimeClient: TransportDelegate { // The last heartbeat was acknowledged by the server. Send another one pendingHeartbeatRef = makeRef() push( - topic: .heartbeat, + topic: "phoenix", event: ChannelEvent.heartbeat, payload: [:], ref: pendingHeartbeatRef ) } - internal func abnormalClose(_ reason: String) { + func abnormalClose(_ reason: String) { closeStatus = .abnormal /* - We use NORMAL here since the client is the one determining to close the - connection. However, we set to close status to abnormal so that - the client knows that it should attempt to reconnect. + We use NORMAL here since the client is the one determining to close the + connection. However, we set to close status to abnormal so that + the client knows that it should attempt to reconnect. - If the server subsequently acknowledges with code 1000 (normal close), - the socket will keep the `.abnormal` close status and trigger a reconnection. - */ + If the server subsequently acknowledges with code 1000 (normal close), + the socket will keep the `.abnormal` close status and trigger a reconnection. + */ connection?.disconnect(code: CloseCode.normal.rawValue, reason: reason) } diff --git a/Sources/Realtime/RealtimeError.swift b/Sources/Realtime/RealtimeError.swift new file mode 100644 index 00000000..db0d3770 --- /dev/null +++ b/Sources/Realtime/RealtimeError.swift @@ -0,0 +1,16 @@ +// +// RealtimeError.swift +// +// +// Created by Guilherme Souza on 30/10/23. +// + +import Foundation + +struct RealtimeError: LocalizedError { + var errorDescription: String? + + init(_ errorDescription: String) { + self.errorDescription = errorDescription + } +} diff --git a/Sources/Realtime/SynchronizedArray.swift b/Sources/Realtime/SynchronizedArray.swift deleted file mode 100644 index e7345ce3..00000000 --- a/Sources/Realtime/SynchronizedArray.swift +++ /dev/null @@ -1,33 +0,0 @@ -// -// SynchronizedArray.swift -// SwiftPhoenixClient -// -// Created by Daniel Rees on 4/12/23. -// Copyright © 2023 SwiftPhoenixClient. All rights reserved. -// - -import Foundation - -/// A thread-safe array. -public class SynchronizedArray { - fileprivate let queue = DispatchQueue(label: "spc_sync_array", attributes: .concurrent) - fileprivate var array = [Element]() - - func append(_ newElement: Element) { - queue.async(flags: .barrier) { - self.array.append(newElement) - } - } - - func removeAll(where shouldBeRemoved: @escaping (Element) -> Bool) { - queue.async(flags: .barrier) { - self.array.removeAll(where: shouldBeRemoved) - } - } - - func filter(_ isIncluded: (Element) -> Bool) -> [Element] { - var result = [Element]() - queue.sync { result = self.array.filter(isIncluded) } - return result - } -} diff --git a/Tests/RealtimeTests/ChannelTopicTests.swift b/Tests/RealtimeTests/ChannelTopicTests.swift deleted file mode 100644 index 5a21bfd2..00000000 --- a/Tests/RealtimeTests/ChannelTopicTests.swift +++ /dev/null @@ -1,19 +0,0 @@ -import XCTest - -@testable import Realtime - -final class ChannelTopicTests: XCTestCase { - func testRawValue() { - XCTAssertEqual(ChannelTopic.all, ChannelTopic(rawValue: "realtime:*")) - XCTAssertEqual(ChannelTopic.all, ChannelTopic(rawValue: "*")) - XCTAssertEqual(ChannelTopic.schema("public"), ChannelTopic(rawValue: "realtime:public")) - XCTAssertEqual( - ChannelTopic.table("users", schema: "public"), ChannelTopic(rawValue: "realtime:public:users") - ) - XCTAssertEqual( - ChannelTopic.column("email", value: "mail@supabase.io", table: "users", schema: "public"), - ChannelTopic(rawValue: "realtime:public:users:email=eq.mail@supabase.io") - ) - XCTAssertEqual(ChannelTopic.heartbeat, ChannelTopic(rawValue: "phoenix")) - } -} diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index b8a6868d..78bbb70e 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -3,126 +3,127 @@ import XCTest @testable import Realtime final class RealtimeTests: XCTestCase { - var supabaseUrl: String { - guard let url = ProcessInfo.processInfo.environment["supabaseUrl"] else { - XCTFail("supabaseUrl not defined in environment.") - return "" - } - - return url - } - - var supabaseKey: String { - guard let key = ProcessInfo.processInfo.environment["supabaseKey"] else { - XCTFail("supabaseKey not defined in environment.") - return "" - } - return key - } - - func testConnection() throws { - try XCTSkipIf( - ProcessInfo.processInfo.environment["INTEGRATION_TESTS"] == nil, - "INTEGRATION_TESTS not defined" - ) - - let socket = RealtimeClient( - "\(supabaseUrl)/realtime/v1", params: ["apikey": supabaseKey] - ) - - let e = expectation(description: "testConnection") - socket.onOpen { - XCTAssertEqual(socket.isConnected, true) - DispatchQueue.main.asyncAfter(deadline: .now() + 1) { - socket.disconnect() - } - } - - socket.onError { error, _ in - XCTFail(error.localizedDescription) - } - - socket.onClose { - XCTAssertEqual(socket.isConnected, false) - e.fulfill() - } - - socket.connect() - - waitForExpectations(timeout: 3000) { error in - if let error = error { - XCTFail("\(self.name)) failed: \(error.localizedDescription)") - } - } - } - - func testChannelCreation() throws { - try XCTSkipIf( - ProcessInfo.processInfo.environment["INTEGRATION_TESTS"] == nil, - "INTEGRATION_TESTS not defined" - ) - - let client = RealtimeClient( - "\(supabaseUrl)/realtime/v1", params: ["apikey": supabaseKey] - ) - let allChanges = client.channel(.all) - allChanges.on(.all) { message in - print(message) - } - allChanges.join() - allChanges.leave() - allChanges.off(.all) - - let allPublicInsertChanges = client.channel(.schema("public")) - allPublicInsertChanges.on(.insert) { message in - print(message) - } - allPublicInsertChanges.join() - allPublicInsertChanges.leave() - allPublicInsertChanges.off(.insert) - - let allUsersUpdateChanges = client.channel(.table("users", schema: "public")) - allUsersUpdateChanges.on(.update) { message in - print(message) - } - allUsersUpdateChanges.join() - allUsersUpdateChanges.leave() - allUsersUpdateChanges.off(.update) - - let allUserId99Changes = client.channel( - .column("id", value: "99", table: "users", schema: "public")) - allUserId99Changes.on(.all) { message in - print(message) - } - allUserId99Changes.join() - allUserId99Changes.leave() - allUserId99Changes.off(.all) - - XCTAssertEqual(client.isConnected, false) - - let e = expectation(description: name) - client.onOpen { - XCTAssertEqual(client.isConnected, true) - DispatchQueue.main.asyncAfter(deadline: .now() + 1) { - client.disconnect() - } - } - - client.onError { error, _ in - XCTFail(error.localizedDescription) - } - - client.onClose { - XCTAssertEqual(client.isConnected, false) - e.fulfill() - } - - client.connect() - - waitForExpectations(timeout: 3000) { error in - if let error = error { - XCTFail("\(self.name)) failed: \(error.localizedDescription)") - } - } - } +// var supabaseUrl: String { +// guard let url = ProcessInfo.processInfo.environment["supabaseUrl"] else { +// XCTFail("supabaseUrl not defined in environment.") +// return "" +// } +// +// return url +// } +// +// var supabaseKey: String { +// guard let key = ProcessInfo.processInfo.environment["supabaseKey"] else { +// XCTFail("supabaseKey not defined in environment.") +// return "" +// } +// return key +// } +// +// func testConnection() throws { +// try XCTSkipIf( +// ProcessInfo.processInfo.environment["INTEGRATION_TESTS"] == nil, +// "INTEGRATION_TESTS not defined" +// ) +// +// let socket = RealtimeClient( +// "\(supabaseUrl)/realtime/v1", params: ["apikey": supabaseKey] +// ) +// +// let e = expectation(description: "testConnection") +// socket.onOpen { +// XCTAssertEqual(socket.isConnected, true) +// DispatchQueue.main.asyncAfter(deadline: .now() + 1) { +// socket.disconnect() +// } +// } +// +// socket.onError { error, _ in +// XCTFail(error.localizedDescription) +// } +// +// socket.onClose { +// XCTAssertEqual(socket.isConnected, false) +// e.fulfill() +// } +// +// socket.connect() +// +// waitForExpectations(timeout: 3000) { error in +// if let error { +// XCTFail("\(self.name)) failed: \(error.localizedDescription)") +// } +// } +// } +// +// func testChannelCreation() throws { +// try XCTSkipIf( +// ProcessInfo.processInfo.environment["INTEGRATION_TESTS"] == nil, +// "INTEGRATION_TESTS not defined" +// ) +// +// let client = RealtimeClient( +// "\(supabaseUrl)/realtime/v1", params: ["apikey": supabaseKey] +// ) +// let allChanges = client.channel(.all) +// allChanges.on(.all) { message in +// print(message) +// } +// allChanges.join() +// allChanges.leave() +// allChanges.off(.all) +// +// let allPublicInsertChanges = client.channel(.schema("public")) +// allPublicInsertChanges.on(.insert) { message in +// print(message) +// } +// allPublicInsertChanges.join() +// allPublicInsertChanges.leave() +// allPublicInsertChanges.off(.insert) +// +// let allUsersUpdateChanges = client.channel(.table("users", schema: "public")) +// allUsersUpdateChanges.on(.update) { message in +// print(message) +// } +// allUsersUpdateChanges.join() +// allUsersUpdateChanges.leave() +// allUsersUpdateChanges.off(.update) +// +// let allUserId99Changes = client.channel( +// .column("id", value: "99", table: "users", schema: "public") +// ) +// allUserId99Changes.on(.all) { message in +// print(message) +// } +// allUserId99Changes.join() +// allUserId99Changes.leave() +// allUserId99Changes.off(.all) +// +// XCTAssertEqual(client.isConnected, false) +// +// let e = expectation(description: name) +// client.onOpen { +// XCTAssertEqual(client.isConnected, true) +// DispatchQueue.main.asyncAfter(deadline: .now() + 1) { +// client.disconnect() +// } +// } +// +// client.onError { error, _ in +// XCTFail(error.localizedDescription) +// } +// +// client.onClose { +// XCTAssertEqual(client.isConnected, false) +// e.fulfill() +// } +// +// client.connect() +// +// waitForExpectations(timeout: 3000) { error in +// if let error { +// XCTFail("\(self.name)) failed: \(error.localizedDescription)") +// } +// } +// } }