Directory::Queue::Normal - object oriented interface to a normal directory based queue
use strict;
use warnings;
use Directory::Queue::Normal;
#
# simple schema:
# - there must be a "body" which is a string
# - there can be a "header" which is a table/hash
#
my $schema = { "body" => "string", "header" => "table?" };
my $queuedir = "/tmp/test";
my ($dirq, $name, %data);
#
# sample producer
#
$dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
foreach my $count (1 .. 100) {
    $name = $dirq->add(body => "element $count\n", header => \%ENV);
    printf("# added element %d as %s\n", $count, $name);
}
#
# sample consumer (one pass only)
#
$dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
for ($name = $dirq->first(); $name; $name = $dirq->next()) {
    next unless $dirq->lock($name);
    printf("# reading element %s\n", $name);
    %data = $dirq->get($name);
    # one can use $data{body} and $data{header} here...
    # one could use $dirq->unlock($name) to only browse the queue...
    $dirq->remove($name);
}
#
# looping consumer (sleeping to avoid using all CPU time)
#
$dirq = Directory::Queue::Normal->new(path => $queuedir, schema => $schema);
while (1) {
    sleep(1) unless $dirq->count();
    for ($name = $dirq->first(); $name; $name = $dirq->next()) {
        # same processing as in the one-pass consumer above
        next unless $dirq->lock($name);
        %data = $dirq->get($name);
        # one can use $data{body} and $data{header} here...
        $dirq->remove($name);
    }
}
The goal of this module is to offer a "normal" (as opposed to
"simple") queue system that uses the underlying filesystem for storage
and security, and relies on atomic filesystem operations to prevent race
conditions. It allows arbitrary data to be stored (see the "SCHEMA"
section for more information), at the cost of a significant disk space and
speed overhead.
Please refer to Directory::Queue for general information about
directory queues.
The new() method can be used to create a Directory::Queue::Normal object
that will later be used to interact with the queue. The following attributes
are supported:
- path
- the queue toplevel directory (mandatory)
- rndhex
- the "random" hexadecimal digit to use in element names (aka
R) as a number between 0 and 15 (default: randomly generated)
- umask
- the umask to use when creating files and directories (default: use the
running process's umask)
- maxelts
- the maximum number of elements that an intermediate directory can hold
(default: 16,000)
- maxlock
- default maximum time for a locked element (in seconds, default 600) as
used by the purge() method
- maxtemp
- default maximum time for a temporary element (in seconds, default 300) as
used by the purge() method
- schema
- the schema defining how to interpret user-supplied data (mandatory if
elements are added or read)
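To make the defaults listed above easy to see at a glance, here is a minimal sketch in core Perl. The helper normalize_options is hypothetical and is not part of Directory::Queue::Normal; it only mirrors the documented defaults and the documented 0 to 15 range of rndhex, and the real constructor may validate more (or differently).

```perl
use strict;
use warnings;

# Hypothetical helper (NOT part of Directory::Queue::Normal): it only mirrors
# the documented defaults of the new() attributes.
sub normalize_options {
    my (%opt) = @_;
    die "missing mandatory attribute: path\n" unless defined $opt{path};
    my %cfg = (
        path    => $opt{path},
        rndhex  => $opt{rndhex} // int(rand(16)),   # random digit R, 0..15
        umask   => $opt{umask},                     # undef: keep the process umask
        maxelts => $opt{maxelts} // 16_000,         # elements per intermediate dir
        maxlock => $opt{maxlock} // 600,            # seconds, used by purge()
        maxtemp => $opt{maxtemp} // 300,            # seconds, used by purge()
        schema  => $opt{schema},                    # needed only by add() and get()
    );
    die "invalid rndhex: $cfg{rndhex}\n"
        unless $cfg{rndhex} =~ /^\d+$/ && $cfg{rndhex} <= 15;
    return \%cfg;
}

my $cfg = normalize_options(path => "/tmp/test", maxlock => 3600);
# prints "maxelts=16000 maxlock=3600 maxtemp=300"
printf("maxelts=%d maxlock=%d maxtemp=%d\n", @{$cfg}{qw(maxelts maxlock maxtemp)});
```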
The schema defines how user-supplied data is stored in the queue. It is only
required by the add() and get() methods.
The schema must be a reference to a hash containing key/value
pairs.
Each key must contain only alphanumeric characters. It identifies
the piece of data and is used as the file name when storing the data inside
the element directory.
The value represents the type of the given piece of data. It can
be:
- binary
- the data is a binary string (i.e. a sequence of bytes), it will be stored
directly in a plain file with no further encoding
- string
- the data is a text string (i.e. a sequence of characters), it will be
UTF-8 encoded before being stored in a file
- table
- the data is a reference to a hash of text strings, it will be serialized
and UTF-8 encoded before being stored in a file
By default, all pieces of data are mandatory. If you append a
question mark to the type, this piece of data will be marked as optional.
See the comments in the "SYNOPSIS" section for an example.
By default, string or binary data is used directly. If you append
an asterisk to the type, the data that you add or get will be passed by
reference. This can be useful to avoid copying large amounts of data.
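The schema rules above can be summarized as a handful of checks. Below is a minimal sketch in core Perl; check_data is a hypothetical helper, not the module's code. Two behaviors are assumptions rather than documented facts: that the "?" and "*" suffixes may be combined (e.g. "binary*?"), and that data keys absent from the schema are rejected.

```perl
use strict;
use warnings;

# Hypothetical sketch (not the module's code): validate a data hash against a
# schema. Assumptions: "?" and "*" may be combined; unknown keys are errors.
sub check_data {
    my ($schema, $data) = @_;
    foreach my $key (sort keys %$schema) {
        die "invalid schema key: $key\n" unless $key =~ /^[a-zA-Z0-9]+$/;
        my ($type, $flags) = $schema->{$key} =~ /^(binary|string|table)([?*]{0,2})$/
            or die "invalid schema type for $key: $schema->{$key}\n";
        my $optional = index($flags, "?") >= 0;
        my $by_ref   = index($flags, "*") >= 0;
        unless (exists $data->{$key}) {
            next if $optional;                  # "?": this piece may be absent
            die "missing mandatory data: $key\n";
        }
        my $value = $data->{$key};
        if ($type eq "table") {
            die "$key must be a hash reference\n" unless ref($value) eq "HASH";
        } elsif ($by_ref) {
            # "*": binary/string data is passed as a scalar reference
            die "$key must be a scalar reference\n" unless ref($value) eq "SCALAR";
        } else {
            die "$key must be a plain scalar\n" if ref($value) or not defined $value;
        }
    }
    foreach my $key (keys %$data) {
        die "unexpected data: $key\n" unless exists $schema->{$key};
    }
    return 1;
}

my $schema = { body => "string", header => "table?" };
for my $data ({ body => "element 1\n" }, { header => { a => "b" } }) {
    my $ok  = eval { check_data($schema, $data) };
    my $msg = $ok ? "valid\n" : "invalid: $@";
    print $msg;   # "valid", then "invalid: missing mandatory data: body"
}
```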
The following methods are available:
- new()
- return a new Directory::Queue::Normal object (class method)
- copy()
- return a copy of the object; this can be useful to have independent
iterators on the same queue
- path()
- return the queue toplevel path
- id()
- return a unique identifier for the queue
- count()
- return the number of elements in the queue
- first()
- return the first element in the queue, resetting the iterator; return an
empty string if the queue is empty
- next()
- return the next element in the queue, incrementing the iterator; return an
empty string if there is no next element
- add(DATA)
- add the given data (a hash or hash reference) to the queue and return the
corresponding element name; the schema must be known and the data must
conform to it
- lock(ELEMENT[, PERMISSIVE])
- attempt to lock the given element and return true on success; if the
PERMISSIVE option is true (which is the default), it is not a fatal error
if the element cannot be locked and false is returned
- unlock(ELEMENT[, PERMISSIVE])
- attempt to unlock the given element and return true on success; if the
PERMISSIVE option is true (which is not the default), it is not a
fatal error if the element cannot be unlocked and false is returned
- touch(ELEMENT)
- update the access and modification times on the element's directory to
indicate that it is still being used; this is useful for elements that are
locked for long periods of time (see the purge() method)
- remove(ELEMENT)
- remove the given element (which must be locked) from the queue
- get(ELEMENT)
- get the data from the given element (which must be locked) and return
essentially the same hash that was given to add() (in list context, the
hash is returned directly while in scalar context, the hash reference is
returned instead); the schema must be known and the data must conform to
it
- purge([OPTIONS])
- purge the queue by removing unused intermediate directories, removing too
old temporary elements and unlocking too old locked elements (aka stale
locks); note: this can take a long time on queues with many elements;
OPTIONS can be:
- maxtemp
- maximum time for a temporary element (in seconds); if set to 0, temporary
elements will not be removed
- maxlock
- maximum time for a locked element (in seconds); if set to 0, locked
elements will not be unlocked
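The age test behind purge(), and the reason touch() matters for long-held locks, can be sketched as below. is_too_old is a hypothetical helper. The assumptions are that the age is measured from the element directory's modification time and that the comparison is strictly "older than". The sketch keeps the documented rule that a limit of 0 disables the check.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper (not the module's API): is the element directory older
# than $max seconds? A limit of 0 disables the check, as documented for purge().
sub is_too_old {
    my ($path, $max, $now) = @_;
    return 0 if $max == 0;
    my $mtime = (stat($path))[9];
    die "cannot stat $path: $!\n" unless defined $mtime;
    return $now - $mtime > $max ? 1 : 0;
}

# Demonstration: a directory last modified 1000 seconds ago is stale for
# maxlock=600 until a touch()-like utime() call refreshes its timestamps.
my $elt = tempdir(CLEANUP => 1);
my $now = time();
utime($now - 1000, $now - 1000, $elt) or die "utime failed: $!\n";
printf("stale before touch: %d\n", is_too_old($elt, 600, $now));      # 1
utime(undef, undef, $elt) or die "utime failed: $!\n";                # like touch()
printf("stale after touch:  %d\n", is_too_old($elt, 600, time()));    # 0
printf("check disabled:     %d\n", is_too_old($elt, 0, time()));      # 0
```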
All the directories holding the elements and all the files holding the data
pieces are located under the queue toplevel directory. This directory can
contain:
- temporary
- the directory holding temporary elements, i.e. the elements being
added
- obsolete
- the directory holding obsolete elements, i.e. the elements being
removed
- NNNNNNNN
- an intermediate directory holding elements; NNNNNNNN is an 8-digit
hexadecimal number
In any of the above directories, an element is stored as a single
directory with a 14-digit hexadecimal name SSSSSSSSMMMMMR
where:
- SSSSSSSS
- represents the number of seconds since the Epoch
- MMMMM
- represents the microsecond part of the time since the Epoch
- R
- is a random hexadecimal digit used to reduce name collisions
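The naming scheme above can be sketched in core Perl. element_name and insertion_directory are hypothetical helpers. Lowercase hex digits are an assumption. So is the policy for picking the intermediate directory: reuse the highest-numbered NNNNNNNN directory until it holds maxelts entries, then move on to the next number. The module may decide differently.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use Time::HiRes qw(gettimeofday);

# Hypothetical sketch: build a 14-digit element name SSSSSSSSMMMMMR.
# Lowercase hex digits are an assumption.
sub element_name {
    my ($rndhex) = @_;
    my ($sec, $usec) = gettimeofday();
    # 8 hex digits of seconds, 5 of microseconds (999999 is 0xf423f), 1 for R
    return sprintf("%08x%05x%01x", $sec, $usec, $rndhex);
}

# Hypothetical sketch (assumed policy): reuse the highest-numbered NNNNNNNN
# directory until it holds maxelts entries, then use the next hex number.
sub insertion_directory {
    my ($qpath, $maxelts) = @_;
    opendir(my $dh, $qpath) or die "cannot opendir $qpath: $!\n";
    # fixed-width lowercase hex names sort in numeric order as plain strings
    my @dirs = sort { $a cmp $b } grep { /^[0-9a-f]{8}$/ && -d "$qpath/$_" } readdir($dh);
    closedir($dh);
    return "00000000" unless @dirs;
    my $last = $dirs[-1];
    opendir($dh, "$qpath/$last") or die "cannot opendir $qpath/$last: $!\n";
    my $count = grep { !/^\.\.?$/ } readdir($dh);   # skip "." and ".."
    closedir($dh);
    return $last if $count < $maxelts;
    return sprintf("%08x", hex($last) + 1);
}

# Demonstration with a tiny maxelts so that the rollover is visible.
my $q = tempdir(CLEANUP => 1);
print insertion_directory($q, 2), "/", element_name(7), "\n";
mkdir("$q/00000000") or die "mkdir: $!\n";
mkdir("$q/00000000/$_") or die "mkdir: $!\n" for qw(a b);
print insertion_directory($q, 2), "\n";   # 00000001: 00000000 is full
```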
Finally, inside an element directory, the different pieces of data
are stored in separate files, named according to the schema. A locked
element additionally contains a directory named
"locked".
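Because a locked element is marked by a "locked" subdirectory, locking can rely on the atomicity of mkdir(). Either the call creates the directory or it fails with EEXIST, so two processes cannot both succeed. Below is a minimal sketch, assuming that locking amounts to exactly this. lock_element and unlock_element are hypothetical simplifications. They only mirror the documented PERMISSIVE defaults (permissive for lock(), strict for unlock()) and omit any extra checks the module may perform.

```perl
use strict;
use warnings;
use Errno qw(EEXIST ENOENT);
use File::Temp qw(tempdir);

# Hypothetical sketch (assumption, not the module's code): lock an element by
# atomically creating its "locked" subdirectory.
sub lock_element {
    my ($eltpath, $permissive) = @_;
    $permissive = 1 unless defined $permissive;   # lock(): permissive by default
    return 1 if mkdir("$eltpath/locked");
    # already locked (EEXIST) or element removed meanwhile (ENOENT)
    return 0 if $permissive and ($! == EEXIST or $! == ENOENT);
    die "cannot mkdir $eltpath/locked: $!\n";
}

# Matching unlock: remove the "locked" subdirectory (strict by default).
sub unlock_element {
    my ($eltpath, $permissive) = @_;
    return 1 if rmdir("$eltpath/locked");
    return 0 if $permissive and $! == ENOENT;
    die "cannot rmdir $eltpath/locked: $!\n";
}

my $elt = tempdir(CLEANUP => 1);
printf("first lock:  %d\n", lock_element($elt));   # 1
printf("second lock: %d\n", lock_element($elt));   # 0, already locked
unlock_element($elt);
```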
Lionel Cons <http://cern.ch/lionel.cons>
Copyright (C) CERN 2010-2021