
III. Task Basics: Scheduling and Job Production

Lucio Tolentino edited this page Oct 31, 2017 · 4 revisions

Kraken comes bundled with a scheduler (built on a library called Rufus, if you want to read more about it). It works similarly to the cron system used by the old MashStat scripts.

Scheduling Formats

Kraken gives us several ways to specify a job's schedule. A common format applies Ruby's time and date methods to numbers. The following examples show how we can schedule a job.

# ... any smaller formats as needed
produce(every: 15.minutes) # run a job every 15 minutes
produce(every: 1.hour) # run a job every 1 hour
produce(every: 2.days) # run a job every 2 days
produce(every: 1.week) # run a job every 1 week
# ... and other day formats, months, years, etc.

We can also schedule jobs with a cron format.

produce(cron: "0 4 * * *") # run a job at 4am every day
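As a reading aid for the cron string above, here is a minimal sketch that labels the five fields of a standard cron expression. The `describe_cron` helper is hypothetical and written only for this illustration; it is not part of Kraken or Rufus, and it does not validate the field values.

```ruby
# Hypothetical helper: label the five fields of a standard cron expression.
def describe_cron(expression)
  names  = %i[minute hour day_of_month month day_of_week]
  values = expression.split
  unless values.size == names.size
    raise ArgumentError, "expected #{names.size} fields, got #{values.size}"
  end
  names.zip(values).to_h
end

# "0 4 * * *" means minute 0 of hour 4, on every day of every month.
describe_cron("0 4 * * *").each do |name, value|
  puts "#{name}: #{value}"
end
```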

Creating a worker and scheduling your jobs

Create a new directory in kraken_root_directory/app/workers called tutorial.

In kraken_root_directory/app/workers/tutorial, create a file called tutorial_worker.rb and add the following:

# kraken_root_directory/app/workers/tutorial/tutorial_worker.rb
module Tutorial
  class TutorialWorker < ::BaseWorker
  end
end

(See the Ruby tutorial for an explanation of modules and namespaces.)

When you create a new worker, you need to tell it how often it should try to collect data. By inheriting from the BaseWorker class, you get access to the BaseWorker.produce class method, which defines a callback that runs at a regular interval. Kraken automatically makes sure that the callback runs only once per period, so you don't have to do any special kind of locking or deduplication.

produce takes two parameters: a hash (a dictionary in Python), which defines the schedule to run on, and a block (an anonymous function / lambda), which is the code to run on that schedule. Its method signature is:

def produce(schedule, &block)

When Kraken boots, it will load all the workers, and then run their produce methods, which configures the scheduler. The scheduler automatically runs in the background and will run the configured callbacks on the configured schedule.
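To make the boot-time registration described above more concrete, here is a minimal sketch of how a class-level `produce` method could record a schedule and a block for a scheduler to use later. `SketchWorker` and `HelloWorker` are hypothetical stand-ins written for this illustration; Kraken's real BaseWorker may work differently.

```ruby
# Hypothetical stand-in for BaseWorker; not Kraken's actual code.
class SketchWorker
  # Each subclass keeps its own list of [schedule, block] pairs.
  def self.schedules
    @schedules ||= []
  end

  # Runs while the class body is evaluated. It only records the callback;
  # nothing is executed at this point.
  def self.produce(schedule, &block)
    schedules << [schedule, block]
  end
end

class HelloWorker < SketchWorker
  # Plain integer seconds are used here instead of duration helpers.
  produce(every: 60) do
    "Hello World"
  end
end

# A real scheduler would call each block when its schedule comes due.
# Here every registered block is called once by hand.
HelloWorker.schedules.each do |schedule, block|
  puts "#{schedule.inspect} -> #{block.call}"
end
```

The point of the design is that loading a worker class is cheap: the class body only registers callbacks, and the scheduler decides when to run them.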

Let's schedule our tutorial worker to run every minute.

# kraken_root_directory/app/workers/tutorial/tutorial_worker.rb
module Tutorial
  class TutorialWorker < ::BaseWorker
    produce({every: 1.minute}) do
      # `puts` is Ruby's equivalent to `print` in Python.
      puts("Hello World")
    end
  end
end

This worker will run every minute and print "Hello World" to stdout. Not very useful, but it's something.

Using the work queue

Now, let's have it actually queue some jobs:

module Tutorial
  class TutorialWorker < ::BaseWorker
    produce({every: 1.minute}) do
      perform_async(Time.now.to_i)
    end
 
    def perform(time)
      puts "This job was queued at #{time}"
    end
  end
end

Rather than doing work directly in produce, we're going to call perform_async with some parameters. We do this because we don't want long-running work running on the scheduler thread - we want to just create tasks that need to be done. Those tasks can run for however long is needed without backing up the rest of the system.

Calling perform_async pushes a job into Kraken's work queue. Kraken maintains a pool of instances for every worker, and idle workers wait for jobs to become available for processing. Kraken will pop jobs off the queue and hand them off to the next waiting worker. When there are no idle workers, the queue will fill up with work to be done. Once the workers complete their current task, they'll receive another one from Kraken.

Kraken removes jobs from the work queue and gives them to waiting workers by calling the worker's perform method with the arguments used when the job was queued with perform_async. So, our worker above should expect to have its perform method invoked with an integer representation of the time when the job was queued.
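The queue-and-pool behavior described above can be sketched with Ruby's built-in Queue and a couple of threads. This is only an illustration under simplifying assumptions: `SketchTimeWorker`, the pool size of two, and the use of `nil` as a stop signal are all made up for this example and say nothing about how Kraken implements its queue.

```ruby
require "json"

# Hypothetical stand-in for a worker class; not Kraken's implementation.
class SketchTimeWorker
  def perform(time)
    "This job was queued at #{time}"
  end
end

queue   = Queue.new # thread-safe FIFO from Ruby's core library
results = Queue.new

# A pool of two worker instances. Each thread blocks on queue.pop until
# a job is available, which models an idle worker waiting for work.
pool = 2.times.map do
  Thread.new do
    worker = SketchTimeWorker.new
    while (payload = queue.pop) # nil acts as the stop signal
      args = JSON.parse(payload)
      results << worker.perform(*args)
    end
  end
end

# Stand-in for perform_async: serialize the arguments and enqueue them.
3.times { |i| queue << JSON.generate([1_500_000_000 + i]) }

pool.size.times { queue << nil } # one stop signal per thread
pool.each(&:join)

messages = []
messages << results.pop until results.empty?
puts messages.sort
```

With three jobs and two workers, one job has to wait in the queue until a worker finishes its current task, which is the backlog behavior described above.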

NOTE: The payloads received by perform go through JSON serialization/deserialization - passing raw Ruby objects won't work. In general, you should be passing strings, numerics, arrays, and hashes - all stuff that serializes and deserializes cleanly.
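To see what the note means in practice, this short sketch round-trips a few argument lists through Ruby's standard json library. The `round_trip` helper is hypothetical and only approximates what a queue does to a payload; Kraken's exact serialization settings are an assumption here.

```ruby
require "json"

# Hypothetical helper: approximate what happens to job arguments on the queue.
def round_trip(args)
  JSON.parse(JSON.generate(args))
end

# Strings, numerics, arrays, and string-keyed hashes come back unchanged.
simple = round_trip([42, "abc", [1, 2], { "key" => "value" }])

# Symbols do not survive: symbol keys and symbol values come back as strings.
symbols = round_trip([{ id: 7, state: :queued }])

# A Time object comes back as a plain String, not a Time.
time = round_trip([Time.at(0).utc])

puts simple.inspect
puts symbols.inspect
puts time.first.class
```

This is why the tutorial worker passes `Time.now.to_i` rather than `Time.now`: the integer arrives in perform exactly as it was queued.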

Our worker will run, print a message to stdout about when the job was queued, and exit.