Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

every() funcs final implementation #98

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions Sources/Queues/Application+Queues.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ extension Application {
public var queues: Queues {
.init(application: self)
}

/// Represents a `Queues` configuration object
public struct Queues {

/// The provider of the `Queues` configuration
public struct Provider {
let run: (Application) -> ()
Expand All @@ -31,7 +31,7 @@ extension Application {
self.commands = [command]
application.commands.use(command, as: "queues")
}

public func add(command: QueuesCommand) {
self.commands.append(command)
}
Expand Down Expand Up @@ -98,7 +98,7 @@ extension Application {
)
)
}

/// Adds a new queued job
/// - Parameter job: The job to add
public func add<J>(_ job: J) where J: Job {
Expand All @@ -125,12 +125,12 @@ extension Application {

/// Schedule a new job
/// - Parameter job: The job to schedule
public func schedule<J>(_ job: J) -> ScheduleBuilder
where J: ScheduledJob
public func schedule<J>(_ job: J) -> ScheduleContainer
where J: ScheduledJob
{
let builder = ScheduleBuilder()
_ = self.storage.configuration.schedule(job, builder: builder)
return builder
let container = ScheduleContainer(job: job)
self.storage.configuration.schedule(container: container)
return container
}

/// Starts an in-process worker to dequeue and run jobs
Expand All @@ -140,14 +140,14 @@ extension Application {
try inProcessJobs.startJobs(on: queue)
self.storage.add(command: inProcessJobs)
}

/// Starts an in-process worker to run scheduled jobs
public func startScheduledJobs() throws {
let scheduledJobs = QueuesCommand(application: application, scheduled: true)
try scheduledJobs.startScheduledJobs()
self.storage.add(command: scheduledJobs)
}

func initialize() {
self.application.lifecycle.use(Lifecycle())
self.application.storage[Key.self] = .init(application)
Expand Down
16 changes: 8 additions & 8 deletions Sources/Queues/Job.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Vapor
public protocol Job: AnyJob {
/// The data associated with a job
associatedtype Payload

/// Called when it's this Job's turn to be dequeued.
/// - Parameters:
/// - context: The JobContext. Can be used to store and retrieve services
Expand All @@ -15,7 +15,7 @@ public protocol Job: AnyJob {
_ context: QueueContext,
_ payload: Payload
) -> EventLoopFuture<Void>

/// Called when there is an error at any stage of the Job's execution.
/// - Parameters:
/// - context: The JobContext. Can be used to store and retrieve services
Expand All @@ -26,19 +26,19 @@ public protocol Job: AnyJob {
_ error: Error,
_ payload: Payload
) -> EventLoopFuture<Void>

static func serializePayload(_ payload: Payload) throws -> [UInt8]
static func parsePayload(_ bytes: [UInt8]) throws -> Payload
}

extension Job where Payload: Codable {

/// Serialize a payload into Data
/// - Parameter payload: The payload
public static func serializePayload(_ payload: Payload) throws -> [UInt8] {
try .init(JSONEncoder().encode(payload))
}

/// Parse bytes into the payload
/// - Parameter bytes: The Payload
public static func parsePayload(_ bytes: [UInt8]) throws -> Payload {
Expand All @@ -51,7 +51,7 @@ extension Job {
public static var name: String {
return String(describing: Self.self)
}

/// See `Job`.`error`
public func error(
_ context: QueueContext,
Expand All @@ -60,7 +60,7 @@ extension Job {
) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededFuture(())
}

public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture<Void> {
var contextCopy = context
contextCopy.logger[metadataKey: "job_id"] = .string(id)
Expand All @@ -70,7 +70,7 @@ extension Job {
return context.eventLoop.makeFailedFuture(error)
}
}

public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture<Void> {
var contextCopy = context
contextCopy.logger[metadataKey: "job_id"] = .string(id)
Expand Down
10 changes: 5 additions & 5 deletions Sources/Queues/JobData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@
public struct JobData: Codable {
/// The job data to be encoded.
public let payload: [UInt8]

/// The maxRetryCount for the `Job`.
public let maxRetryCount: Int

/// A date to execute this job after
public let delayUntil: Date?

/// The date this job was queued
public let queuedAt: Date

/// The name of the `Job`
public let jobName: String

/// Creates a new `JobStorage` holding object
public init(
payload: [UInt8],
Expand Down
6 changes: 3 additions & 3 deletions Sources/Queues/JobIdentifier.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import struct Foundation.UUID

/// An identifier for a job
public struct JobIdentifier: Hashable, Equatable {

/// The string value of the ID
public let string: String

/// Creates a new id from a string
public init(string: String) {
self.string = string
}

/// Creates a new id with a default UUID value
public init() {
self.init(string: UUID().uuidString)
Expand Down
20 changes: 10 additions & 10 deletions Sources/Queues/Queue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@
public protocol Queue {
/// The job context
var context: QueueContext { get }

/// Gets the next job to be run
/// - Parameter id: The ID of the job
func get(_ id: JobIdentifier) -> EventLoopFuture<JobData>

/// Sets a job that should be run in the future
/// - Parameters:
/// - id: The ID of the job
/// - data: Data for the job
func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture<Void>

/// Removes a job from the queue
/// - Parameter id: The ID of the job
func clear(_ id: JobIdentifier) -> EventLoopFuture<Void>

/// Pops the next job in the queue
func pop() -> EventLoopFuture<JobIdentifier?>

/// Pushes the next job into a queue
/// - Parameter id: The ID of the job
func push(_ id: JobIdentifier) -> EventLoopFuture<Void>
Expand All @@ -30,27 +30,27 @@ extension Queue {
public var eventLoop: EventLoop {
self.context.eventLoop
}

/// A logger
public var logger: Logger {
self.context.logger
}

/// The configuration for the queue
public var configuration: QueuesConfiguration {
self.context.configuration
}

/// The queue's name
public var queueName: QueueName {
self.context.queueName
}

/// The key name of the queue
public var key: String {
self.queueName.makeKey(with: self.configuration.persistenceKey)
}

/// Dispatch a job into the queue for processing
/// - Parameters:
/// - job: The Job type
Expand All @@ -64,7 +64,7 @@ extension Queue {
delayUntil: Date? = nil,
id: JobIdentifier = JobIdentifier()
) -> EventLoopFuture<Void>
where J: Job
where J: Job
{
let bytes: [UInt8]
do {
Expand Down
10 changes: 5 additions & 5 deletions Sources/Queues/QueueContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import Vapor
public struct QueueContext {
/// The name of the queue
public let queueName: QueueName

/// The configuration object
public let configuration: QueuesConfiguration

/// The application object
public let application: Application

/// The logger object
public var logger: Logger

/// An event loop to run the process on
public let eventLoop: EventLoop

/// Creates a new JobContext
/// - Parameters:
/// - queueName: The name of the queue
Expand Down
6 changes: 3 additions & 3 deletions Sources/Queues/QueueName.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ public struct QueueName {

/// Makes the name of the queue
///
/// - Parameter persistanceKey: The base persistence key
/// - Parameter persistenceKey: The base persistence key
/// - Returns: A string of the queue's fully qualified name
public func makeKey(with persistanceKey: String) -> String {
return persistanceKey + "[\(self.string)]"
public func makeKey(with persistenceKey: String) -> String {
return persistenceKey + "[\(self.string)]"
}
}
2 changes: 1 addition & 1 deletion Sources/Queues/QueueWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public struct QueueWorker {
init(queue: Queue) {
self.queue = queue
}

/// Logic to run the queue
public func run() -> EventLoopFuture<Void> {
queue.logger.trace("Popping job from queue")
Expand Down
Loading