Class: Rufus::Scheduler

The Scheduler is used by OpenWFEru for registering ‘at’ and ‘cron’ jobs. ‘at’ jobs to execute once at a given point in time. ‘cron’ jobs execute a specified intervals. The two main methods are thus schedule_at() and schedule().

schedule_at() and schedule() await either a Schedulable instance and params (usually an array or nil), either a block, which is more in the Ruby way.

The gem "rufus-scheduler"

This scheduler was previously known as the "openwferu-scheduler" gem.

To ensure that code tapping the previous gem still runs fine with "rufus-scheduler", this new gem has ‘pointers’ for the old class names.

 require 'rubygems'
 require 'openwfe/util/scheduler'
 s = OpenWFE::Scheduler.new

will still run OK with "rufus-scheduler".

Examples

 require 'rubygems'
 require 'rufus/scheduler'

 scheduler = Rufus::Scheduler.start_new

 scheduler.schedule_in("3d") do
   regenerate_monthly_report()
 end
   #
   # will call the regenerate_monthly_report method
   # in 3 days from now

  scheduler.schedule "0 22 * * 1-5" do
    log.info "activating security system..."
    activate_security_system()
  end

  job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
    init_self_destruction_sequence()
  end

an example that uses a Schedulable class :

 class Regenerator < Schedulable
   def trigger (frequency)
     self.send(frequency)
   end
   def monthly
     # ...
   end
   def yearly
     # ...
   end
 end

 regenerator = Regenerator.new

 scheduler.schedule_in("4d", regenerator)
   #
   # will regenerate the report in four days

 scheduler.schedule_in(
   "5d",
   { :schedulable => regenerator, :scope => :month })
     #
     # will regenerate the monthly report in 5 days

There is also schedule_every() :

  scheduler.schedule_every("1h20m") do
    regenerate_latest_report()
  end

(note : a schedule every isn‘t triggered immediately, thus this example will first trigger 1 hour and 20 minutes after being scheduled)

The scheduler has a "exit_when_no_more_jobs" attribute. When set to ‘true’, the scheduler will exit as soon as there are no more jobs to run. Use with care though, if you create a scheduler, set this attribute to true and start the scheduler, the scheduler will immediately exit. This attribute is best used indirectly : the method join_until_no_more_jobs() wraps it.

The :scheduler_precision can be set when instantiating the scheduler.

  scheduler = Rufus::Scheduler.new(:scheduler_precision => 0.500)
  scheduler.start
    #
    # instatiates a scheduler that checks its jobs twice per second
    # (the default is 4 times per second (0.250))

Note that rufus-scheduler places a constraint on the values for the precision : 0.0 < p <= 1.0 Thus

  scheduler.precision = 4.0

or

  scheduler = Rufus::Scheduler.new :scheduler_precision => 5.0

will raise an exception.

Tags

Tags can be attached to jobs scheduled :

  scheduler.schedule_in "2h", :tags => "backup" do
    init_backup_sequence()
  end

  scheduler.schedule "0 24 * * *", :tags => "new_day" do
    do_this_or_that()
  end

  jobs = find_jobs 'backup'
  jobs.each { |job| job.unschedule }

Multiple tags may be attached to a single job :

  scheduler.schedule_in "2h", :tags => [ "backup", "important" ]  do
    init_backup_sequence()
  end

The vanilla case for tags assume they are String instances, but nothing prevents you from using anything else. The scheduler has no persistence by itself, so no serialization issue.

Cron up to the second

A cron schedule can be set at the second level :

  scheduler.schedule "7 * * * * *" do
    puts "it's now the seventh second of the minute"
  end

The rufus scheduler recognizes an optional first column for second scheduling. This column can, like for the other columns, specify a value ("7"), a list of values ("7,8,9,27") or a range ("7-12").

information passed to schedule blocks

When calling schedule_every(), schedule_in() or schedule_at(), the block expects zero or 3 parameters like in

  scheduler.schedule_every("1h20m") do |job_id, at, params|
      puts "my job_id is #{job_id}"
  end

For schedule(), zero or two parameters can get passed

  scheduler.schedule "7 * * * * *" do |job_id, cron_line, params|
    puts "my job_id is #{job_id}"
  end

In both cases, params corresponds to the params passed to the schedule method (:tags, :first_at, :first_in, :dont_reschedule, …)

Exceptions

The rufus scheduler will output a stacktrace to the STDOUT in case of exception. There are two ways to change that behaviour.

  # 1 - providing a lwarn method to the scheduler instance :

  class << scheduler
    def lwarn (&block)
      puts "oops, something wrong happened : "
      puts block.call
    end
  end

  # 2 - overriding the [protected] method log_exception(e) :

  class << scheduler
    def log_exception (e)
      puts "something wrong happened : "+e.to_s
    end
  end

‘Every jobs’ and rescheduling

Every jobs can reschedule/unschedule themselves. A reschedule example :

  schedule.schedule_every "5h" do |job_id, at, params|

    mails = $inbox.fetch_mails
    mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }

    params[:every] = if mails.size > 100
      "1h" # lots of spam, check every hour
    else
      "5h" # normal schedule, every 5 hours
    end
  end

Unschedule example :

  schedule.schedule_every "10s" do |job_id, at, params|
    #
    # polls every 10 seconds until a mail arrives

    $mail = $inbox.fetch_last_mail

    params[:dont_reschedule] = true if $mail
  end

‘Every jobs’, :first_at and :first_in

Since rufus-scheduler 1.0.2, the schedule_every methods recognizes two optional parameters, :first_at and :first_in

  scheduler.schedule_every "2d", :first_in => "5h" do
    # schedule something every two days, start in 5 hours...
  end

  scheduler.schedule_every "2d", :first_at => "5h" do
    # schedule something every two days, start in 5 hours...
  end

job.next_time()

Jobs, be they at, every or cron have a next_time() method, which tells when the job will be fired next time (for at and in jobs, this is also the last time).

For cron jobs, the current implementation is quite brutal. It takes three seconds on my 2006 macbook to reach a cron schedule 1 year away.

When is the next friday 13th ?

  require 'rubygems'
  require 'rufus/scheduler'

  puts Rufus::CronLine.new("* * 13 * fri").next_time

:thread_name option

You can specify the name of the scheduler‘s thread. Should make it easier in some debugging situations.

  scheduler.new :thread_name => "the crazy scheduler"

job.trigger_thread

Since rufus-scheduler 1.0.8, you can have access to the thread of a job currently being triggered.

  job = scheduler.get_job(job_id)
  thread = job.trigger_thread

This new method will return nil if the job is not currently being triggered. Not that in case of an every or cron job, this method will return the thread of the last triggered instance, thus, in case of overlapping executions, you only get the most recent thread.

Attributes

NameRead/write?
precision R
stopped RW

Public Class Methods


is_cron_string (s)

Returns true if the given string seems to be a cron string.

     # File lib/rufus/scheduler.rb, line 721
721:     def self.is_cron_string (s)
722: 
723:       s.match ".+ .+ .+ .+ .+" # well...
724:     end

new (params={})

     # File lib/rufus/scheduler.rb, line 338
338:     def initialize (params={})
339: 
340:       super()
341: 
342:       @pending_jobs = []
343:       @cron_jobs = {}
344:       @non_cron_jobs = {}
345: 
346:       @schedule_queue = Queue.new
347:       @unschedule_queue = Queue.new
348:         #
349:         # sync between the step() method and the [un]schedule
350:         # methods is done via these queues, no more mutex
351: 
352:       @scheduler_thread = nil
353: 
354:       @precision = 0.250
355:         # every 250ms, the scheduler wakes up (default value)
356:       begin
357:         self.precision = Float(params[:scheduler_precision])
358:       rescue Exception => e
359:         # let precision at its default value
360:       end
361: 
362:       @thread_name = params[:thread_name] || "rufus scheduler"
363: 
364:       #@correction = 0.00045
365: 
366:       @exit_when_no_more_jobs = false
367:       #@dont_reschedule_every = false
368: 
369:       @last_cron_second = -1
370: 
371:       @stopped = true
372:     end

start_new ()

Instantiates a new Rufus::Scheduler instance, starts it and returns it

     # File lib/rufus/scheduler.rb, line 410
410:     def self.start_new
411: 
412:       s = self.new
413:       s.start
414:       s
415:     end

Public Instance Methods



at_job_count ()

Returns the current count of ‘at’ jobs scheduled (not ‘every’).

     # File lib/rufus/scheduler.rb, line 713
713:     def at_job_count
714: 
715:       @non_cron_jobs.values.select { |j| j.class == AtJob }.size
716:     end


cron_job_count ()

Returns the number of cron jobs currently active in this scheduler.

     # File lib/rufus/scheduler.rb, line 697
697:     def cron_job_count
698: 
699:       @cron_jobs.size
700:     end


every_job_count ()

Returns the current count of ‘every’ jobs scheduled.

     # File lib/rufus/scheduler.rb, line 705
705:     def every_job_count
706: 
707:       @non_cron_jobs.values.select { |j| j.class == EveryJob }.size
708:     end

find_jobs (tag)

Returns an array of jobs that have the given tag.

     # File lib/rufus/scheduler.rb, line 668
668:     def find_jobs (tag)
669: 
670:       @cron_jobs.values.find_all { |job| job.has_tag?(tag) } +
671:       @non_cron_jobs.values.find_all { |job| job.has_tag?(tag) }
672:     end

find_schedulables (tag)

Finds the jobs with the given tag and then returns an array of the wrapped Schedulable objects. Jobs that haven‘t a wrapped Schedulable won‘t be included in the result.

     # File lib/rufus/scheduler.rb, line 680
680:     def find_schedulables (tag)
681: 
682:       find_jobs(tag).find_all { |job| job.respond_to?(:schedulable) }
683:     end

get_job (job_id)

Returns the job corresponding to job_id, an instance of AtJob or CronJob will be returned.

     # File lib/rufus/scheduler.rb, line 650
650:     def get_job (job_id)
651: 
652:       @cron_jobs[job_id] || @non_cron_jobs[job_id]
653:     end

get_schedulable (job_id)

Finds a job (via get_job()) and then returns the wrapped schedulable if any.

     # File lib/rufus/scheduler.rb, line 659
659:     def get_schedulable (job_id)
660: 
661:       j = get_job(job_id)
662:       j.respond_to?(:schedulable) ? j.schedulable : nil
663:     end


join ()

Joins on the scheduler thread

     # File lib/rufus/scheduler.rb, line 436
436:     def join
437: 
438:       @scheduler_thread.join
439:     end

join_until_no_more_jobs ()

Like join() but takes care of setting the ‘exit_when_no_more_jobs’ attribute of this scheduler to true before joining. Thus the scheduler will exit (and the join terminates) as soon as there aren‘t no more ‘at’ (or ‘every’) jobs in the scheduler.

Currently used only in unit tests.

     # File lib/rufus/scheduler.rb, line 449
449:     def join_until_no_more_jobs
450: 
451:       @exit_when_no_more_jobs = true
452:       join
453:     end

pending_job_count ()

Returns the number of currently pending jobs in this scheduler (‘at’ jobs and ‘every’ jobs).

     # File lib/rufus/scheduler.rb, line 689
689:     def pending_job_count
690: 
691:       @pending_jobs.size
692:     end

precision= (f)

Setting the precision ( 0.0 < p <= 1.0 )

     # File lib/rufus/scheduler.rb, line 318
318:     def precision= (f)
319: 
320:       raise "precision must be 0.0 < p <= 1.0" \
321:         if f <= 0.0 or f > 1.0
322: 
323:       @precision = f
324:     end

schedule (cron_line, params={}, &block)

Schedules a cron job, the ‘cron_line’ is a string following the Unix cron standard (see "man 5 crontab" in your command line, or www.google.com/search?q=man%205%20crontab).

For example :

 scheduler.schedule("5 0 * * *", s)
   # will trigger the schedulable s every day
   # five minutes after midnight

 scheduler.schedule("15 14 1 * *", s)
   # will trigger s at 14:15 on the first of every month

 scheduler.schedule("0 22 * * 1-5") do
   puts "it's break time..."
 end
   # outputs a message every weekday at 10pm

Returns the job id attributed to this ‘cron job’, this id can be used to unschedule the job.

This method returns a job identifier which can be used to unschedule() the job.

     # File lib/rufus/scheduler.rb, line 588
588:     def schedule (cron_line, params={}, &block)
589: 
590:       params = prepare_params(params)
591: 
592:       #
593:       # is a job with the same id already scheduled ?
594: 
595:       cron_id = params[:cron_id] || params[:job_id]
596: 
597:       #@unschedule_queue << cron_id
598: 
599:       #
600:       # schedule
601: 
602:       b = to_block(params, &block)
603:       job = CronJob.new(self, cron_id, cron_line, params, &b)
604: 
605:       @schedule_queue << job
606: 
607:       job.job_id
608:     end

schedule_at (at, params={}, &block)

Schedules a job by specifying at which time it should trigger. Returns the a job_id that can be used to unschedule the job.

This method returns a job identifier which can be used to unschedule() the job.

If the job is specified in the past, it will be triggered immediately but not scheduled. To avoid the triggering, the parameter :discard_past may be set to true :

  jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
    puts "you'll never read this message"
  end

And ‘jobid’ will hold a nil (not scheduled).

     # File lib/rufus/scheduler.rb, line 480
480:     def schedule_at (at, params={}, &block)
481: 
482:       do_schedule_at(
483:         at,
484:         prepare_params(params),
485:         &block)
486:     end

schedule_every (freq, params={}, &block)

Schedules a job in a loop. After an execution, it will not execute before the time specified in ‘freq’.

This method returns a job identifier which can be used to unschedule() the job.

In case of exception in the job, it will be rescheduled. If you don‘t want the job to be rescheduled, set the parameter :try_again to false.

  scheduler.schedule_every "500", :try_again => false do
    do_some_prone_to_error_stuff()
      # won't get rescheduled in case of exception
  end

Since rufus-scheduler 1.0.2, the params :first_at and :first_in are accepted.

  scheduler.schedule_every "2d", :first_in => "5h" do
    # schedule something every two days, start in 5 hours...
  end

(without setting a :first_in (or :first_at), our example schedule would have had been triggered after two days).

     # File lib/rufus/scheduler.rb, line 539
539:     def schedule_every (freq, params={}, &block)
540: 
541:       params = prepare_params params
542:       params[:every] = freq
543: 
544:       first_at = params[:first_at]
545:       first_in = params[:first_in]
546: 
547:       first_at = if first_at
548:         at_to_f(first_at)
549:       elsif first_in
550:         Time.now.to_f + Rufus.duration_to_f(first_in)
551:       else
552:         Time.now.to_f + Rufus.duration_to_f(freq) # not triggering immediately
553:       end
554: 
555:       do_schedule_at(first_at, params, &block)
556:     end

schedule_in (duration, params={}, &block)

Schedules a job by stating in how much time it should trigger. Returns the a job_id that can be used to unschedule the job.

This method returns a job identifier which can be used to unschedule() the job.

     # File lib/rufus/scheduler.rb, line 501
501:     def schedule_in (duration, params={}, &block)
502: 
503:       do_schedule_at(
504:         Time.new.to_f + Rufus::duration_to_f(duration),
505:         prepare_params(params),
506:         &block)
507:     end

sstart ()

Alias for start


sstop ()

Alias for stop


start ()

Starts this scheduler (or restart it if it was previously stopped)

     # File lib/rufus/scheduler.rb, line 377
377:     def start
378: 
379:       @stopped = false
380: 
381:       @scheduler_thread = Thread.new do
382: 
383:         Thread.current[:name] = @thread_name
384: 
385:         if defined?(JRUBY_VERSION)
386:           require 'java'
387:           java.lang.Thread.current_thread.name = @thread_name
388:         end
389: 
390:         loop do
391: 
392:           break if @stopped
393: 
394:           t0 = Time.now.to_f
395: 
396:           step
397: 
398:           d = Time.now.to_f - t0 # + @correction
399: 
400:           next if d > @precision
401: 
402:           sleep (@precision - d)
403:         end
404:       end
405:     end

stop ()

The scheduler is stoppable via sstop()

     # File lib/rufus/scheduler.rb, line 420
420:     def stop
421: 
422:       @stopped = true
423:     end

unschedule (job_id)

Unschedules an ‘at’ or a ‘cron’ job identified by the id it was given at schedule time.

     # File lib/rufus/scheduler.rb, line 625
625:     def unschedule (job_id)
626: 
627:       @unschedule_queue << job_id
628:     end

unschedule_cron_job (job_id)

Unschedules a cron job

(deprecated : use unschedule(job_id) for all the jobs !)

     # File lib/rufus/scheduler.rb, line 635
635:     def unschedule_cron_job (job_id)
636: 
637:       unschedule(job_id)
638:     end