Skip to content

Commit

Permalink
Merge branch 'master' into ab/tls-2
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Aug 17, 2024
2 parents 4abd389 + f229e13 commit a2b8f29
Show file tree
Hide file tree
Showing 50 changed files with 1,857 additions and 1,075 deletions.
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
# 6.0.0

Version 6.0.0.8

Agent:
- enabled fast handshake support.
- batch-send multiple messages in each connection.
- resume subscriptions as soon as agent moves to foreground or as network connection resumes.
- "known" servers to determine whether to use SMP proxy.
- retry on SMP proxy NO_SESSION error.
- fixes to notification subscriptions.
- persistent server statistics.
- better concurrency.

SMP server:
- reduce threads usage.
- additional statistics.
- improve disabling inactive clients.
- additional control port commands for monitoring.

Notification server:
- support onion-only SMP servers.

# 5.8.2

Agent:
Expand Down
2 changes: 1 addition & 1 deletion package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: simplexmq
version: 6.0.0.2
version: 6.0.0.8
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,
Expand Down
2 changes: 1 addition & 1 deletion simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack

name: simplexmq
version: 6.0.0.2
version: 6.0.0.8
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
Expand Down
42 changes: 23 additions & 19 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import Data.Either (partitionEithers, rights)
import Data.Int (Int64)
import Data.List (foldl', partition, sortOn)
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (mapMaybe)
import qualified Data.Set as S
Expand Down Expand Up @@ -184,7 +184,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
Expand All @@ -194,6 +194,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
atomically $ incXFTPServerStat c userId srv downloadAttempts
downloadFileChunk fc replica approvedRelays
Expand All @@ -204,7 +205,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e
liftIO $ closeXFTPServerClient c userId server digest
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
loop
retryDone e = do
atomically . incXFTPServerStat c userId srv $ case e of
Expand All @@ -220,7 +221,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
chunkSpec = XFTPRcvChunkSpec chunkPath chSize (unFileDigest digest)
relChunkPath = fileTmpPath </> takeFileName chunkPath
agentXFTPDownloadChunk c userId digest replica chunkSpec
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
(entityId, complete, progress) <- withStore c $ \db -> runExceptT $ do
liftIO $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId relChunkPath
RcvFile {size = FileSize currentSize, chunks, redirect} <- ExceptT $ getRcvFile db rcvFileId
Expand All @@ -239,7 +240,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
where
ipAddressProtected' :: AM Bool
ipAddressProtected' = do
cfg <- liftIO $ getNetworkConfig' c
cfg <- liftIO $ getFastNetworkConfig c
pure $ ipAddressProtected cfg srv
receivedSize :: [RcvFileChunk] -> Int64
receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0
Expand Down Expand Up @@ -272,7 +273,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
Expand All @@ -298,12 +299,12 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
Nothing -> do
notify c rcvFileEntityId $ RFDONE fsSavePath
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
withStore' c (`updateRcvFileComplete` rcvFileId)
Just RcvFileRedirect {redirectFileInfo, redirectDbId} -> do
let RedirectFileInfo {size = redirectSize, digest = redirectDigest} = redirectFileInfo
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
withStore' c (`updateRcvFileComplete` rcvFileId)
-- proceed with redirect
yaml <- liftError (FILE . FILE_IO . show) (CF.readFile $ CryptoFile fsSavePath cfArgs) `agentFinally` (lift $ toFSFilePath fsSavePath >>= removePath)
Expand Down Expand Up @@ -391,7 +392,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
Expand Down Expand Up @@ -453,16 +454,17 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
SndFileChunkReplica {server} : _ -> Right server
createChunk :: Int -> SndFileChunk -> AM (ProtocolServer 'PXFTP)
createChunk numRecipients' ch = do
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
(replica, ProtoServerWithAuth srv _) <- tryCreate
withStore' c $ \db -> createSndFileReplica db ch replica
pure srv
where
tryCreate = do
usedSrvs <- newTVarIO ([] :: [XFTPServer])
let AgentClient {xftpServers} = c
userSrvCount <- length <$> atomically (TM.lookup userId xftpServers)
userSrvCount <- liftIO $ length <$> TM.lookupIO userId xftpServers
withRetryIntervalCount (riFast ri) $ \n _ loop -> do
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
let triedAllSrvs = n > userSrvCount
createWithNextSrv usedSrvs
Expand All @@ -472,7 +474,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
retryLoop loop triedAllSrvs e = do
flip catchAgentError (\_ -> pure ()) $ do
when (triedAllSrvs && serverHostError e) $ notify c sndFileEntityId $ SFWARN e
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
loop
createWithNextSrv usedSrvs = do
deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId
Expand All @@ -492,7 +494,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
Expand All @@ -502,6 +504,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
atomically $ incXFTPServerStat c userId srv uploadAttempts
uploadFileChunk cfg fc replica
Expand All @@ -512,7 +515,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
when (serverHostError e) $ notify c sndFileEntityId $ SFWARN e
liftIO $ closeXFTPServerClient c userId server digest
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
loop
retryDone e = do
atomically $ incXFTPServerStat c userId srv uploadErrs
Expand All @@ -523,9 +526,9 @@ runXFTPSndWorker c srv Worker {doWork} = do
fsFilePath <- lift $ toFSFilePath filePath
unlessM (doesFileExist fsFilePath) $ throwE $ FILE NO_FILE
let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec'
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
getSndFile db sndFileId
Expand Down Expand Up @@ -663,7 +666,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
Expand All @@ -674,6 +677,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
atomically $ incXFTPServerStat c userId srv deleteAttempts
deleteChunkReplica
Expand All @@ -684,7 +688,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
when (serverHostError e) $ notify c "" $ SFWARN e
liftIO $ closeXFTPServerClient c userId server chunkDigest
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
loop
retryDone e = do
atomically $ incXFTPServerStat c userId srv deleteErrs
Expand All @@ -699,7 +703,7 @@ delWorkerInternalError c deletedSndChunkReplicaId e = do
withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId
notify c "" $ SFERR e

assertAgentForeground :: AgentClient -> STM ()
assertAgentForeground :: AgentClient -> IO ()
assertAgentForeground c = do
throwWhenInactive c
waitUntilForeground c
4 changes: 2 additions & 2 deletions src/Simplex/FileTransfer/Client/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ defaultXFTPClientAgentConfig =
data XFTPClientAgentError = XFTPClientAgentError XFTPServer XFTPClientError
deriving (Show, Exception)

newXFTPAgent :: XFTPClientAgentConfig -> STM XFTPClientAgent
newXFTPAgent :: XFTPClientAgentConfig -> IO XFTPClientAgent
newXFTPAgent config = do
xftpClients <- TM.empty
xftpClients <- TM.emptyIO
pure XFTPClientAgent {xftpClients, config}

type ME a = ExceptT XFTPClientAgentError IO a
Expand Down
8 changes: 4 additions & 4 deletions src/Simplex/FileTransfer/Client/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import Data.Int (Int64)
import Data.List (foldl', sortOn)
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
import Data.Map.Strict (Map)
import qualified Data.Map as M
import Data.Maybe (fromMaybe, listToMaybe)
import qualified Data.Text as T
Expand Down Expand Up @@ -313,7 +313,7 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
pure (encPath, fdRcv, fdSnd, chunkSpecs, encSize)
uploadFile :: TVar ChaChaDRG -> [XFTPChunkSpec] -> TVar [Int64] -> Int64 -> ExceptT CLIError IO [SentFileChunk]
uploadFile g chunks uploadedChunks encSize = do
a <- atomically $ newXFTPAgent defaultXFTPClientAgentConfig
a <- liftIO $ newXFTPAgent defaultXFTPClientAgentConfig
gen <- newTVarIO =<< liftIO newStdGen
let xftpSrvs = fromMaybe defaultXFTPServers (nonEmpty xftpServers)
srvs <- liftIO $ replicateM (length chunks) $ getXFTPServer gen xftpSrvs
Expand Down Expand Up @@ -429,7 +429,7 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath,
receive (ValidFileDescription FileDescription {size, digest, key, nonce, chunks}) = do
encPath <- getEncPath tempPath "xftp"
createDirectory encPath
a <- atomically $ newXFTPAgent defaultXFTPClientAgentConfig
a <- liftIO $ newXFTPAgent defaultXFTPClientAgentConfig
liftIO $ printNoNewLine "Downloading file..."
downloadedChunks <- newTVarIO []
let srv FileChunk {replicas} = case replicas of
Expand Down Expand Up @@ -494,7 +494,7 @@ cliDeleteFile DeleteOptions {fileDescription, retryCount, yes} = do
where
deleteFile :: ValidFileDescription 'FSender -> ExceptT CLIError IO ()
deleteFile (ValidFileDescription FileDescription {chunks}) = do
a <- atomically $ newXFTPAgent defaultXFTPClientAgentConfig
a <- liftIO $ newXFTPAgent defaultXFTPClientAgentConfig
forM_ chunks $ deleteFileChunk a
liftIO $ do
printNoNewLine "File deleted!"
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/FileTransfer/Description.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import Data.Int (Int64)
import Data.List (foldl', sortOn)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import Data.Map (Map)
import Data.Map.Strict (Map)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.String
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/FileTransfer/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
Right pk' -> pure pk'
Left e -> putStrLn ("servers has no valid key: " <> show e) >> exitFailure
env <- ask
sessions <- atomically TM.empty
sessions <- liftIO TM.emptyIO
let cleanup sessionId = atomically $ TM.delete sessionId sessions
liftIO . runHTTP2Server started xftpPort defaultHTTP2BufferSize serverParams transportConfig inactiveClientExpiration cleanup $ \sessionId sessionALPN r sendResponse -> do
reqBody <- getHTTP2Body r xftpBlockSize
Expand Down Expand Up @@ -576,7 +576,7 @@ incFileStat statSel = do
saveServerStats :: M ()
saveServerStats =
asks (serverStatsBackupFile . config)
>>= mapM_ (\f -> asks serverStats >>= atomically . getFileServerStatsData >>= liftIO . saveStats f)
>>= mapM_ (\f -> asks serverStats >>= liftIO . getFileServerStatsData >>= liftIO . saveStats f)
where
saveStats f stats = do
logInfo $ "saving server stats to file " <> T.pack f
Expand Down
13 changes: 6 additions & 7 deletions src/Simplex/FileTransfer/Server/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ module Simplex.FileTransfer.Server.Env where

import Control.Logger.Simple
import Control.Monad
import Control.Monad.IO.Unlift
import Crypto.Random
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
Expand Down Expand Up @@ -105,17 +104,17 @@ supportedXFTPhandshakes = ["xftp/1"]

newXFTPServerEnv :: XFTPServerConfig -> IO XFTPEnv
newXFTPServerEnv config@XFTPServerConfig {storeLogFile, fileSizeQuota, caCertificateFile, certificateFile, privateKeyFile, transportConfig} = do
random <- liftIO C.newRandom
store <- atomically newFileStore
storeLog <- liftIO $ mapM (`readWriteFileStore` store) storeLogFile
random <- C.newRandom
store <- newFileStore
storeLog <- mapM (`readWriteFileStore` store) storeLogFile
used <- countUsedStorage <$> readTVarIO (files store)
atomically $ writeTVar (usedStorage store) used
forM_ fileSizeQuota $ \quota -> do
logInfo $ "Total / available storage: " <> tshow quota <> " / " <> tshow (quota - used)
when (quota < used) $ logInfo "WARNING: storage quota is less than used storage, no files can be uploaded!"
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile (alpn transportConfig)
Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile
serverStats <- atomically . newFileServerStats =<< liftIO getCurrentTime
tlsServerParams <- loadTLSServerParams caCertificateFile certificateFile privateKeyFile (alpn transportConfig)
Fingerprint fp <- loadFingerprint caCertificateFile
serverStats <- newFileServerStats =<< getCurrentTime
pure XFTPEnv {config, store, storeLog, random, tlsServerParams, serverIdentity = C.KeyHash fp, serverStats}

countUsedStorage :: M.Map k FileRec -> Int64
Expand Down
44 changes: 22 additions & 22 deletions src/Simplex/FileTransfer/Server/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,34 @@ data FileServerStatsData = FileServerStatsData
}
deriving (Show)

newFileServerStats :: UTCTime -> STM FileServerStats
newFileServerStats :: UTCTime -> IO FileServerStats
newFileServerStats ts = do
fromTime <- newTVar ts
filesCreated <- newTVar 0
fileRecipients <- newTVar 0
filesUploaded <- newTVar 0
filesExpired <- newTVar 0
filesDeleted <- newTVar 0
fromTime <- newTVarIO ts
filesCreated <- newTVarIO 0
fileRecipients <- newTVarIO 0
filesUploaded <- newTVarIO 0
filesExpired <- newTVarIO 0
filesDeleted <- newTVarIO 0
filesDownloaded <- newPeriodStats
fileDownloads <- newTVar 0
fileDownloadAcks <- newTVar 0
filesCount <- newTVar 0
filesSize <- newTVar 0
fileDownloads <- newTVarIO 0
fileDownloadAcks <- newTVarIO 0
filesCount <- newTVarIO 0
filesSize <- newTVarIO 0
pure FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesExpired, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize}

getFileServerStatsData :: FileServerStats -> STM FileServerStatsData
getFileServerStatsData :: FileServerStats -> IO FileServerStatsData
getFileServerStatsData s = do
_fromTime <- readTVar $ fromTime (s :: FileServerStats)
_filesCreated <- readTVar $ filesCreated s
_fileRecipients <- readTVar $ fileRecipients s
_filesUploaded <- readTVar $ filesUploaded s
_filesExpired <- readTVar $ filesExpired s
_filesDeleted <- readTVar $ filesDeleted s
_fromTime <- readTVarIO $ fromTime (s :: FileServerStats)
_filesCreated <- readTVarIO $ filesCreated s
_fileRecipients <- readTVarIO $ fileRecipients s
_filesUploaded <- readTVarIO $ filesUploaded s
_filesExpired <- readTVarIO $ filesExpired s
_filesDeleted <- readTVarIO $ filesDeleted s
_filesDownloaded <- getPeriodStatsData $ filesDownloaded s
_fileDownloads <- readTVar $ fileDownloads s
_fileDownloadAcks <- readTVar $ fileDownloadAcks s
_filesCount <- readTVar $ filesCount s
_filesSize <- readTVar $ filesSize s
_fileDownloads <- readTVarIO $ fileDownloads s
_fileDownloadAcks <- readTVarIO $ fileDownloadAcks s
_filesCount <- readTVarIO $ filesCount s
_filesSize <- readTVarIO $ filesSize s
pure FileServerStatsData {_fromTime, _filesCreated, _fileRecipients, _filesUploaded, _filesExpired, _filesDeleted, _filesDownloaded, _fileDownloads, _fileDownloadAcks, _filesCount, _filesSize}

setFileServerStats :: FileServerStats -> FileServerStatsData -> STM ()
Expand Down
Loading

0 comments on commit a2b8f29

Please sign in to comment.