diff --git a/src/NServiceBus.SqlServer/TableBasedQueue.cs b/src/NServiceBus.SqlServer/TableBasedQueue.cs index 409ee82ca..770067f62 100644 --- a/src/NServiceBus.SqlServer/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/TableBasedQueue.cs @@ -78,7 +78,7 @@ static object[] ExtractTransportMessageData(TransportMessage message, SendOption } else { - data[TimeToBeReceivedColumn] = DateTime.UtcNow.Add(message.TimeToBeReceived); + data[TimeToBeReceivedColumn] = message.TimeToBeReceived.TotalMilliseconds; } data[HeadersColumn] = HeaderSerializer.SerializeObject(message.Headers); if (message.Body == null) @@ -129,14 +129,14 @@ MessageReadResult ExecuteReader(SqlCommand command) { var id = rowData[0].ToString(); - DateTime? expireDateTime = null; + int? millisecondsToExpiry = null; if (rowData[TimeToBeReceivedColumn] != DBNull.Value) { - expireDateTime = (DateTime)rowData[TimeToBeReceivedColumn]; + millisecondsToExpiry = (int)rowData[TimeToBeReceivedColumn]; } //Has message expired? - if (expireDateTime.HasValue && expireDateTime.Value < DateTime.UtcNow) + if (millisecondsToExpiry.HasValue && millisecondsToExpiry.Value < 0L) { Logger.InfoFormat("Message with ID={0} has expired. Removing it from queue.", id); return MessageReadResult.NoMessage; @@ -161,9 +161,9 @@ MessageReadResult ExecuteReader(SqlCommand command) message.Headers[Headers.ReplyToAddress] = replyToAddress; } - if (expireDateTime.HasValue) + if (millisecondsToExpiry.HasValue) { - message.TimeToBeReceived = TimeSpan.FromTicks(expireDateTime.Value.Ticks - DateTime.UtcNow.Ticks); + message.TimeToBeReceived = TimeSpan.FromMilliseconds(millisecondsToExpiry.Value); } return MessageReadResult.Success(message); @@ -195,8 +195,6 @@ public int PurgeBatchOfExpiredMessages(SqlConnection connection, int purgeBatchS using (var command = new SqlCommand(commandText, connection)) { - command.Parameters.Add("UTCNow", SqlDbType.DateTime).Value = DateTime.UtcNow; - return command.ExecuteNonQuery(); } } @@ -228,7 +226,7 @@ public override string ToString() readonly string schema; static readonly JsonMessageSerializer HeaderSerializer = new JsonMessageSerializer(null); - static readonly string[] Parameters = { "Id", "CorrelationId", "ReplyToAddress", "Recoverable", "Expires", "Headers", "Body" }; + static readonly string[] Parameters = { "Id", "CorrelationId", "ReplyToAddress", "Recoverable", "TimeToBeReceivedMs", "Headers", "Body" }; static readonly SqlDbType[] ParameterTypes = { @@ -236,23 +234,23 @@ public override string ToString() SqlDbType.VarChar, SqlDbType.VarChar, SqlDbType.Bit, - SqlDbType.DateTime, + SqlDbType.Int, SqlDbType.VarChar, SqlDbType.VarBinary }; const string SqlSend = @"INSERT INTO [{0}].[{1}] ([Id],[CorrelationId],[ReplyToAddress],[Recoverable],[Expires],[Headers],[Body]) - VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,@Expires,@Headers,@Body)"; + VALUES (@Id,@CorrelationId,@ReplyToAddress,@Recoverable,IIF(@TimeToBeReceivedMs IS NOT NULL, DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()), NULL),@Headers,@Body)"; const string SqlReceive = @"WITH message AS (SELECT TOP(1) * FROM [{0}].[{1}] WITH (UPDLOCK, READPAST, ROWLOCK) ORDER BY [RowVersion] ASC) DELETE FROM message OUTPUT deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress, - deleted.Recoverable, deleted.Expires, deleted.Headers, deleted.Body;"; + deleted.Recoverable, IIF(deleted.Expires IS NOT NULL, DATEDIFF(ms, GETUTCDATE(), deleted.Expires), NULL), deleted.Headers, deleted.Body;"; const string SqlPurgeBatchOfExpiredMessages = - @"DELETE TOP({0}) FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < @UTCNow"; + @"DELETE TOP({0}) FROM [{1}].[{2}] WITH (UPDLOCK, READPAST, ROWLOCK) WHERE [Expires] < GETUTCDATE()"; const string SqlCheckIfExpiresIndexIsPresent = @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('[{1}].[{2}]')";