Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Got this working with standard2.1 and the latest mongo/quartz libraries. #43

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,5 @@ paket-files/
__pycache__/
*.pyc
*.DotSettings

tests/data
4 changes: 2 additions & 2 deletions src/Quartz.Spi.MongoDbJobStore/Models/JobDetail.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public IJobDetail GetJobDetail()
// The missing properties are figured out at runtime from the job type attributes
return new JobDetailImpl()
{
Key = new JobKey(Id.Name, Id.Group),
Description = Description,
Name = Id.Name,
Group = Id.Group,
JobType = JobType,
JobDataMap = JobDataMap,
Durable = Durable,
Expand Down
7 changes: 6 additions & 1 deletion src/Quartz.Spi.MongoDbJobStore/MongoDbJobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ public async Task<TriggerState> GetTriggerState(TriggerKey triggerKey,
}
}

public Task ResetTriggerFromErrorState(TriggerKey triggerKey, CancellationToken cancellationToken = new CancellationToken())
{
throw new NotImplementedException();
}

public async Task PauseTrigger(TriggerKey triggerKey, CancellationToken token = default(CancellationToken))
{
try
Expand Down Expand Up @@ -1347,7 +1352,7 @@ await _triggerRepository.UpdateTriggerState(triggerKey, Models.TriggerState.Erro

var operableTrigger = (IOperableTrigger) nextTrigger.GetTrigger();
operableTrigger.FireInstanceId = GetFiredTriggerRecordId();

var firedTrigger = new FiredTrigger(operableTrigger.FireInstanceId, nextTrigger, null)
{
State = Models.TriggerState.Acquired,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net452;net462;netstandard2.0</TargetFrameworks>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>Quartz.Spi.MongoDbJobStore</AssemblyName>
<IsPackable>true</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.4.2" />
<PackageReference Include="Quartz" Version="3.0.4" />
<PackageReference Include="MongoDB.Driver" Version="2.19.1" />
<PackageReference Include="Quartz" Version="3.6.2" />
<PackageReference Include="Common.Logging" Version="3.4.1" />
<PackageReference Update="Microsoft.SourceLink.GitHub" Version="1.1.1" />
<PackageReference Include="Quartz.Serialization.Json" Version="3.6.2" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
using Quartz.Impl.Matchers;
using Quartz.Spi.MongoDbJobStore.Extensions;
Expand All @@ -24,12 +25,16 @@ public async Task<JobDetail> GetJob(JobKey jobKey)

public async Task<List<JobKey>> GetJobsKeys(GroupMatcher<JobKey> matcher)
{
return
await Collection.Find(FilterBuilder.And(
var items = await Collection.Find(FilterBuilder.And(
FilterBuilder.Eq(detail => detail.Id.InstanceName, InstanceName),
FilterBuilder.Regex(detail => detail.Id.Group, matcher.ToBsonRegularExpression())))
.Project(detail => detail.Id.GetJobKey())
.ToListAsync().ConfigureAwait(false);
.Project(detail => new
{
Name = detail.Id.Name,
Group = detail.Id.Group
})
.ToListAsync().ConfigureAwait(false);
return items.Select(jd => JobKey.Create(jd.Name, jd.Group)).ToList();
}

public async Task<IEnumerable<string>> GetJobGroupNames()
Expand Down
20 changes: 13 additions & 7 deletions src/Quartz.Spi.MongoDbJobStore/Repositories/JobStoreClassMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,43 @@ namespace Quartz.Spi.MongoDbJobStore.Repositories
{
internal static class JobStoreClassMap
{
private static BsonClassMap<TClass> RegisterClassMap<TClass>(
Action<BsonClassMap<TClass>> classMapInitializer)
{
return BsonClassMap.RegisterClassMap<TClass>(classMapInitializer);
}

public static void RegisterClassMaps()
{
BsonSerializer.RegisterGenericSerializerDefinition(typeof (ISet<>), typeof (SetSerializer<>));
BsonSerializer.RegisterSerializer(new JobDataMapSerializer());

BsonClassMap.RegisterClassMap<Key<JobKey>>(map =>
RegisterClassMap<Key<JobKey>>(map =>
{
map.AutoMap();
map.MapProperty(key => key.Group);
map.MapProperty(key => key.Name);
map.AddKnownType(typeof(JobKey));
});
BsonClassMap.RegisterClassMap<Key<TriggerKey>>(map =>
RegisterClassMap<Key<TriggerKey>>(map =>
{
map.AutoMap();
map.MapProperty(key => key.Group);
map.MapProperty(key => key.Name);
map.AddKnownType(typeof(TriggerKey));
});
BsonClassMap.RegisterClassMap<JobKey>(map =>
RegisterClassMap<JobKey>(map =>
{
map.MapCreator(jobKey => new JobKey(jobKey.Name));
map.MapCreator(jobKey => new JobKey(jobKey.Name, jobKey.Group));
});

BsonClassMap.RegisterClassMap<TriggerKey>(map =>
RegisterClassMap<TriggerKey>(map =>
{
map.MapCreator(triggerKey => new TriggerKey(triggerKey.Name));
map.MapCreator(triggerKey => new TriggerKey(triggerKey.Name, triggerKey.Group));
});
BsonClassMap.RegisterClassMap<TimeOfDay>(map =>
RegisterClassMap<TimeOfDay>(map =>
{
map.AutoMap();
map.MapProperty(day => day.Hour);
Expand All @@ -51,13 +57,13 @@ public static void RegisterClassMaps()
map.MapCreator(day => new TimeOfDay(day.Hour, day.Minute));
});

BsonClassMap.RegisterClassMap<JobDetail>(map =>
RegisterClassMap<JobDetail>(map =>
{
map.AutoMap();
map.MapProperty(detail => detail.JobType).SetSerializer(new TypeSerializer());
});

BsonClassMap.RegisterClassMap<DailyTimeIntervalTrigger>(map =>
RegisterClassMap<DailyTimeIntervalTrigger>(map =>
{
map.AutoMap();
var serializer =
Expand Down
37 changes: 28 additions & 9 deletions src/Quartz.Spi.MongoDbJobStore/Repositories/TriggerRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,28 @@ public async Task<List<Trigger>> GetTriggers(JobKey jobKey)

public async Task<List<TriggerKey>> GetTriggerKeys(GroupMatcher<TriggerKey> matcher)
{
return await Collection.Find(FilterBuilder.And(
var items = await Collection.Find(FilterBuilder.And(
FilterBuilder.Eq(trigger => trigger.Id.InstanceName, InstanceName),
FilterBuilder.Regex(trigger => trigger.Id.Group, matcher.ToBsonRegularExpression())))
.Project(trigger => trigger.Id.GetTriggerKey())
.Project(detail => new
{
Name = detail.Id.Name,
Group = detail.Id.Group
})
.ToListAsync().ConfigureAwait(false);
return items.Select(tk => new TriggerKey(tk.Name, tk.Group)).ToList();
}

public async Task<List<TriggerKey>> GetTriggerKeys(Models.TriggerState state)
{
return await Collection.Find(trigger => trigger.Id.InstanceName == InstanceName && trigger.State == state)
.Project(trigger => trigger.Id.GetTriggerKey())
var items = await Collection.Find(trigger => trigger.Id.InstanceName == InstanceName && trigger.State == state)
.Project(detail => new
{
Name = detail.Id.Name,
Group = detail.Id.Group
})
.ToListAsync().ConfigureAwait(false);
return items.Select(tk => new TriggerKey(tk.Name, tk.Group)).ToList();
}

public async Task<List<string>> GetTriggerGroupNames()
Expand Down Expand Up @@ -102,7 +112,7 @@ public async Task<List<TriggerKey>> GetTriggersToAcquire(DateTimeOffset noLaterT
var noLaterThanDateTime = noLaterThan.UtcDateTime;
var noEarlierThanDateTime = noEarlierThan.UtcDateTime;

return await Collection.Find(trigger => trigger.Id.InstanceName == InstanceName &&
var items = await Collection.Find(trigger => trigger.Id.InstanceName == InstanceName &&
trigger.State == Models.TriggerState.Waiting &&
trigger.NextFireTime <= noLaterThanDateTime &&
(trigger.MisfireInstruction == -1 ||
Expand All @@ -113,8 +123,13 @@ public async Task<List<TriggerKey>> GetTriggersToAcquire(DateTimeOffset noLaterT
SortBuilder.Descending(trigger => trigger.Priority)
))
.Limit(maxCount)
.Project(trigger => trigger.Id.GetTriggerKey())
.Project(detail => new
{
Name = detail.Id.Name,
Group = detail.Id.Group
})
.ToListAsync().ConfigureAwait(false);
return items.Select(tk => new TriggerKey(tk.Name, tk.Group)).ToList();
}

public async Task<long> GetCount()
Expand Down Expand Up @@ -236,7 +251,11 @@ public bool HasMisfiredTriggers(DateTime nextFireTime, int maxResults, out List<
trigger.MisfireInstruction != MisfireInstruction.IgnoreMisfirePolicy &&
trigger.NextFireTime < nextFireTime &&
trigger.State == Models.TriggerState.Waiting)
.Project(trigger => trigger.Id.GetTriggerKey())
.Project(detail => new
{
Name = detail.Id.Name,
Group = detail.Id.Group
})
.Sort(SortBuilder.Combine(
SortBuilder.Ascending(trigger => trigger.NextFireTime),
SortBuilder.Descending(trigger => trigger.Priority)
Expand All @@ -247,15 +266,15 @@ public bool HasMisfiredTriggers(DateTime nextFireTime, int maxResults, out List<
var hasReachedLimit = false;
while (cursor.MoveNext() && !hasReachedLimit)
{
foreach (var triggerKey in cursor.Current)
foreach (var tk in cursor.Current)
{
if (results.Count == maxResults)
{
hasReachedLimit = true;
}
else
{
results.Add(triggerKey);
results.Add(new TriggerKey(tk.Name, tk.Group));
}
}
}
Expand Down
38 changes: 29 additions & 9 deletions src/Quartz.Spi.MongoDbJobStore/Serializers/JobDataMapSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,56 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Quartz.Simpl;

namespace Quartz.Spi.MongoDbJobStore.Serializers
{
internal class JobDataMapSerializer : SerializerBase<JobDataMap>
{
private readonly DefaultObjectSerializer _objectSerializer = new DefaultObjectSerializer();
private readonly JsonSerializerSettings _serializerSettings;

public JobDataMapSerializer()
{
_serializerSettings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
TypeNameHandling = TypeNameHandling.Auto,
TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple
};
}


public override void Serialize(BsonSerializationContext context, BsonSerializationArgs args, JobDataMap value)
{
if (value == null)
if (value.Count == 0)
{
context.Writer.WriteNull();
return;
}

var base64 = Convert.ToBase64String(_objectSerializer.Serialize(value));
context.Writer.WriteString(base64);

var json = JsonConvert.SerializeObject(value, _serializerSettings);
var document = BsonDocument.Parse(json);

var serializer = BsonSerializer.LookupSerializer(typeof(BsonDocument));
serializer.Serialize(context, document);
}

public override JobDataMap Deserialize(BsonDeserializationContext context, BsonDeserializationArgs args)
{
if (context.Reader.CurrentBsonType == BsonType.Null)
{
context.Reader.ReadNull();
return null;
return new JobDataMap();
}

var bytes = Convert.FromBase64String(context.Reader.ReadString());
return _objectSerializer.DeSerialize<JobDataMap>(bytes);

var serializer = BsonSerializer.LookupSerializer<BsonDocument>();
var document = serializer.Deserialize(context);
var json = BsonExtensionMethods.ToJson(document);
var result = JsonConvert.DeserializeObject<JobDataMap>(json, _serializerSettings)!;
return result;
}
}
}
15 changes: 1 addition & 14 deletions src/Quartz.Spi.MongoDbJobStore/Util/LogicalThreadContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

using System.Collections.Concurrent;
using System.Security;
using System.Threading;

// Workaround for getting off remoting removed in NET Core: http://www.cazzulino.com/callcontext-netstandard-netcore.html
#if NET452
Expand All @@ -51,17 +52,11 @@ public static class LogicalThreadContext
/// <param name="name">The name of the item.</param>
/// <returns>The object in the call context associated with the specified name or null if no object has been stored previously</returns>

#if NET462 || NETSTANDARD2_0
static ConcurrentDictionary<string, AsyncLocal<object>> state = new ConcurrentDictionary<string, AsyncLocal<object>>();
#endif

public static T GetData<T>(string name)
{
#if NET452
return (T)CallContext.GetData(name);
#elif NET462 || NETSTANDARD2_0
return state.TryGetValue(name, out AsyncLocal<object> data) ? (T)data.Value : default(T);
#endif
}

/// <summary>
Expand All @@ -71,11 +66,7 @@ public static T GetData<T>(string name)
/// <param name="value">The object to store in the call context.</param>
public static void SetData(string name, object value)
{
#if NET452
CallContext.SetData(name, value);
#elif NET462 || NETSTANDARD2_0
state.GetOrAdd(name, _ => new AsyncLocal<object>()).Value = value;
#endif
}

/// <summary>
Expand All @@ -84,11 +75,7 @@ public static void SetData(string name, object value)
/// <param name="name">The name of the data slot to empty.</param>
public static void FreeNamedDataSlot(string name)
{
#if NET452
CallContext.FreeNamedDataSlot(name);
#elif NET462 || NETSTANDARD2_0
state.TryRemove(name, out AsyncLocal<object> discard);
#endif
}
}
}
4 changes: 2 additions & 2 deletions tests/Quartz.Spi.MongoDbJobStore.Tests/BaseStoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ public abstract class BaseStoreTests
{
public const string Barrier = "BARRIER";
public const string DateStamps = "DATE_STAMPS";
public static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(125);
public static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(120);

protected async Task<IScheduler> CreateScheduler(string instanceName = "QUARTZ_TEST")
{
var properties = new NameValueCollection
{
["quartz.serializer.type"] = "binary",
["quartz.serializer.type"] = "json",
[StdSchedulerFactory.PropertySchedulerInstanceName] = instanceName,
[StdSchedulerFactory.PropertySchedulerInstanceId] = $"{Environment.MachineName}-{Guid.NewGuid()}",
[StdSchedulerFactory.PropertyJobStoreType] = typeof(MongoDbJobStore).AssemblyQualifiedName,
Expand Down
Loading