Skip to content

Commit

Permalink
SwiftWebSocket: thread-safety fixes
Browse files Browse the repository at this point in the history
Apply thread-safety fixes from
tidwall/SwiftWebSocket#141 to embedded copy of
SwiftWebSocket
  • Loading branch information
nosnilmot committed Feb 13, 2023
1 parent 06fe3e4 commit d7d4f8f
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions Sources/WebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ class Deflater {
/// WebSocket objects are bidirectional network streams that communicate over HTTP. RFC 6455.
class InnerWebSocket: Hashable {
var id : Int
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
let request : URLRequest!
let subProtocols : [String]!
var frames : [Frame] = []
Expand Down Expand Up @@ -595,7 +595,8 @@ class InnerWebSocket: Hashable {
}

init(request: URLRequest, subProtocols : [String] = [], stub : Bool = false){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
self.id = manager.nextId()
self.request = request
self.subProtocols = subProtocols
Expand All @@ -621,13 +622,14 @@ class InnerWebSocket: Hashable {
if inputBytes != nil {
free(inputBytes)
}
pthread_mutex_init(&mutex, nil)
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
@inline(__always) fileprivate func lock(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
}
@inline(__always) fileprivate func unlock(){
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}

fileprivate var dirty : Bool {
Expand Down Expand Up @@ -1571,38 +1573,43 @@ private class Manager {
var queue = DispatchQueue(label: "SwiftWebSocketInstance", attributes: [])
let dispatchQueue = DispatchQueue.init(label: "SwiftWebSocket", qos: .userInteractive, attributes: [])
var once = Int()
var mutex = pthread_mutex_t()
var mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
var cond = pthread_cond_t()
var websockets = Set<InnerWebSocket>()
var _nextId = 0
init(){
pthread_mutex_init(&mutex, nil)
mutex.initialize(to: pthread_mutex_t())
pthread_mutex_init(mutex, nil)
pthread_cond_init(&cond, nil)
dispatchQueue.async {
var wss : [InnerWebSocket] = []
while true {
var wait = true
wss.removeAll()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
for ws in self.websockets {
wss.append(ws)
}
for ws in wss {
self.checkForConnectionTimeout(ws)
if ws.dirty {
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
ws.step()
pthread_mutex_lock(&self.mutex)
pthread_mutex_lock(self.mutex)
wait = false
}
}
if wait {
_ = self.wait(250)
}
pthread_mutex_unlock(&self.mutex)
pthread_mutex_unlock(self.mutex)
}
}
}
deinit{
pthread_mutex_init(mutex, nil)
mutex.deallocate()
}
func checkForConnectionTimeout(_ ws : InnerWebSocket) {
if ws.rd != nil && ws.wr != nil && (ws.rd.streamStatus == .opening || ws.wr.streamStatus == .opening) {
let age = CFAbsoluteTimeGetCurrent() - ws.createdAt
Expand All @@ -1621,28 +1628,28 @@ private class Manager {
ts.tv_nsec = v1 + v2;
ts.tv_sec += ts.tv_nsec / (1000 * 1000 * 1000);
ts.tv_nsec %= (1000 * 1000 * 1000);
return pthread_cond_timedwait(&self.cond, &self.mutex, &ts)
return pthread_cond_timedwait(&self.cond, self.mutex, &ts)
}
func signal(){
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func add(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.insert(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func remove(_ websocket: InnerWebSocket) {
pthread_mutex_lock(&mutex)
pthread_mutex_lock(mutex)
websockets.remove(websocket)
pthread_cond_signal(&cond)
pthread_mutex_unlock(&mutex)
pthread_mutex_unlock(mutex)
}
func nextId() -> Int {
pthread_mutex_lock(&mutex)
defer { pthread_mutex_unlock(&mutex) }
pthread_mutex_lock(mutex)
defer { pthread_mutex_unlock(mutex) }
_nextId += 1
return _nextId
}
Expand Down

0 comments on commit d7d4f8f

Please sign in to comment.