MCE::Hobo(3)
User Contributed Perl Documentation
MCE::Hobo - A threads-like parallelization module
This document describes MCE::Hobo version 1.876
use MCE::Hobo;
MCE::Hobo->init(
max_workers => 'auto', # default undef, unlimited
# Specify a percentage. MCE::Hobo 1.874+.
max_workers => '25%', # 4 on HW with 16 lcores
max_workers => '50%', # 8 on HW with 16 lcores
hobo_timeout => 20, # default undef, no timeout
posix_exit => 1, # default undef, CORE::exit
void_context => 1, # default undef
on_start => sub {
my ( $pid, $ident ) = @_;
...
},
on_finish => sub {
my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
...
}
);
MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
sub parallel {
my ($arg1) = @_;
print "Hello again, $arg1\n" if defined($arg1);
print "Hello again, $_\n"; # same thing
}
MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
my @hobos = MCE::Hobo->list();
my @pids = MCE::Hobo->list_pids();
my @running = MCE::Hobo->list_running();
my @joinable = MCE::Hobo->list_joinable();
my $count = MCE::Hobo->pending();
# Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
$_->join() for @hobos; # (or)
$_->join() for @joinable;
# Joining occurs immediately as hobo processes complete execution.
1 while MCE::Hobo->wait_one();
my $hobo = mce_async { foreach (@files) { ... } };
$hobo->join();
if ( my $err = $hobo->error() ) {
warn "Hobo error: $err\n";
}
# Get a hobo's object
$hobo = MCE::Hobo->self();
# Get a hobo's ID
$pid = MCE::Hobo->pid(); # $$
$pid = $hobo->pid();
$pid = MCE::Hobo->tid(); # tid is an alias for pid
$pid = $hobo->tid();
# Test hobo objects
if ( $hobo1 == $hobo2 ) {
...
}
# Give other workers a chance to run
MCE::Hobo->yield();
MCE::Hobo->yield(0.05);
# Return context, wantarray aware
my ($value1, $value2) = $hobo->join();
my $value = $hobo->join();
# Check hobo's state
if ( $hobo->is_running() ) {
sleep 1;
}
if ( $hobo->is_joinable() ) {
$hobo->join();
}
# Send a signal to a hobo
$hobo->kill('SIGUSR1');
# Exit a hobo
MCE::Hobo->exit(0);
MCE::Hobo->exit(0, @ret); # MCE::Hobo 1.827+
A hobo is a migratory worker inside the machine that carries the asynchronous
gene. Hobo processes are equipped with
"threads"-like capability for running code
asynchronously. Unlike threads, each hobo is a unique process to the
underlying OS. The IPC is managed by
"MCE::Shared", which runs on all the major
platforms including Cygwin and Strawberry Perl.
An exception was made on the Windows platform to spawn threads
versus children in "MCE::Hobo" 1.807
through 1.816. For consistency, the 1.817 release reverts back to spawning
children on all supported platforms.
"MCE::Hobo" may be used as a
standalone or together with "MCE"
including running alongside "threads".
use MCE::Hobo;
use MCE::Shared;
# synopsis: head -20 file.txt | perl script.pl
my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
my $ofh = MCE::Shared->handle( ">", \*STDOUT );
my $ary = MCE::Shared->array();
sub parallel_task {
my ( $id ) = @_;
while ( <$ifh> ) {
printf {$ofh} "[ %4d ] %s", $., $_;
# $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n"; # dereferencing
$ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
}
}
my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
$_->join for MCE::Hobo->list(); # ditto: MCE::Hobo->wait_all();
# search array (total one round-trip via IPC)
my @vals = $ary->vals( "val =~ / ID 2 /" );
print {*STDERR} join("", @vals);
- $hobo = MCE::Hobo->create( FUNCTION, ARGS )
- $hobo = MCE::Hobo->new( FUNCTION, ARGS )
- This will create a new hobo process that will begin execution with
function as the entry point, and optionally ARGS for list of parameters.
It will return the corresponding MCE::Hobo object, or undef if hobo
creation failed.
FUNCTION may either be the name of a function, an
anonymous subroutine, or a code ref.
my $hobo = MCE::Hobo->create( "func_name", ... );
# or
my $hobo = MCE::Hobo->create( sub { ... }, ... );
# or
my $hobo = MCE::Hobo->create( \&func, ... );
- $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
- $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
- Options, excluding "ident", may be
specified globally via the "init"
function. Otherwise, "ident",
"hobo_timeout",
"posix_exit", and
"void_context" may be set uniquely.
The "ident" option,
available since 1.827, is used by callback functions
"on_start" and
"on_finish" for identifying the
started and finished hobo process respectively.
my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
...
} );
$hobo1->join;
my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
sleep 1 for ( 1 .. 9 );
} );
$hobo2->join;
if ( $hobo2->error() eq "Hobo timed out\n" ) {
...
}
The "new()" method is an
alias for "create()".
- mce_async { BLOCK } ARGS;
- mce_async { BLOCK };
- "mce_async" runs the block
asynchronously similarly to
"MCE::Hobo->create()". It returns the
hobo object, or undef if hobo creation failed.
my $hobo = mce_async { foreach (@files) { ... } };
$hobo->join();
if ( my $err = $hobo->error() ) {
warn("Hobo error: $err\n");
}
- $hobo->join()
- This will wait for the corresponding hobo process to complete its
execution. In non-voided context,
"join()" will return the value(s) of the
entry point function.
The context (void, scalar or list) for the return value(s) for
"join" is determined at the time of
joining and mostly "wantarray"
aware.
my $hobo1 = MCE::Hobo->create( sub {
my @res = qw(foo bar baz);
return (@res);
});
my @res1 = $hobo1->join(); # ( foo, bar, baz )
my $res1 = $hobo1->join(); # baz
my $hobo2 = MCE::Hobo->create( sub {
return 'foo';
});
my @res2 = $hobo2->join(); # ( foo )
my $res2 = $hobo2->join(); # foo
- $hobo1->equal( $hobo2 )
- Tests if two hobo objects are the same hobo or not. Hobo comparison is
based on process IDs. This is overloaded to the more natural forms.
if ( $hobo1 == $hobo2 ) {
print("Hobo objects are the same\n");
}
# or
if ( $hobo1 != $hobo2 ) {
print("Hobo objects differ\n");
}
- $hobo->error()
- Hobo processes are executed in an "eval"
context. This method will return "undef"
if the hobo terminates normally. Otherwise, it returns the value of
$@ associated with the hobo's execution status in
its "eval" context.
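As a minimal sketch of the behavior described above (the two task bodies are illustrative and not part of the MCE::Hobo distribution), the following joins one hobo that returns normally and one that dies, then inspects "error()" on each. The failing hobo may additionally emit a warning on STDERR.

```perl
use strict;
use warnings;
use MCE::Hobo;

# Illustrative tasks: one returns normally, one dies inside the eval context.
my $ok  = MCE::Hobo->create( sub { return 42 } );
my $bad = MCE::Hobo->create( sub { die "something broke\n" } );

my $value = $ok->join();
$bad->join();

# error() is undef after a normal exit; otherwise it holds the value of $@.
print "ok:  ", ( defined $ok->error() ? $ok->error() : "no error\n" );
print "bad: ", $bad->error();
```

Checking "error()" after "join()" is what distinguishes a hobo that legitimately returned nothing from one that terminated abnormally.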
- $hobo->exit()
- This sends 'SIGQUIT' to the hobo process,
notifying the hobo to exit. It returns the hobo object to allow for method
chaining. Be sure to join the hobo afterwards, whether immediately or later,
so that no zombie (defunct) process is left behind.
$hobo->exit()->join();
...
$hobo->join(); # later
- MCE::Hobo->exit( 0 )
- MCE::Hobo->exit( 0, @ret )
- A hobo can exit at any time by calling
"MCE::Hobo->exit()". Otherwise, the
behavior is the same as "exit(status)"
when called from the main process. Since 1.827, the hobo process
may optionally return data, which is sent via IPC.
- MCE::Hobo->finish()
- This class method is called automatically by
"END", but may be called explicitly. An
error is emitted via croak if there are active hobo processes not yet
joined.
MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
$_->join for MCE::Hobo->list();
MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
$_->join for MCE::Hobo->list();
MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
$_->join for MCE::Hobo->list();
MCE::Hobo->finish();
- MCE::Hobo->init( options )
- The init function accepts a list of MCE::Hobo options.
MCE::Hobo->init(
max_workers => 'auto', # default undef, unlimited
# Specify a percentage. MCE::Hobo 1.874+.
max_workers => '25%', # 4 on HW with 16 lcores
max_workers => '50%', # 8 on HW with 16 lcores
hobo_timeout => 20, # default undef, no timeout
posix_exit => 1, # default undef, CORE::exit
void_context => 1, # default undef
on_start => sub {
my ( $pid, $ident ) = @_;
...
},
on_finish => sub {
my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
...
}
);
# Identification given as an option or the 1st argument.
# Current API available since 1.827.
for my $key ( 'aa' .. 'zz' ) {
MCE::Hobo->create( { ident => $key }, sub { ... } );
MCE::Hobo->create( $key, sub { ... } );
}
MCE::Hobo->wait_all;
Set "max_workers" if you
want to limit the number of workers by waiting automatically for an
available slot. Specify a percentage or
"auto" to obtain the number of logical
cores via "MCE::Util::get_ncpu()".
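The percentage arithmetic can be sketched in plain Perl. The helper name "workers_from_percent", the rounding rule, and the minimum of one worker are assumptions made for illustration; the actual computation inside MCE::Hobo and MCE::Util may differ. The sketch only reproduces the figures quoted in the comments above (25% and 50% of 16 logical cores).

```perl
use strict;
use warnings;

# Hypothetical helper, not part of MCE::Hobo: converts a percentage string
# into a worker count for a given number of logical cores.
sub workers_from_percent {
    my ( $spec, $ncpu ) = @_;
    my ($pct) = $spec =~ /^(\d+(?:\.\d+)?)%$/
        or die "not a percentage: $spec\n";

    # Assumed rounding: nearest integer, never fewer than one worker.
    my $n = int( $ncpu * $pct / 100 + 0.5 );
    return $n < 1 ? 1 : $n;
}

printf "%s of 16 lcores => %d workers\n", $_, workers_from_percent( $_, 16 )
    for '25%', '50%';
```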
Set "hobo_timeout", in
number of seconds, if you want the hobo process to terminate after some
time. The default is 0 for no timeout.
Set "posix_exit" to avoid
all END and destructor processing. The option defaults to 1 when MCE::Hobo
is constructed inside a thread, or when any of the following modules is
present: CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS,
LWP::UserAgent, Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
Set "void_context" to create
the hobo process in void context for the return value. Otherwise, the
return context is wantarray-aware for
"join()" and
"result()" and determined when
retrieving the data.
The callback options
"on_start" and
"on_finish" are called in the parent
process after starting the worker and later when terminated. The
arguments for the subroutines were inspired by
Parallel::ForkManager.
The parameters for
"on_start" are the following:
- pid of the hobo process
- identification (ident option or 1st arg to create)
The parameters for
"on_finish" are the following:
- pid of the hobo process
- program exit code
- identification (ident option or 1st arg to create)
- exit signal id
- error message from eval inside MCE::Hobo
- returned data
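A small sketch of how the "on_finish" parameters might be consumed: the parent collects each worker's return value in a hash keyed by its ident. The "job$_" identifiers, the squaring task, and the %done hash are illustrative assumptions, not part of the module.

```perl
use strict;
use warnings;
use MCE::Hobo;

my %done;   # ident => first returned value, filled in by the parent

MCE::Hobo->init(
    max_workers => 2,
    on_finish   => sub {
        my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
        $done{$ident} = $ret[0];
    },
);

# Illustrative task: each hobo returns the square of its argument.
MCE::Hobo->create( { ident => "job$_" }, sub {
    my ($n) = @_;
    return $n * $n;
}, $_ ) for 1 .. 4;

MCE::Hobo->wait_all();

print "$_ => $done{$_}\n" for sort keys %done;
```

Because the callback runs in the parent process, it can update ordinary (non-shared) variables such as %done directly.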
- $hobo->is_running()
- Returns true if a hobo is still running.
- $hobo->is_joinable()
- Returns true if the hobo has finished running and not yet joined.
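The two state checks can be combined into a simple polling loop, sketched below. The half-second task and the 0.1 second poll interval are arbitrary values chosen for illustration.

```perl
use strict;
use warnings;
use MCE::Hobo;
use Time::HiRes qw(sleep);

# Illustrative task that takes roughly half a second.
my $hobo = MCE::Hobo->create( sub { sleep 0.5; return 'done' } );

# The parent may do other work here; this sketch merely polls.
my $polls = 0;
while ( $hobo->is_running() ) {
    $polls++;
    sleep 0.1;
}

# Once no longer running, the hobo is joinable until it has been joined.
my $result = $hobo->is_joinable() ? $hobo->join() : undef;
print "result: $result\n" if defined $result;
```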
- $hobo->kill( 'SIG...' )
- Sends the specified signal to the hobo. Returns the hobo object to allow
for method chaining. As with "exit", be
sure to join the hobo afterwards, whether immediately or later, so that no
zombie (defunct) process is left behind.
$hobo->kill('SIG...')->join();
The following is a parallel demonstration comparing
"MCE::Shared" against
"Redis" and
"Redis::Fast" on a Fedora 23 VM.
Joining begins after all workers have been notified to quit.
use Time::HiRes qw(time sleep); # sleep: needed for the fractional "sleep 0.2"
use Redis;
use Redis::Fast;
use MCE::Hobo;
use MCE::Shared;
my $redis = Redis->new();
my $rfast = Redis::Fast->new();
my $array = MCE::Shared->array();
sub parallel_redis {
my ($_redis) = @_;
my ($count, $quit, $len) = (0, 0);
# instead, use a flag to exit loop
$SIG{'QUIT'} = sub { $quit = 1 };
while () {
$len = $_redis->rpush('list', $count++);
last if $quit;
}
$count;
}
sub parallel_array {
my ($count, $quit, $len) = (0, 0);
# do not exit from inside handler
$SIG{'QUIT'} = sub { $quit = 1 };
while () {
$len = $array->push($count++);
last if $quit;
}
$count;
}
sub benchmark_this {
my ($desc, $num_procs, $timeout, $code, @args) = @_;
my ($start, $total) = (time(), 0);
MCE::Hobo->new($code, @args) for 1..$num_procs;
sleep $timeout;
# joining is not immediate; ok
$_->kill('QUIT') for MCE::Hobo->list();
# joining later; ok
$total += $_->join() for MCE::Hobo->list();
printf "$desc <> duration: %0.03f secs, count: $total\n",
time() - $start;
sleep 0.2;
}
benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
- MCE::Hobo->list()
- Returns a list of all hobo objects not yet joined.
@hobos = MCE::Hobo->list();
- MCE::Hobo->list_pids()
- Returns a list of all hobo pids not yet joined (available since 1.849).
@pids = MCE::Hobo->list_pids();
$SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
# Signal workers and the shared manager all at once
CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
exec('reset');
};
- MCE::Hobo->list_running()
- Returns a list of all hobo objects that are still running.
@hobos = MCE::Hobo->list_running();
- MCE::Hobo->list_joinable()
- Returns a list of all hobo objects that have completed running and are
thus ready to be joined without blocking.
@hobos = MCE::Hobo->list_joinable();
- MCE::Hobo->max_workers([ N ])
- Getter and setter for max_workers. Specify a number or 'auto' to acquire
the total number of cores via MCE::Util::get_ncpu. Specify a false value
to set back to no limit.
API available since 1.835.
- MCE::Hobo->pending()
- Returns a count of all hobo objects not yet joined.
$count = MCE::Hobo->pending();
- $hobo->result()
- Returns the result obtained by "join",
"wait_one", or
"wait_all". If the process has not yet
exited, waits for the corresponding hobo to complete its execution.
use MCE::Hobo;
use Time::HiRes qw(sleep);
sub task {
my ($id) = @_;
sleep $id * 0.333;
return $id;
}
MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
# 1 while MCE::Hobo->wait_one();
while ( my $hobo = MCE::Hobo->wait_one() ) {
my $err = $hobo->error() || 'no error';
my $res = $hobo->result();
my $pid = $hobo->pid();
print "[$pid] $err : $res\n";
}
Like "join" described above,
the context (void, scalar or list) for the return value(s) is determined
at the time "result" is called and
mostly "wantarray" aware.
my $hobo1 = MCE::Hobo->create( sub {
my @res = qw(foo bar baz);
return (@res);
});
my @res1 = $hobo1->result(); # ( foo, bar, baz )
my $res1 = $hobo1->result(); # baz
my $hobo2 = MCE::Hobo->create( sub {
return 'foo';
});
my @res2 = $hobo2->result(); # ( foo )
my $res2 = $hobo2->result(); # foo
- MCE::Hobo->self()
- Class method that allows a hobo to obtain its own MCE::Hobo
object.
- $hobo->pid()
- $hobo->tid()
- Returns the ID of the hobo.
pid: $$ process id
tid: $$ alias for pid
- MCE::Hobo->pid()
- MCE::Hobo->tid()
- Class methods that allow a hobo to obtain its own ID.
pid: $$ process id
tid: $$ alias for pid
- MCE::Hobo->wait_one()
- MCE::Hobo->waitone()
- MCE::Hobo->wait_all()
- MCE::Hobo->waitall()
- Meaningful for the manager process only, waits for one or all hobo
processes to complete execution. Afterwards, returns the corresponding
hobo objects. If a hobo doesn't exist, returns the
"undef" value or an empty list for
"wait_one" and
"wait_all" respectively.
The "waitone" and
"waitall" methods are aliases since
1.827 for backwards compatibility.
use MCE::Hobo;
use Time::HiRes qw(sleep);
sub task {
my $id = shift;
sleep $id * 0.333;
return $id;
}
MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
# join, traditional use case
$_->join() for MCE::Hobo->list();
# wait_one, simplistic use case
1 while MCE::Hobo->wait_one();
# wait_one
while ( my $hobo = MCE::Hobo->wait_one() ) {
my $err = $hobo->error() || 'no error';
my $res = $hobo->result();
my $pid = $hobo->pid();
print "[$pid] $err : $res\n";
}
# wait_all
my @hobos = MCE::Hobo->wait_all();
for ( @hobos ) {
my $err = $_->error() || 'no error';
my $res = $_->result();
my $pid = $_->pid();
print "[$pid] $err : $res\n";
}
- MCE::Hobo->yield( [ floating_seconds ] )
- Prior API till 1.826.
Let this hobo yield CPU time to other workers. By default, the
class method calls "sleep(0.008)" on
UNIX and "sleep(0.015)" on Windows
including Cygwin.
MCE::Hobo->yield();
MCE::Hobo->yield(0.05);
# total run time: 0.25 seconds, sleep occurring in parallel
MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
MCE::Hobo->wait_all();
Current API available since 1.827.
Give other workers a chance to run, optionally for given time.
Yield behaves similarly to MCE's interval option. It throttles workers
from running too fast. A demonstration is provided in the next section
for fetching URLs in parallel.
The default
"floating_seconds" is 0.008 and 0.015
on UNIX and Windows, respectively. Pass 0 if simply wanting to give
other workers a chance to run.
# total run time: 1.00 second
MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
MCE::Hobo->wait_all();
Threads-like detach capability was added starting with the 1.867 release.
A threads example is shown first followed by the MCE::Hobo
example. All one needs to do is set the CHLD signal handler to IGNORE.
Unfortunately, this works on UNIX platforms only. The hobo process restores
the CHLD handler to default, so it is able to spin workers of its own
(nested) and reap them if desired.
use threads;
for ( 1 .. 8 ) {
async {
# do something
}->detach;
}
use MCE::Hobo;
# Have the OS reap workers automatically when exiting.
# The on_finish option is ignored if specified (no-op).
# Ensure not inside a thread on UNIX platforms.
$SIG{CHLD} = 'IGNORE';
for ( 1 .. 8 ) {
mce_async {
# do something
};
}
# Optionally, wait for any remaining workers before leaving.
# This is necessary if workers are consuming shared objects,
# constructed via MCE::Shared.
MCE::Hobo->wait_all;
The following is another way and works on Windows. Here, the
on_finish handler works as usual.
use MCE::Hobo;
MCE::Hobo->init(
on_finish => sub {
...
},
);
for ( 1 .. 8 ) {
$_->join for MCE::Hobo->list_joinable;
mce_async {
# do something
};
}
MCE::Hobo->wait_all;
MCE::Hobo behaves similarly to threads for the most part. It also provides
Parallel::ForkManager-like capabilities. The
"Parallel::ForkManager" example is shown
first followed by a version using
"MCE::Hobo".
- Parallel::ForkManager
-
use strict;
use warnings;
use Parallel::ForkManager;
use Time::HiRes 'time';
my $start = time;
my $pm = Parallel::ForkManager->new(10);
$pm->set_waitpid_blocking_sleep(0);
$pm->run_on_finish( sub {
my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
print "child $pid completed: $ident => ", $resp->[0], "\n";
});
DATA_LOOP:
foreach my $data ( 1..2000 ) {
# forks and returns the pid for the child
my $pid = $pm->start($data) and next DATA_LOOP;
my $ret = [ $data * 2 ];
$pm->finish(0, $ret);
}
$pm->wait_all_children;
printf STDERR "duration: %0.03f seconds\n", time - $start;
- MCE::Hobo
-
use strict;
use warnings;
use MCE::Hobo 1.843;
use Time::HiRes 'time';
my $start = time;
MCE::Hobo->init(
max_workers => 10,
on_finish => sub {
my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
print "child $pid completed: $ident => ", $resp->[0], "\n";
}
);
foreach my $data ( 1..2000 ) {
MCE::Hobo->create( $data, sub {
[ $data * 2 ];
});
}
MCE::Hobo->wait_all;
printf STDERR "duration: %0.03f seconds\n", time - $start;
- Time to spin 2,000 workers and obtain results (in seconds).
- Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
Boost). Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again with
Moo loaded at the top of the script.
MCE::Hobo uses MCE::Shared to retrieve data during reaping.
MCE::Child uses MCE::Channel, no shared-manager.
Module      Version  Cygwin   Windows  Linux   macOS   FreeBSD  Notes
MCE::Child  1.843    19.099s  17.091s  0.965s  1.534s  1.229s
MCE::Hobo   1.843    20.514s  19.594s  1.246s  1.629s  1.613s
P::FM       1.20     19.703s  19.235s  0.875s  1.445s  1.346s
MCE::Child  1.843    20.426s  18.417s  1.116s  1.632s  1.338s   Moo loaded
MCE::Hobo   1.843    21.809s  20.810s  1.407s  1.759s  1.722s   Moo loaded
P::FM       2.02     21.668s  25.927s  1.882s  2.612s  2.483s   Moo used
- Set posix_exit to avoid all END and destructor processing.
- This is helpful for reducing overhead when workers exit. Ditto if using a
Perl module not parallel safe. The option is ignored on Windows
"$^O eq 'MSWin32'".
MCE::Child->init( posix_exit => 1, ... );
MCE::Hobo->init( posix_exit => 1, ... );
Module      Version  Cygwin   Windows  Linux   macOS   FreeBSD  Notes
MCE::Child  1.843    19.815s  ignored  0.824s  1.284s  1.245s   Moo loaded
MCE::Hobo   1.843    21.029s  ignored  0.953s  1.335s  1.439s   Moo loaded
This demonstration constructs two queues, two handles, starts the shared-manager
process if needed, and spawns four workers. It chunks 64 URLs per job. In
practice, one may run with 200 workers and chunk 300 URLs on a 24-way box.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# perl demo.pl -- all output
# perl demo.pl >/dev/null -- mngr/hobo output
# perl demo.pl 2>/dev/null -- show results only
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
use Time::HiRes qw( time );
use MCE::Hobo;
use MCE::Shared;
# Construct two queues, input and return.
my $que = MCE::Shared->queue();
my $ret = MCE::Shared->queue();
# Construct shared handles for serializing output from many workers
# writing simultaneously. This prevents garbled output.
mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
# Spawn workers early for minimum memory consumption.
MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
# Obtain or generate input data for workers to process.
my ( $count, @urls ) = ( 0 );
push @urls, map { "http://127.0.0.$_/" } 1..254;
push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
while ( @urls ) {
my @chunk = splice(@urls, 0, 64);
$que->enqueue( { ID => ++$count, INPUT => \@chunk } );
}
# So that workers leave the loop after consuming the queue.
$que->end();
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Loop for the manager process. The manager may do other work if
# need be and periodically check $ret->pending() not shown here.
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
my $start = time;
printf {$ERR} "Mngr - entering loop\n";
while ( $count ) {
my ( $result, $failed ) = $ret->dequeue( 2 );
# Remove ID from result, so not treated as a URL item.
printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
# Display the URL and the size captured.
foreach my $url ( keys %{ $result } ) {
printf {$OUT} "%s: %d\n", $url, length($result->{$url})
if $result->{$url}; # url has content
}
# Display URLs that could not be reached.
if ( @{ $failed } ) {
foreach my $url ( @{ $failed } ) {
print {$OUT} "Failed: $url\n";
}
}
# Decrement the count.
$count--;
}
MCE::Hobo->wait_all();
printf {$ERR} "Mngr - exiting loop\n\n";
printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
exit;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Hobo processes enqueue two items ( $result and $failed ) per each
# job for the manager process. Likewise, the manager process dequeues
# two items above. Optionally, hobo processes may include the ID in
# the result.
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
sub task {
my ( $id ) = @_;
printf {$ERR} "Hobo $id entering loop\n";
while ( my $job = $que->dequeue() ) {
my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
# Walk URLs, provide a hash and array refs for data.
printf {$ERR} "Hobo $id running job $job->{ID}\n";
walk( $job, $result, $failed );
# Send results to the manager process.
$ret->enqueue( $result, $failed );
}
printf {$ERR} "Hobo $id exiting loop\n";
}
sub walk {
my ( $job, $result, $failed ) = @_;
# Yielding is critical when running an event loop in parallel.
# Not doing so means that the app may reach contention points
# with the firewall and likely impose unnecessary hardship at
# the OS level. The idea here is not to have multiple workers
# initiate HTTP requests to a batch of URLs at the same time.
# Yielding in 1.827+ behaves similarly to scatter, letting each
# hobo process run solo for a fraction of time.
MCE::Hobo->yield( 0.03 ); # MCE::Hobo 1.827+
my $cv = AnyEvent->condvar();
# Populate the hash ref for the URLs it could reach.
# Do not mix AnyEvent timeout with hobo timeout.
# Therefore, choose event timeout when available.
foreach my $url ( @{ $job->{INPUT} } ) {
$cv->begin();
http_get $url, timeout => 2, sub {
my ( $data, $headers ) = @_;
$result->{$url} = $data;
$cv->end();
};
}
$cv->recv();
# Populate the array ref for URLs it could not reach.
foreach my $url ( @{ $job->{INPUT} } ) {
push @{ $failed }, $url unless (exists $result->{ $url });
}
return;
}
__END__
$ perl demo.pl
Hobo 1 entering loop
Hobo 2 entering loop
Hobo 3 entering loop
Mngr - entering loop
Hobo 2 running job 2
Hobo 3 running job 3
Hobo 1 running job 1
Hobo 4 entering loop
Hobo 4 running job 4
Hobo 2 running job 5
Mngr - received job 2
Hobo 3 running job 6
Mngr - received job 3
Hobo 1 running job 7
Mngr - received job 1
Hobo 4 running job 8
Mngr - received job 4
http://192.168.0.1/: 3729
Hobo 2 exiting loop
Mngr - received job 5
Hobo 3 exiting loop
Mngr - received job 6
Hobo 1 exiting loop
Mngr - received job 7
Hobo 4 exiting loop
Mngr - received job 8
Mngr - exiting loop
Duration: 4.131 seconds
Making an executable is possible with the PAR::Packer module. On the Windows
platform, threads, threads::shared, and exiting via threads are necessary for
the binary to exit successfully.
# https://metacpan.org/pod/PAR::Packer
# https://metacpan.org/pod/pp
#
# pp -o demo.exe demo.pl
# ./demo.exe
use strict;
use warnings;
use if $^O eq "MSWin32", "threads";
use if $^O eq "MSWin32", "threads::shared";
# Include minimum dependencies for MCE::Hobo.
# Add other modules required by your application here.
use Storable ();
use Time::HiRes ();
# use IO::FDPass (); # optional: for condvar, handle, queue
# use Sereal (); # optional: for faster serialization
use MCE::Hobo;
use MCE::Shared;
# For PAR to work on the Windows platform, one must include manually
# any shared modules used by the application.
# use MCE::Shared::Array; # if using MCE::Shared->array
# use MCE::Shared::Cache; # if using MCE::Shared->cache
# use MCE::Shared::Condvar; # if using MCE::Shared->condvar
# use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
# use MCE::Shared::Hash; # if using MCE::Shared->hash
# use MCE::Shared::Minidb; # if using MCE::Shared->minidb
# use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
# use MCE::Shared::Queue; # if using MCE::Shared->queue
# use MCE::Shared::Scalar; # if using MCE::Shared->scalar
# Et cetera. Only load modules needed for your application.
use MCE::Shared::Sequence; # if using MCE::Shared->sequence
my $seq = MCE::Shared->sequence( 1, 9 );
sub task {
my ( $id ) = @_;
while ( defined ( my $num = $seq->next() ) ) {
print "$id: $num\n";
sleep 1;
}
}
sub main {
MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
MCE::Hobo->wait_all();
}
# Main must run inside a thread on the Windows platform or workers
# will fail during exiting, causing the exe to crash. The reason is
# that PAR or a dependency isn't multi-process safe.
( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
threads->exit(0) if $INC{"threads.pm"};
The inspiration for "MCE::Hobo" comes from
wanting "threads"-like behavior for
processes. Both can run side-by-side including safe-use by MCE workers.
Likewise, the documentation resembles
"threads".
The inspiration for "wait_all"
and "wait_one" comes from the
"Parallel::WorkUnit" module.
- forks
- forks::BerkeleyDB
- MCE::Child
- Parallel::ForkManager
- Parallel::Loops
- Parallel::Prefork
- Parallel::WorkUnit
- Proc::Fork
- Thread::Tie
- threads
MCE, MCE::Channel, MCE::Shared
Mario E. Roy, <marioeroy AT gmail DOT com>