NAME

MCE::Shared::Queue - Hybrid-queue helper class

VERSION

This document describes MCE::Shared::Queue version 1.876

DESCRIPTION

A queue helper class for use standalone or managed by MCE::Shared.

This module is mostly compatible with MCE::Queue, except for the "gather" option, which is not supported in this context. It provides a queue interface supporting normal and priority queues.

Data in a shared queue resides under the shared-manager process; data in a non-shared queue resides locally.

SYNOPSIS

    # non-shared or local construction for use by a single process

    use MCE::Shared::Queue;

    my $qu = MCE::Shared::Queue->new( await => 1, queue => [ "." ] );

    # construction for sharing with other threads and processes

    use MCE::Shared;
    use MCE::Shared::Queue;

    my $qu = MCE::Shared->queue(
        porder => $MCE::Shared::Queue::HIGHEST,
        type   => $MCE::Shared::Queue::FIFO,
    );

    # possible values for "porder" and "type"

    porder =>
        $MCE::Shared::Queue::HIGHEST  # Highest priority items dequeue first
        $MCE::Shared::Queue::LOWEST   # Lowest priority items dequeue first

    type =>
        $MCE::Shared::Queue::FIFO     # First in, first out
        $MCE::Shared::Queue::LIFO     # Last in, first out
        $MCE::Shared::Queue::LILO     # Synonym for FIFO
        $MCE::Shared::Queue::FILO     # Synonym for LIFO

    # below, [ ... ] denotes optional parameters

    $qu->await( [ $pending_threshold ] );
    $qu->clear();
    $qu->end();

    $qu->enqueue( $item [, $item, ... ] );
    $qu->enqueuep( $priority, $item [, $item, ... ] );

    $item  = $qu->dequeue();
    @items = $qu->dequeue( $count );
    $item  = $qu->dequeue_nb();
    @items = $qu->dequeue_nb( $count );

    $qu->insert( $index, $item [, $item, ... ] );
    $qu->insertp( $priority, $index, $item [, $item, ... ] );

    $count = $qu->pending();
    $item  = $qu->peek( [ $index ] );
    $item  = $qu->peekp( $priority [, $index ] );
    @array = $qu->heap();

API DOCUMENTATION

MCE::Shared::Queue->new ( [ options ] )
MCE::Shared->queue ( [ options ] )

Constructs a new object. Supported options are queue, porder, type, and await.
Note: Starting with 1.867, the barrier and fast options are silently ignored (no-op) if specified.

    # non-shared or local construction for use by a single process

    use MCE::Shared::Queue;

    $q1 = MCE::Shared::Queue->new();
    $q2 = MCE::Shared::Queue->new( queue  => [ 0, 1, 2 ] );

    $q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
    $q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST  );

    $q5 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::FIFO );
    $q6 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::LIFO );

    $q7 = MCE::Shared::Queue->new( await  => 1, barrier => 0 );
    $q8 = MCE::Shared::Queue->new( fast   => 1 );

    # construction for sharing with other threads and processes

    use MCE::Shared;
    use MCE::Shared::Queue;

    $q1 = MCE::Shared->queue();
    $q2 = MCE::Shared->queue( queue  => [ 0, 1, 2 ] );

    $q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
    $q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST  );

    $q5 = MCE::Shared->queue( type   => $MCE::Shared::Queue::FIFO );
    $q6 = MCE::Shared->queue( type   => $MCE::Shared::Queue::LIFO );

    $q7 = MCE::Shared->queue( await  => 1, barrier => 0 );
    $q8 = MCE::Shared->queue( fast   => 1 );

The "await" option, when enabled, allows workers to block (semaphore-like) until the number of items pending is less than or equal to a threshold value. The "await" method is described below.

Obsolete: On Unix platforms, "barrier" mode (enabled by default) prevents many workers from dequeuing simultaneously to lessen overhead for the OS kernel. Specify 0 to disable barrier mode and not allocate sockets. The barrier option has no effect if constructing the queue inside a thread or enabling "fast".

Obsolete: The "fast" option speeds up dequeues and is not enabled by default. It is beneficial for queues that do not call "dequeue_nb" and do not alter the count value while running; e.g. ->dequeue($count).

await ( pending_threshold )

Waits until the queue drops down to threshold items.
The "await" method is beneficial when wanting to throttle worker(s) appending to the queue. Perhaps consumers are running a bit behind and one wants to prevent memory consumption from growing too high. Below, the producer enqueues 10 items at a time and then waits for the pending count to drop to 10 or fewer, so the number of items pending will never go above 20.

    use Time::HiRes qw( sleep );
    use MCE::Flow;
    use MCE::Shared;

    my $q = MCE::Shared->queue( await => 1, fast => 1 );
    my ( $producers, $consumers ) = ( 1, 8 );

    mce_flow {
        task_name   => [ 'producer', 'consumer' ],
        max_workers => [ $producers, $consumers ],
    },
    sub {
        ## producer
        for my $item ( 1 .. 100 ) {
            $q->enqueue($item);

            ## blocks until the # of items pending reaches <= 10
            if ($item % 10 == 0) {
                MCE->say( 'pending: '.$q->pending() );
                $q->await(10);
            }
        }

        ## notify consumers no more work
        $q->end();
    },
    sub {
        ## consumers
        while (defined (my $next = $q->dequeue())) {
            MCE->say( MCE->task_wid().': '.$next );
            sleep 0.100;
        }
    };

clear ( )

Clears the queue of any items.

    $q->clear;

end ( )

Stops the queue from receiving more items. Any worker blocking on "dequeue" will be unblocked automatically. Subsequent calls to "dequeue" will behave like "dequeue_nb". Current API available since MCE::Shared 1.814.

    $q->end();

MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one might want to enqueue "undef" values instead of calling "end". The number of "undef" values depends on how many items workers dequeue at a time.

    $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
    $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
    $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items

enqueue ( item [, item, ... ] )

Appends a list of items onto the end of the normal queue.

    $q->enqueue( 'foo' );
    $q->enqueue( 'bar', 'baz' );

enqueuep ( priority, item [, item, ... ] )

Appends a list of items onto the end of the priority queue with priority.

    $q->enqueuep( $priority, 'foo' );
    $q->enqueuep( $priority, 'bar', 'baz' );

dequeue ( [ count ] )

Returns the requested number of items (default 1) from the queue.
Priority data will always dequeue first, before any data from the normal queue.

    $q->dequeue( 2 );
    $q->dequeue;       # default 1

The method will block if the queue contains zero items. If the queue contains fewer than the requested number of items, the method will not block, but will return whatever items are on the queue. This is unlike Thread::Queue, which will block until the requested number of items are available.

The $count, used for requesting the number of items, is beneficial when workers are passing parameters through the queue. For this reason, always remember to dequeue using the same multiple for the count.

    # MCE::Shared::Queue 1.816 and prior releases
    while ( my @items = $q->dequeue(2) ) {
        last unless ( defined $items[0] );
        ...
    }

    # MCE::Shared::Queue 1.817 and later
    while ( my @items = $q->dequeue(2) ) {
        ...
    }

dequeue_nb ( [ count ] )

Returns the requested number of items (default 1) from the queue. As with dequeue, priority data will always dequeue first. This method is non-blocking and returns "undef" in the absence of data.

    $q->dequeue_nb( 2 );
    $q->dequeue_nb;    # default 1

insert ( index, item [, item, ... ] )

Adds the list of items to the queue at the specified index position (0 is the head of the list). The head of the queue is the item that would be removed by a call to dequeue.

    $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, 'foo', 'bar');
    # Queue now contains: 1, foo, bar, 2, 3, 4

    $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, 'foo', 'bar');
    # Queue now contains: 1, 2, 3, foo, bar, 4

insertp ( priority, index, item [, item, ... ] )

Adds the list of items to the queue at the specified index position with priority. The behavior is otherwise similar to "$q->insert".

pending ( )

Returns the number of items in the queue. The count includes both normal and priority data.
Returns "undef" if the queue has been ended and there are no more items in the queue.

    $q = MCE::Shared->queue();
    $q->enqueuep(5, 'foo', 'bar');
    $q->enqueue('sunny', 'day');

    print $q->pending(), "\n";
    # Output: 4

peek ( [ index ] )

Returns an item from the normal queue, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified. The head of the queue is the item that would be removed by a call to dequeue. Negative index values are supported, as with arrays.

    $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
    $q->enqueue(1, 2, 3, 4, 5);

    print $q->peek(1), ' ', $q->peek(-2), "\n";
    # Output: 2 4

    $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
    $q->enqueue(1, 2, 3, 4, 5);

    print $q->peek(1), ' ', $q->peek(-2), "\n";
    # Output: 4 2

peekp ( priority [, index ] )

Returns an item from the queue with priority, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified. The behavior is otherwise similar to "$q->peek".

peekh ( [ index ] )

Returns an item from the head of the heap or at the specified index.

    $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
    $q->enqueuep(5, 'foo');
    $q->enqueuep(6, 'bar');
    $q->enqueuep(4, 'sun');

    print $q->peekh(0), "\n";
    # Output: 6

    $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
    $q->enqueuep(5, 'foo');
    $q->enqueuep(6, 'bar');
    $q->enqueuep(4, 'sun');

    print $q->peekh(0), "\n";
    # Output: 4

heap ( )

Returns an array containing the heap data. Heap data consists of priority numbers, not the data.

    @h = $q->heap;   # $MCE::Shared::Queue::HIGHEST
    # Heap contains: 6, 5, 4

    @h = $q->heap;   # $MCE::Shared::Queue::LOWEST
    # Heap contains: 4, 5, 6

ACKNOWLEDGMENTS
LIMITATIONS

Perl must have IO::FDPass for constructing a shared "condvar" or "queue" while the shared-manager process is running. For platforms where IO::FDPass isn't possible, construct "condvar" and "queue" before other classes. On systems without "IO::FDPass", the manager process is delayed until sharing other classes or started explicitly.

    use MCE::Shared;

    my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;

    my $cv  = MCE::Shared->condvar();
    my $que = MCE::Shared->queue();

    MCE::Shared->start() unless $has_IO_FDPass;

Regarding mce_open, "IO::FDPass" is needed for constructing a shared handle from a non-shared handle that is not yet available inside the shared-manager process. The workaround is to create the non-shared handle before the shared-manager is started. Passing a file by reference is fine for the three STD* handles.

    # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.

    mce_open my $shared_in,  "<",  \*STDIN;   # ok
    mce_open my $shared_out, ">>", \*STDOUT;  # ok
    mce_open my $shared_err, ">>", \*STDERR;  # ok
    mce_open my $shared_fh1, "<",  "/path/to/sequence.fasta";  # ok
    mce_open my $shared_fh2, ">>", "/path/to/results.log";     # ok

    mce_open my $shared_fh, ">>", \*NON_SHARED_FH;  # requires IO::FDPass

The IO::FDPass module is known to work reliably on most platforms. Install 1.1 or later to be rid of the limitations described above.

    perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"

INDEX

MCE, MCE::Hobo, MCE::Shared

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>