Class: Rufus::Scheduler
The Scheduler is used by OpenWFEru for registering ‘at’ and ‘cron’ jobs. ‘at’ jobs to execute once at a given point in time. ‘cron’ jobs execute a specified intervals. The two main methods are thus schedule_at() and schedule().
schedule_at() and schedule() await either a Schedulable instance and params (usually an array or nil), either a block, which is more in the Ruby way.
The gem "rufus-scheduler"
This scheduler was previously known as the "openwferu-scheduler" gem.
To ensure that code tapping the previous gem still runs fine with "rufus-scheduler", this new gem has ‘pointers’ for the old class names.
require 'rubygems' require 'openwfe/util/scheduler' s = OpenWFE::Scheduler.new
will still run OK with "rufus-scheduler".
Examples
require 'rubygems'
require 'rufus/scheduler'
scheduler = Rufus::Scheduler.start_new
scheduler.schedule_in("3d") do
regenerate_monthly_report()
end
#
# will call the regenerate_monthly_report method
# in 3 days from now
scheduler.schedule "0 22 * * 1-5" do
log.info "activating security system..."
activate_security_system()
end
job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
init_self_destruction_sequence()
end
an example that uses a Schedulable class :
class Regenerator < Schedulable
def trigger (frequency)
self.send(frequency)
end
def monthly
# ...
end
def yearly
# ...
end
end
regenerator = Regenerator.new
scheduler.schedule_in("4d", regenerator)
#
# will regenerate the report in four days
scheduler.schedule_in(
"5d",
{ :schedulable => regenerator, :scope => :month })
#
# will regenerate the monthly report in 5 days
There is also schedule_every() :
scheduler.schedule_every("1h20m") do
regenerate_latest_report()
end
(note : a schedule every isn‘t triggered immediately, thus this example will first trigger 1 hour and 20 minutes after being scheduled)
The scheduler has a "exit_when_no_more_jobs" attribute. When set to ‘true’, the scheduler will exit as soon as there are no more jobs to run. Use with care though, if you create a scheduler, set this attribute to true and start the scheduler, the scheduler will immediately exit. This attribute is best used indirectly : the method join_until_no_more_jobs() wraps it.
The :scheduler_precision can be set when instantiating the scheduler.
scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
scheduler.start
#
# instatiates a scheduler that checks its jobs twice per second
# (the default is 4 times per second (0.250))
Note that rufus-scheduler places a constraint on the values for the precision : 0.0 < p <= 1.0 Thus
scheduler.precision = 4.0
or
scheduler = Rufus::Scheduler.new :scheduler_precision => 5.0
will raise an exception.
Tags
Tags can be attached to jobs scheduled :
scheduler.schedule_in "2h", :tags => "backup" do
init_backup_sequence()
end
scheduler.schedule "0 24 * * *", :tags => "new_day" do
do_this_or_that()
end
jobs = find_jobs 'backup'
jobs.each { |job| job.unschedule }
Multiple tags may be attached to a single job :
scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
init_backup_sequence()
end
The vanilla case for tags assume they are String instances, but nothing prevents you from using anything else. The scheduler has no persistence by itself, so no serialization issue.
Cron up to the second
A cron schedule can be set at the second level :
scheduler.schedule "7 * * * * *" do
puts "it's now the seventh second of the minute"
end
The rufus scheduler recognizes an optional first column for second scheduling. This column can, like for the other columns, specify a value ("7"), a list of values ("7,8,9,27") or a range ("7-12").
information passed to schedule blocks
When calling schedule_every(), schedule_in() or schedule_at(), the block expects zero or 3 parameters like in
scheduler.schedule_every("1h20m") do |job_id, at, params|
puts "my job_id is #{job_id}"
end
For schedule(), zero or two parameters can get passed
scheduler.schedule "7 * * * * *" do |job_id, cron_line, params|
puts "my job_id is #{job_id}"
end
In both cases, params corresponds to the params passed to the schedule method (:tags, :first_at, :first_in, :dont_reschedule, …)
Exceptions
The rufus scheduler will output a stacktrace to the STDOUT in case of exception. There are two ways to change that behaviour.
# 1 - providing a lwarn method to the scheduler instance :
class << scheduler
def lwarn (&block)
puts "oops, something wrong happened : "
puts block.call
end
end
# 2 - overriding the [protected] method log_exception(e) :
class << scheduler
def log_exception (e)
puts "something wrong happened : "+e.to_s
end
end
‘Every jobs’ and rescheduling
Every jobs can reschedule/unschedule themselves. A reschedule example :
schedule.schedule_every "5h" do |job_id, at, params|
mails = $inbox.fetch_mails
mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
params[:every] = if mails.size > 100
"1h" # lots of spam, check every hour
else
"5h" # normal schedule, every 5 hours
end
end
Unschedule example :
schedule.schedule_every "10s" do |job_id, at, params|
#
# polls every 10 seconds until a mail arrives
$mail = $inbox.fetch_last_mail
params[:dont_reschedule] = true if $mail
end
‘Every jobs’, :first_at and :first_in
Since rufus-scheduler 1.0.2, the schedule_every methods recognizes two optional parameters, :first_at and :first_in
scheduler.schedule_every "2d", :first_in => "5h" do
# schedule something every two days, start in 5 hours...
end
scheduler.schedule_every "2d", :first_at => "5h" do
# schedule something every two days, start in 5 hours...
end
job.next_time()
Jobs, be they at, every or cron have a next_time() method, which tells when the job will be fired next time (for at and in jobs, this is also the last time).
For cron jobs, the current implementation is quite brutal. It takes three seconds on my 2006 macbook to reach a cron schedule 1 year away.
When is the next friday 13th ?
require 'rubygems'
require 'rufus/scheduler'
puts Rufus::CronLine.new("* * 13 * fri").next_time
:thread_name option
You can specify the name of the scheduler‘s thread. Should make it easier in some debugging situations.
scheduler.new :thread_name => "the crazy scheduler"
job.trigger_thread
Since rufus-scheduler 1.0.8, you can have access to the thread of a job currently being triggered.
job = scheduler.get_job(job_id) thread = job.trigger_thread
This new method will return nil if the job is not currently being triggered. Not that in case of an every or cron job, this method will return the thread of the last triggered instance, thus, in case of overlapping executions, you only get the most recent thread.
Attributes
| Name | Read/write? |
|---|---|
| precision | R |
| stopped | RW |
Public Class Methods
is_cron_string (s)
Returns true if the given string seems to be a cron string.
# File lib/rufus/scheduler.rb, line 721 721: def self.is_cron_string (s) 722: 723: s.match ".+ .+ .+ .+ .+" # well... 724: end
new (params={})
# File lib/rufus/scheduler.rb, line 338 338: def initialize (params={}) 339: 340: super() 341: 342: @pending_jobs = [] 343: @cron_jobs = {} 344: @non_cron_jobs = {} 345: 346: @schedule_queue = Queue.new 347: @unschedule_queue = Queue.new 348: # 349: # sync between the step() method and the [un]schedule 350: # methods is done via these queues, no more mutex 351: 352: @scheduler_thread = nil 353: 354: @precision = 0.250 355: # every 250ms, the scheduler wakes up (default value) 356: begin 357: self.precision = Float(params[:scheduler_precision]) 358: rescue Exception => e 359: # let precision at its default value 360: end 361: 362: @thread_name = params[:thread_name] || "rufus scheduler" 363: 364: #@correction = 0.00045 365: 366: @exit_when_no_more_jobs = false 367: #@dont_reschedule_every = false 368: 369: @last_cron_second = -1 370: 371: @stopped = true 372: end
start_new ()
Instantiates a new Rufus::Scheduler instance, starts it and returns it
# File lib/rufus/scheduler.rb, line 410 410: def self.start_new 411: 412: s = self.new 413: s.start 414: s 415: end
Public Instance Methods
at (at, params={}, &block)
Alias for schedule_at
at_job_count ()
Returns the current count of ‘at’ jobs scheduled (not ‘every’).
# File lib/rufus/scheduler.rb, line 713 713: def at_job_count 714: 715: @non_cron_jobs.values.select { |j| j.class == AtJob }.size 716: end
cron (cron_line, params={}, &block)
Alias for schedule
cron_job_count ()
Returns the number of cron jobs currently active in this scheduler.
# File lib/rufus/scheduler.rb, line 697 697: def cron_job_count 698: 699: @cron_jobs.size 700: end
every_job_count ()
Returns the current count of ‘every’ jobs scheduled.
# File lib/rufus/scheduler.rb, line 705 705: def every_job_count 706: 707: @non_cron_jobs.values.select { |j| j.class == EveryJob }.size 708: end
find_jobs (tag)
Returns an array of jobs that have the given tag.
# File lib/rufus/scheduler.rb, line 668 668: def find_jobs (tag) 669: 670: @cron_jobs.values.find_all { |job| job.has_tag?(tag) } + 671: @non_cron_jobs.values.find_all { |job| job.has_tag?(tag) } 672: end
find_schedulables (tag)
Finds the jobs with the given tag and then returns an array of the wrapped Schedulable objects. Jobs that haven‘t a wrapped Schedulable won‘t be included in the result.
# File lib/rufus/scheduler.rb, line 680 680: def find_schedulables (tag) 681: 682: find_jobs(tag).find_all { |job| job.respond_to?(:schedulable) } 683: end
get_job (job_id)
Returns the job corresponding to job_id, an instance of AtJob or CronJob will be returned.
# File lib/rufus/scheduler.rb, line 650 650: def get_job (job_id) 651: 652: @cron_jobs[job_id] || @non_cron_jobs[job_id] 653: end
get_schedulable (job_id)
Finds a job (via get_job()) and then returns the wrapped schedulable if any.
# File lib/rufus/scheduler.rb, line 659 659: def get_schedulable (job_id) 660: 661: j = get_job(job_id) 662: j.respond_to?(:schedulable) ? j.schedulable : nil 663: end
in (duration, params={}, &block)
Alias for schedule_in
join ()
Joins on the scheduler thread
# File lib/rufus/scheduler.rb, line 436 436: def join 437: 438: @scheduler_thread.join 439: end
join_until_no_more_jobs ()
Like join() but takes care of setting the ‘exit_when_no_more_jobs’ attribute of this scheduler to true before joining. Thus the scheduler will exit (and the join terminates) as soon as there aren‘t no more ‘at’ (or ‘every’) jobs in the scheduler.
Currently used only in unit tests.
# File lib/rufus/scheduler.rb, line 449 449: def join_until_no_more_jobs 450: 451: @exit_when_no_more_jobs = true 452: join 453: end
pending_job_count ()
Returns the number of currently pending jobs in this scheduler (‘at’ jobs and ‘every’ jobs).
# File lib/rufus/scheduler.rb, line 689 689: def pending_job_count 690: 691: @pending_jobs.size 692: end
precision= (f)
Setting the precision ( 0.0 < p <= 1.0 )
# File lib/rufus/scheduler.rb, line 318 318: def precision= (f) 319: 320: raise "precision must be 0.0 < p <= 1.0" \ 321: if f <= 0.0 or f > 1.0 322: 323: @precision = f 324: end
schedule (cron_line, params={}, &block)
Schedules a cron job, the ‘cron_line’ is a string following the Unix cron standard (see "man 5 crontab" in your command line, or www.google.com/search?q=man%205%20crontab).
For example :
scheduler.schedule("5 0 * * *", s)
# will trigger the schedulable s every day
# five minutes after midnight
scheduler.schedule("15 14 1 * *", s)
# will trigger s at 14:15 on the first of every month
scheduler.schedule("0 22 * * 1-5") do
puts "it's break time..."
end
# outputs a message every weekday at 10pm
Returns the job id attributed to this ‘cron job’, this id can be used to unschedule the job.
This method returns a job identifier which can be used to unschedule() the job.
# File lib/rufus/scheduler.rb, line 588 588: def schedule (cron_line, params={}, &block) 589: 590: params = prepare_params(params) 591: 592: # 593: # is a job with the same id already scheduled ? 594: 595: cron_id = params[:cron_id] || params[:job_id] 596: 597: #@unschedule_queue << cron_id 598: 599: # 600: # schedule 601: 602: b = to_block(params, &block) 603: job = CronJob.new(self, cron_id, cron_line, params, &b) 604: 605: @schedule_queue << job 606: 607: job.job_id 608: end
schedule_at (at, params={}, &block)
Schedules a job by specifying at which time it should trigger. Returns the a job_id that can be used to unschedule the job.
This method returns a job identifier which can be used to unschedule() the job.
If the job is specified in the past, it will be triggered immediately but not scheduled. To avoid the triggering, the parameter :discard_past may be set to true :
jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
puts "you'll never read this message"
end
And ‘jobid’ will hold a nil (not scheduled).
# File lib/rufus/scheduler.rb, line 480 480: def schedule_at (at, params={}, &block) 481: 482: do_schedule_at( 483: at, 484: prepare_params(params), 485: &block) 486: end
schedule_every (freq, params={}, &block)
Schedules a job in a loop. After an execution, it will not execute before the time specified in ‘freq’.
This method returns a job identifier which can be used to unschedule() the job.
In case of exception in the job, it will be rescheduled. If you don‘t want the job to be rescheduled, set the parameter :try_again to false.
scheduler.schedule_every "500", :try_again => false do
do_some_prone_to_error_stuff()
# won't get rescheduled in case of exception
end
Since rufus-scheduler 1.0.2, the params :first_at and :first_in are accepted.
scheduler.schedule_every "2d", :first_in => "5h" do
# schedule something every two days, start in 5 hours...
end
(without setting a :first_in (or :first_at), our example schedule would have had been triggered after two days).
# File lib/rufus/scheduler.rb, line 539 539: def schedule_every (freq, params={}, &block) 540: 541: params = prepare_params params 542: params[:every] = freq 543: 544: first_at = params[:first_at] 545: first_in = params[:first_in] 546: 547: first_at = if first_at 548: at_to_f(first_at) 549: elsif first_in 550: Time.now.to_f + Rufus.duration_to_f(first_in) 551: else 552: Time.now.to_f + Rufus.duration_to_f(freq) # not triggering immediately 553: end 554: 555: do_schedule_at(first_at, params, &block) 556: end
schedule_in (duration, params={}, &block)
Schedules a job by stating in how much time it should trigger. Returns the a job_id that can be used to unschedule the job.
This method returns a job identifier which can be used to unschedule() the job.
# File lib/rufus/scheduler.rb, line 501 501: def schedule_in (duration, params={}, &block) 502: 503: do_schedule_at( 504: Time.new.to_f + Rufus::duration_to_f(duration), 505: prepare_params(params), 506: &block) 507: end
start ()
Starts this scheduler (or restart it if it was previously stopped)
# File lib/rufus/scheduler.rb, line 377 377: def start 378: 379: @stopped = false 380: 381: @scheduler_thread = Thread.new do 382: 383: Thread.current[:name] = @thread_name 384: 385: if defined?(JRUBY_VERSION) 386: require 'java' 387: java.lang.Thread.current_thread.name = @thread_name 388: end 389: 390: loop do 391: 392: break if @stopped 393: 394: t0 = Time.now.to_f 395: 396: step 397: 398: d = Time.now.to_f - t0 # + @correction 399: 400: next if d > @precision 401: 402: sleep (@precision - d) 403: end 404: end 405: end
stop ()
The scheduler is stoppable via sstop()
# File lib/rufus/scheduler.rb, line 420 420: def stop 421: 422: @stopped = true 423: end
unschedule (job_id)
Unschedules an ‘at’ or a ‘cron’ job identified by the id it was given at schedule time.
# File lib/rufus/scheduler.rb, line 625 625: def unschedule (job_id) 626: 627: @unschedule_queue << job_id 628: end
unschedule_cron_job (job_id)
Unschedules a cron job
(deprecated : use unschedule(job_id) for all the jobs !)
# File lib/rufus/scheduler.rb, line 635 635: def unschedule_cron_job (job_id) 636: 637: unschedule(job_id) 638: end