IPC::DirQueue(3) - User Contributed Perl Documentation
IPC::DirQueue - disk-based many-to-many task queue
# Enqueueing process:
use IPC::DirQueue;
my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
$dq->enqueue_file("filename");

# Dequeueing process:
use IPC::DirQueue;
my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
my $job = $dq->pickup_queued_job();
if (!$job) { print "no jobs left\n"; exit; }
# ...do something interesting with $job->get_data_path() ...
$job->finish();
This module implements a FIFO queueing infrastructure, using a directory as the
communications and storage media. No daemon process is required to manage the
queue; all communication takes place via the filesystem.
A common UNIX system design pattern is to use a tool like
"lpr" as a task queueing system; for
example,
"http://patrick.wagstrom.net/old/weblog/archives/000128.html"
describes the use of "lpr" as an MP3
jukebox.
However, "lpr" isn't as
efficient as it could be. When used in this way, you have to restart each
task processor for every new task. If you have a lot of startup overhead,
this can be very inefficient. With
"IPC::DirQueue", a processing server can
run persistently and cache data needed across multiple tasks efficiently; it
will not be restarted unless you restart it.
Multiple enqueueing and dequeueing processes on multiple hosts
(NFS-safe locking is used) can run simultaneously, and safely, on the same
queue.
Since multiple dequeuers can run simultaneously, this provides a
good way to process a variable level of incoming tasks using a pre-defined
number of worker processes.
If you need more CPU power working on a queue, you can simply
start another dequeuer to help out. If you need less, kill off a few
dequeuers.
If you need to take down the server to perform some maintenance
or upgrades, just kill the dequeuer processes, perform the work, and start
up new ones. Since there's no 'socket' or similar point of failure aside
from the directory itself, the queue will just quietly fill with waiting
jobs until the new dequeuer is ready.
Arbitrary 'name = value' string-pair metadata can be transferred
alongside data files. In fact, in some cases, you may find it easier to send
unused and empty data files, and just use the 'metadata' fields to transfer
the details of what will be worked on.
- $dq = IPC::DirQueue->new ($opts);
- Create a new queue object, suitable for either enqueueing jobs or picking
up already-queued jobs for processing.
$opts is a reference to a hash, which
may contain the following options:
- dir => $path_to_directory (no default)
- Name the directory where the queue files are stored. This is
required.
- data_file_mode => $mode (default: 0666)
- The "chmod"-style file mode for data
files. This should be specified as a string with a leading 0. It will be
affected by the current process
"umask".
- queue_file_mode => $mode (default: 0666)
- The "chmod"-style file mode for queue
control files. This should be specified as a string with a leading 0. It
will be affected by the current process
"umask".
- ordered => { 0 | 1 } (default: 1)
- Whether the jobs should be processed in order of submission, or in no
particular order.
- queue_fanout => { 0 | 1 } (default: 0)
- Whether the queue directory should be 'fanned out'. This allows better
scalability with NFS-shared queues with large numbers of pending files,
but hurts performance otherwise. It also implies ordered = 0. (This
is strictly experimental, has overall poor performance, and is not
recommended.)
- indexd_uri => $uri (default: undef)
- A URI of a "dq-indexd" daemon, used to
maintain the list of waiting jobs. The URI must be of the form
"dq://hostname[:port]" . (This is
strictly experimental, and is not recommended.)
- buf_size => $number (default: 65536)
- The buffer size to use when copying files, in bytes.
- active_file_lifetime => $number (default: 600)
- The lifetime of an untouched active lockfile, in seconds. See 'STALE LOCKS
AND SIGNAL HANDLING', below, for more details.
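As a rough illustration of the options above, the following sketch builds an option hash for the constructor. queue_options() is a hypothetical helper, and the mode and lifetime values are arbitrary examples rather than recommendations from the module.

```perl
use strict;
use warnings;

# Hypothetical helper: returns an option hashref for IPC::DirQueue->new().
# Only option names documented above are used; the values are examples.
sub queue_options {
    my ($dir) = @_;
    return {
        dir                  => $dir,     # required, no default
        data_file_mode       => "0640",   # string with a leading 0
        queue_file_mode      => "0640",
        ordered              => 1,        # process in order of submission
        buf_size             => 65536,    # copy buffer size, in bytes
        active_file_lifetime => 1800,     # seconds before a lock may be stale
    };
}

# Usage (requires IPC::DirQueue to be installed):
#   require IPC::DirQueue;
#   my $dq = IPC::DirQueue->new(queue_options("/path/to/queue"));
```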
- $dq->enqueue_file ($filename [, $metadata [, $pri] ] );
- Enqueue a new job for processing. Returns 1 if the
job was enqueued, or "undef" on failure.
$filename is the path to the file to
be enqueued. Its contents will be read, and will be used as the contents
of the data file available to dequeuers using
"IPC::DirQueue::Job::get_data_path()".
$metadata is an optional hash
reference; every item of metadata will be available to worker processes
on the "IPC::DirQueue::Job" object, in
the "$job->{metadata}" hashref.
Note that using this channel for metadata brings with it several
restrictions:
- 1. it requires that the metadata be stored as 'name' => 'value' string
pairs
- 2. neither 'name' nor 'value' may contain newline (\n) or NUL (\0)
characters
- 3. 'name' cannot contain colon (:) characters
- 4. 'name' cannot start with a capital letter 'Q' and be 4 characters in
length
If those restrictions are broken, die() will be called with
the following error:
die "IPC::DirQueue: invalid metadatum: '$k'";
This is a change added in release 0.06; prior to that, that
metadatum would be silently dropped.
An optional priority, $pri, can be specified; jobs with lower priority
values are run first. Priorities range from 0 to 99, and 50 is the default.
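Since a bad metadatum makes enqueue_file() die, it can be convenient to check pairs beforehand. The sketch below is one possible reading of the four restrictions listed above; check_metadatum() is a hypothetical helper and is not part of IPC::DirQueue.

```perl
use strict;
use warnings;

# Hypothetical helper: returns 1 if the name/value pair satisfies the
# four documented restrictions, 0 otherwise.
sub check_metadatum {
    my ($name, $value) = @_;
    return 0 if !defined $name || !defined $value;
    return 0 if ref $name || ref $value;                # 1. plain strings only
    return 0 if $name  =~ /[\n\0]/;                     # 2. no newline or NUL
    return 0 if $value =~ /[\n\0]/;
    return 0 if $name  =~ /:/;                          # 3. no colon in the name
    return 0 if $name  =~ /^Q/ && length($name) == 4;   # 4. no 4-character 'Q...' name
    return 1;
}

# Usage with an existing queue object $dq:
#   my %meta = (id => "42", source => "upload");
#   check_metadatum($_, $meta{$_}) or die "bad metadatum: $_\n" for keys %meta;
#   $dq->enqueue_file("filename", \%meta, 10);   # priority 10 runs before the default 50
```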
- $dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] );
- Enqueue a new job for processing. Returns 1 if the
job was enqueued, or "undef" on failure.
$pri and $metadata are as
described in "$dq->enqueue_file()".
$filehandle is a perl file handle that
must be open for reading. It will be closed on completion, regardless of
success or failure. Its contents will be read, and will be used as the
contents of the data file available to dequeuers using
"IPC::DirQueue::Job::get_data_path()".
- $dq->enqueue_string ($string [, $metadata [, $pri] ] );
- Enqueue a new job for processing. The job data is entirely read from
$string. Returns 1 if the
job was enqueued, or "undef" on failure.
$pri and $metadata are as
described in
"$dq->enqueue_file()".
- $dq->enqueue_sub ($subref [, $metadata [, $pri] ] );
- Enqueue a new job for processing. Returns 1 if the
job was enqueued, or "undef" on failure.
$pri and $metadata are as
described in "$dq->enqueue_file()".
$subref is a perl subroutine, which is
expected to return one of the following each time it is called:
- a string of data bytes to be appended to any existing data. (The
string may be empty, '', in which case it's a no-op.)
- "undef" when the enqueued data has ended, i.e. EOF.
- "die()" if an error occurs. The "die()" message will be converted into
a warning, and the "enqueue_sub()" call will return "undef".
(Tip: note that this is a closure, so variables outside the
subroutine can be accessed safely.)
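A minimal sketch of a subroutine that follows this protocol is shown below. make_chunk_source() is a hypothetical helper: it returns a closure that hands back one chunk per call and "undef" once the chunks are exhausted.

```perl
use strict;
use warnings;

# Hypothetical helper: wraps a list of strings in a closure suitable for
# passing to enqueue_sub().
sub make_chunk_source {
    my @chunks = @_;
    return sub {
        return undef unless @chunks;   # no data left: signal EOF
        return shift @chunks;          # next chunk (may be '', a no-op)
    };
}

# Usage with an existing queue object $dq:
#   my $ok = $dq->enqueue_sub(make_chunk_source("line 1\n", "", "line 2\n"));
#   warn "enqueue failed\n" unless defined $ok;
```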
- $job = $dq->pickup_queued_job( [ path => $path ] );
- Pick up the next job in the queue, so that it can be processed.
If no job is available for processing, either because the
queue is empty or because other worker processes are already working on
them, "undef" is returned; otherwise,
a new instance of "IPC::DirQueue::Job"
is returned.
Note that the job is marked as active until
"$job->finish()" is called.
If the (optional) parameter
"path" is used, its value indicates
the path of the desired job's data file. By using this, it is possible
to cancel not-yet-active items from anywhere in the queue, or pick up
jobs out of sequence. The data path must match the value of the
pathqueue member of the
"IPC::DirQueue::Job" object passed to
the "visit_all_jobs()" callback.
- $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]);
- Wait for a job to be queued within the next
$timeout seconds.
If there is already a job ready for processing, this will
return immediately. If one is not available, it will sleep, wake up
periodically, check for job availability, and either carry on sleeping or
return the new job if one is now available.
If a job becomes available, a new instance of
"IPC::DirQueue::Job" is returned. If
the timeout is reached, "undef" is
returned.
If $timeout is not specified, or is
less than 1, this function will wait indefinitely.
The optional parameter $pollinterval
indicates how frequently to wake up and check for new jobs. It is
specified in seconds, and floating-point precision is supported. The
default is 1.
Note that if $timeout is not a round
multiple of $pollinterval, the nearest round
multiple of $pollinterval greater than
$timeout will be used instead. Also note that
$timeout is used as an integer.
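The behaviour described above lends itself to a simple worker loop, sketched below. run_worker() is a hypothetical helper; $dq is assumed to behave like an IPC::DirQueue object, and $handler is a caller-supplied code reference that receives the path of each job's data file.

```perl
use strict;
use warnings;

# Hypothetical helper: processes jobs until wait_for_queued_job() times out.
# Returns the number of jobs finished.
sub run_worker {
    my ($dq, $handler, $timeout, $pollinterval) = @_;
    my $done = 0;
    # wait_for_queued_job() returns undef once $timeout seconds pass
    # without a job becoming available, which ends the loop.
    while (my $job = $dq->wait_for_queued_job($timeout, $pollinterval)) {
        $handler->($job->get_data_path());
        $job->finish();    # the job stays marked active until this call
        $done++;
    }
    return $done;
}

# Usage (requires IPC::DirQueue to be installed):
#   my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
#   run_worker($dq, sub { print "processing $_[0]\n" }, 60, 0.5);
```

With a $timeout of 0 the loop would wait indefinitely, which suits a persistent worker process.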
- $dq->visit_all_jobs($visitor, $visitcontext);
- Visit all the jobs in the queue, in a read-only mode. Used to list the
entire queue.
The callback function $visitor will be
called for each job in the queue, like so:
&$visitor ($visitcontext, $job);
$visitcontext is whatever you pass in
that variable above. $job is a new, read-only
instance of "IPC::DirQueue::Job"
representing that job.
If a job is active (being processed), the
$job object also contains the following
additional data:
- 'active_host': the hostname on which the job is active
- 'active_pid': the process ID of the process which picked up the job
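As a sketch of how the visitor callback might be used to list a queue, the helper below collects one line per job. list_queue() is hypothetical, and it assumes that 'pathqueue', 'active_host' and 'active_pid' are readable as hash keys on the $job object.

```perl
use strict;
use warnings;

# Hypothetical helper: returns one descriptive string per job in the queue.
sub list_queue {
    my ($dq) = @_;
    my @rows;
    my $visitor = sub {
        my ($context, $job) = @_;    # $context is the \@rows passed below
        my $state = defined $job->{active_pid}
            ? "active on $job->{active_host} (pid $job->{active_pid})"
            : "waiting";
        push @$context, "$job->{pathqueue}: $state";
    };
    $dq->visit_all_jobs($visitor, \@rows);
    return @rows;
}

# Usage with an existing queue object $dq:
#   print "$_\n" for list_queue($dq);
```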
STALE LOCKS AND SIGNAL HANDLING
If interrupted or terminated, dequeueing processes should be careful to either
call "$job->finish()" or
"$job->return_to_queue()" on any active
tasks before exiting -- otherwise those jobs will remain marked active.
Dequeueing processes can also call
"$job->touch_active_lock()"
periodically, while processing large tasks, to ensure that the task is still
marked as active.
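One way to follow this advice is to turn termination signals into exceptions around the work, so that the job is always either finished or returned. This is only a sketch: process_with_cleanup() is a hypothetical helper, $job is assumed to behave like an IPC::DirQueue::Job object, and $work is a caller-supplied code reference.

```perl
use strict;
use warnings;

# Hypothetical helper: runs $work->($job); calls finish() on success and
# return_to_queue() if the work dies or the process gets INT or TERM.
sub process_with_cleanup {
    my ($job, $work) = @_;
    # Convert the signals into exceptions so the cleanup below still runs.
    local $SIG{INT}  = sub { die "interrupted\n" };
    local $SIG{TERM} = sub { die "terminated\n" };
    my $ok = eval { $work->($job); 1 };
    if ($ok) {
        $job->finish();
    } else {
        warn "job not completed: $@";
        $job->return_to_queue();
    }
    return $ok ? 1 : 0;
}

# Usage with a job from pickup_queued_job(); long-running work can call
# touch_active_lock() between steps (the step functions are placeholders):
#   process_with_cleanup($job, sub {
#       my ($j) = @_;
#       for my $step (@steps) { $step->($j); $j->touch_active_lock(); }
#   });
```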
Stale locks are normally dealt with automatically. If a lock is
still active after about 10 minutes of inactivity, the other
dequeuers on that machine will probe the process ID listed in that lock file
using kill(0). If that process ID is no longer
running, the lock is presumed likely to be stale. If a given timeout (10
minutes plus a random value between 0 and 256 seconds) has elapsed since the
lock file was last modified, the lock file is deleted.
This 10-minute default can be modified using the
"active_file_lifetime" parameter to the
"IPC::DirQueue" constructor.
Note: this means that if the dequeueing processes are spread among
multiple machines, and there is no longer a dequeuer running on the machine
that initially 'locked' the task, it will never be unlocked, unless you
delete the active file for that task.
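The stale-lock rule described above can be approximated as follows. This is a sketch of the described logic, not the module's own code: lock_is_stale() and its parameters are hypothetical, with $pid taken from the lock file, $mtime being the lock file's modification time, and $lifetime corresponding to "active_file_lifetime".

```perl
use strict;
use warnings;

# Hypothetical helper: returns 1 if a lock looks stale under the rule
# described above, 0 otherwise.
sub lock_is_stale {
    my ($pid, $mtime, $lifetime, $now) = @_;
    $now = time() unless defined $now;
    my $age = $now - $mtime;
    return 0 if $age < $lifetime;    # touched recently: leave it alone
    # kill 0 sends no signal; it only reports whether the process can be
    # signalled. It can only see processes on the local machine.
    return 0 if kill(0, $pid);       # owner is still running
    return 0 if $!{EPERM};           # process exists but belongs to another user
    # Stale only once the lifetime plus a random 0..256 seconds has passed.
    return $age >= $lifetime + int(rand(257)) ? 1 : 0;
}
```

Because the probe only works locally, a check like this cannot clear locks taken by a machine that no longer runs any dequeuers, which matches the note above.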
"IPC::DirQueue" maintains the following
structure for a queue directory:
- queue directory
- The queue directory is used to store the queue control files. Queue
control files determine what jobs are in the queue; if a job has a queue
control file in this directory, it is listed in the queue.
The filename format is as follows:
50.20040909232529941258.HASH[.PID.RAND]
The first two digits (50) are the
priority of the job. Lower priority numbers are run first.
20040909232529 is the current date and time when
the enqueueing process was run, in
"YYYYMMDDHHMMSS" format.
941258 is the time in microseconds, as returned
by "gettimeofday()". And finally,
"HASH" is a variable-length hash of
some semi-random data, used to increase the chance of uniqueness.
If there is a collision, the timestamps are regenerated after
a 250 msec sleep, and further randomness will be added at the end of the
string (namely, the current process ID and a random integer value). Up
to 10 retries will be attempted. Once the file is atomically moved into
the queue directory without collision, the retries cease.
If queue_fanout was used in the
"IPC::DirQueue" constructor, then the
queue directory does not contain the queue control files
directly; instead, there is an interposing set of 16 "fan-out"
directories, named according to the hex digits from
0 to "f".
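To make the control filename format concrete, the sketch below splits a name into its documented fields. parse_queue_filename() is a hypothetical helper, and it assumes that the "HASH" part contains no dots.

```perl
use strict;
use warnings;

# Hypothetical helper: parses PRI.YYYYMMDDHHMMSSuuuuuu.HASH[.PID.RAND]
# into a hashref, or returns nothing if the name does not match.
sub parse_queue_filename {
    my ($name) = @_;
    my ($pri, $stamp, $usec, $rest) =
        $name =~ /^(\d{2})\.(\d{14})(\d{6})\.(.+)$/
        or return;
    # PID and RAND are only present after a filename collision.
    my ($hash, $pid, $rand) = split /\./, $rest, 3;
    return {
        priority     => $pri + 0,
        timestamp    => $stamp,     # YYYYMMDDHHMMSS
        microseconds => $usec + 0,
        hash         => $hash,
        pid          => $pid,
        rand         => $rand,
    };
}

# Usage:
#   my $f = parse_queue_filename("50.20040909232529941258.HASH");
#   print "priority $f->{priority}, enqueued at $f->{timestamp}\n";
```

Because the priority and timestamp lead the name with fixed widths, a plain string sort of such names would put lower priority values first and, within a priority, earlier submissions first.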
- active directory
- The active directory is used to store active queue control files.
When a job becomes 'active' -- ie. is picked up by
"pickup_queued_job()" -- its control
file is moved from the queue directory into the active
directory while it is processed.
- data directory
- The data directory is used to store enqueued data files.
It contains a two-level "fan-out" hashed directory
structure; each data file is stored under a single-letter directory,
which in turn is under a single-letter directory. This increases the
efficiency of directory lookups under many filesystems.
The format of filenames here is similar to that used in the
queue directory, except that the last two characters are removed
and used instead for the "fan-out" directory names.
- tmp directory
- The tmp directory contains temporary work files that are in the
process of enqueueing, and not yet ready for processing.
The filename format here is similar to the above, with
suffixes indicating the type of file (".ctrl",
".data").
Atomic, NFS-safe renaming is used to avoid collisions, overwriting
or other unsafe operations.
Justin Mason <dq /at/ jmason.org>
The IPC::DirQueue mailing list is at <ipc-dirqueue-subscribe@perl.org>.
"IPC::DirQueue" is distributed under the same
license as perl itself.
The latest version of this library is likely to be available from CPAN.