Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise the emitter behaviour #850

Merged
merged 3 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 39 additions & 39 deletions Sources/Core/Emitter/Emitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,14 @@ class Emitter: EmitterEventProcessing {
}

/// Insert a Payload object into the buffer to be sent to collector.
/// This method will add the payload to the database and flush (send all events).
/// This method will add the payload to the database and flush (send all events) when the buffer is full.
/// - Parameter eventPayload: A Payload containing a completed event to be added into the buffer.
func addPayload(toBuffer eventPayload: Payload) {
self.eventStore.addEvent(eventPayload)
self.flush()

if self.eventStore.count() >= self.bufferOption.rawValue {
self.flush()
}
}

/// Empties the buffer of events using the respective HTTP request method.
Expand Down Expand Up @@ -377,46 +380,43 @@ class Emitter: EmitterEventProcessing {
requests.append(request)
}
} else {
var i = 0
while i < events.count {
var eventArray: [Payload] = []
var indexArray: [Int64] = []

let iUntil = min(i + bufferOption.rawValue, events.count)
for j in i..<iUntil {
let event = events[j]

let payload = event.payload
let emitterEventId = event.storeId
addSendingTime(to: payload, timestamp: sendingTime)

if isOversize(payload, byteLimit: byteLimit) {
let request = Request(payload: payload, emitterEventId: emitterEventId, oversize: true)
requests.append(request)
} else if isOversize(payload, byteLimit: byteLimit, previousPayloads: eventArray) {
let request = Request(payloads: eventArray, emitterEventIds: indexArray)
requests.append(request)

// Clear collection and build a new POST
eventArray = []
indexArray = []

// Build and store the request
eventArray.append(payload)
indexArray.append(emitterEventId)
} else {
// Add event to collections
eventArray.append(payload)
indexArray.append(emitterEventId)
}
var eventPayloads: [Payload] = []
var eventIds: [Int64] = []

for event in events {
let payload = event.payload
let emitterEventId = event.storeId
addSendingTime(to: payload, timestamp: sendingTime)

// Oversize event -> separate requests
if isOversize(payload, byteLimit: byteLimit) {
let request = Request(payload: payload, emitterEventId: emitterEventId, oversize: true)
requests.append(request)
}

// Check if all payloads have been processed
if eventArray.count != 0 {
let request = Request(payloads: eventArray, emitterEventIds: indexArray)
// Events up to this one are oversize -> create request for them
else if isOversize(payload, byteLimit: byteLimit, previousPayloads: eventPayloads) {
let request = Request(payloads: eventPayloads, emitterEventIds: eventIds)
requests.append(request)

// Clear collection and build a new POST
eventPayloads = []
eventIds = []

// Build and store the request
eventPayloads.append(payload)
eventIds.append(emitterEventId)
}
i += bufferOption.rawValue
// Add to the list of events for the request
else {
eventPayloads.append(payload)
eventIds.append(emitterEventId)
}
}

// Check if there are any remaining events not in a request
if !eventPayloads.isEmpty {
let request = Request(payloads: eventPayloads, emitterEventIds: eventIds)
requests.append(request)
}
}
return requests
Expand Down
2 changes: 1 addition & 1 deletion Sources/Snowplow/Configurations/EmitterConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Foundation
@objc(SPEmitterConfigurationProtocol)
public protocol EmitterConfigurationProtocol: AnyObject {
/// Sets whether the buffer should send events instantly or after the buffer
/// has reached it's limit. By default, this is set to BufferOption Default.
/// has reached it's limit. By default, this is set to BufferOption single.
@objc
var bufferOption: BufferOption { get set }
/// Maximum number of events collected from the EventStore to be sent in a request.
Expand Down
22 changes: 14 additions & 8 deletions Sources/Snowplow/Emitter/BufferOption.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ import Foundation
/// An enum for buffer options.
@objc(SPBufferOption)
public enum BufferOption : Int {
/// Sends both GET and POST requests with only a single event. Can cause a spike in
/// network traffic if used in correlation with a large amount of events.
/// Sends both GET and POST requests with only a single event.
/// This is the default setting.
/// Can cause a spike in network traffic if used in correlation with a large amount of events.
case single = 1
/// Sends POST requests in groups of 10 events. This is the default amount of events too
/// package into a POST. All GET requests will still emit one at a time.
case defaultGroup = 10
/// Sends POST requests in groups of 25 events. Useful for situations where many events
/// need to be sent. All GET requests will still emit one at a time.
/// Sends POST requests in groups of 10 events.
/// All GET requests will still emit one at a time.
case smallGroup = 10
/// Sends POST requests in groups of 25 events.
/// Useful for situations where many events need to be sent.
/// All GET requests will still emit one at a time.
case largeGroup = 25
}

Expand All @@ -32,8 +34,12 @@ extension BufferOption {
switch value {
case "Single":
return .single
case "SmallGroup":
return .smallGroup
case "DefaultGroup":
return .defaultGroup
return .smallGroup
case "LargeGroup":
return .largeGroup
case "HeavyGroup":
return .largeGroup
default:
Expand Down
4 changes: 2 additions & 2 deletions Sources/Snowplow/Emitter/EmitterDefaults.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import Foundation
public class EmitterDefaults {
public private(set) static var httpMethod: HttpMethodOptions = .post
public private(set) static var httpProtocol: ProtocolOptions = .https
public private(set) static var emitRange = 150
public private(set) static var emitRange = BufferOption.largeGroup.rawValue
public private(set) static var emitThreadPoolSize = 15
public private(set) static var byteLimitGet = 40000
public private(set) static var byteLimitPost = 40000
public private(set) static var serverAnonymisation = false
public private(set) static var bufferOption: BufferOption = .defaultGroup
public private(set) static var bufferOption: BufferOption = .single
public private(set) static var retryFailedRequests = true
}
87 changes: 54 additions & 33 deletions Sources/Snowplow/Network/DefaultNetworkConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,47 +96,68 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection {

@objc
public func sendRequests(_ requests: [Request]) -> [RequestResult] {
let urlRequests = requests.map { _httpMethod == .get ? buildGet($0) : buildPost($0) }

var results: [RequestResult] = []

for request in requests {
let urlRequest = _httpMethod == .get
? buildGet(request)
: buildPost(request)

dataOperationQueue.addOperation({
//source: https://forums.developer.apple.com/thread/11519
var httpResponse: HTTPURLResponse? = nil
var connectionError: Error? = nil
var sem: DispatchSemaphore

sem = DispatchSemaphore(value: 0)

URLSession.shared.dataTask(with: urlRequest) { data, urlResponse, error in
connectionError = error
httpResponse = urlResponse as? HTTPURLResponse
sem.signal()
}.resume()

let _ = sem.wait(timeout: .distantFuture)
var statusCode: NSNumber?
if let httpResponse = httpResponse { statusCode = NSNumber(value: httpResponse.statusCode) }

let result = RequestResult(statusCode: statusCode, oversize: request.oversize, storeIds: request.emitterEventIds)
if !result.isSuccessful {
logError(message: "Connection error: " + (connectionError?.localizedDescription ?? "-"))
}

objc_sync_enter(self)
// if there is only one request, make it directly
if requests.count == 1 {
if let request = requests.first, let urlRequest = urlRequests.first {
let result = DefaultNetworkConnection.makeRequest(
request: request,
urlRequest: urlRequest
)

results.append(result)
objc_sync_exit(self)
})
}
}
// if there are more than 1 request, use the operation queue
else if requests.count > 1 {
for (request, urlRequest) in zip(requests, urlRequests) {
dataOperationQueue.addOperation({
let result = DefaultNetworkConnection.makeRequest(
request: request,
urlRequest: urlRequest
)

objc_sync_enter(self)
results.append(result)
objc_sync_exit(self)
})
}
dataOperationQueue.waitUntilAllOperationsAreFinished()
}
dataOperationQueue.waitUntilAllOperationsAreFinished()

return results
}

// MARK: - Private methods

private static func makeRequest(request: Request, urlRequest: URLRequest) -> RequestResult {
//source: https://forums.developer.apple.com/thread/11519
var httpResponse: HTTPURLResponse? = nil
var connectionError: Error? = nil
var sem: DispatchSemaphore

sem = DispatchSemaphore(value: 0)

URLSession.shared.dataTask(with: urlRequest) { data, urlResponse, error in
connectionError = error
httpResponse = urlResponse as? HTTPURLResponse
sem.signal()
}.resume()

let _ = sem.wait(timeout: .distantFuture)
var statusCode: NSNumber?
if let httpResponse = httpResponse { statusCode = NSNumber(value: httpResponse.statusCode) }

let result = RequestResult(statusCode: statusCode, oversize: request.oversize, storeIds: request.emitterEventIds)
if !result.isSuccessful {
logError(message: "Connection error: " + (connectionError?.localizedDescription ?? "-"))
}

return result
}

private func setup() {
// Decode url to extract protocol
let url = URL(string: _urlString)
Expand Down
Loading
Loading