#!/usr/longitude/bin/perl -w
# YAPC::Europe 2001
# "Parallel Processing with Perl and PVM" - Benjamin Holzman: Longitude, Inc.
# Listing Two : "web"
#
# Master side of a PVM web spider.  It spawns "strand" worker tasks, hands
# each idle worker one URI at a time, and records one URI per distinct
# content digest reported back.  Once no work is outstanding it prints the
# recorded URIs followed by each worker's user/system timings.
#
# Usage: <this script> [base-uri]     (base-uri defaults to http://127.0.0.1/)
#
# NOTE(review): the worker ("strand") is not part of this file; everything
# said about the protocol below is what this side sends and expects.

use strict;
use warnings;
use Parallel::Pvm;
# Convert the normal termination signals into die() so that the END block
# at the bottom still runs and can clean up the workers.
use sigtrap qw(die normal-signals);

# Message tags, as used by this master:
use constant SPIDER_INIT      => 100;  # multicast to workers: the base URI
use constant SPIDER_REQUEST   => 101;  # sent to one worker: a URI to fetch
use constant SPIDER_DIGEST    => 102;  # received: digest of fetched content
use constant SPIDER_ERROR     => 103;  # received: the fetch failed
use constant SPIDER_DUPLICATE => 104;  # reply to DIGEST: digest already seen
use constant SPIDER_NEW       => 105;  # reply to DIGEST: digest is new
use constant SPIDER_URIS      => 106;  # received after NEW: count, then URIs
use constant SPIDER_EXIT      => 107;  # multicast to workers at the end
use constant SPIDER_TIMINGS   => 108;  # received at the end: user, system

use constant PVM_HOSTFILE     => "/var/br/pvm_hosts.dat";
use constant TASKS_PER_NODE   => 2;

my $base = $ARGV[0] || "http://127.0.0.1/";
my %visited;    # md5(content) -> URI

# make sure that pvmd is started.
my $info = Parallel::Pvm::start_pvmd(1, PVM_HOSTFILE);
die "start_pvmd: $info" if $info && $info != PvmDupHost;

($info, my @nodes) = Parallel::Pvm::config();
die "config: $info" if $info;

# The returned tid is not used below; the call is kept from the original
# listing.
my $mytid = Parallel::Pvm::mytid();

# spawn TASKS_PER_NODE strand processes per node
my ($ntask, @tids)
    = Parallel::Pvm::spawn("strand", TASKS_PER_NODE * scalar(@nodes));

# PVM task ids are positive integers and negative values are error codes,
# so drop anything that cannot be a live worker.  Without at least one
# worker the dispatch loop below could never finish, so fail loudly instead
# of polling forever.
@tids = grep { defined $_ && $_ > 0 } @tids;
die "spawn: no strand processes started (spawn returned "
    . (defined $ntask ? $ntask : 'undef') . ")\n"
    unless @tids;
print STDERR "started $ntask strand processes\n";

Parallel::Pvm::initsend();
Parallel::Pvm::pack($base);
Parallel::Pvm::mcast(@tids, SPIDER_INIT);

my @uris  = ($base);    # URIs waiting to be handed to a worker
my %tid2uri;            # worker tid -> URI it was last sent
my @tasks = @tids;      # workers currently treated as idle
my $done  = 0;

while (!$done) {
    # check if we have run out of tasks or work
    if (!@tasks || !@uris) {
        my %pending;    # URIs reported in this round (hash keys: no repeats)

        # Wait briefly (0 s + 1000 us) for a message from any task with any
        # tag; after each message keep going while more are already queued.
        my $bufid = Parallel::Pvm::trecv(-1, -1, 0, 1000);
        while ($bufid) {
            my (undef, undef, $tag, $tid) = Parallel::Pvm::bufinfo($bufid);
            my $uri = delete $tid2uri{$tid};
            push @tasks, $tid;    # sender may be given new work again

            if ($tag == SPIDER_DIGEST) {
                my $digest = Parallel::Pvm::unpack();
                Parallel::Pvm::initsend();
                if (exists $visited{$digest}) {
                    Parallel::Pvm::send($tid, SPIDER_DUPLICATE);
                }
                else {
                    $visited{$digest} = $uri;
                    Parallel::Pvm::send($tid, SPIDER_NEW);

                    # Block until this worker sends its URI list: first the
                    # count, then (if non-zero) the URIs themselves.
                    Parallel::Pvm::recv($tid, SPIDER_URIS);
                    if (my $numURIs = Parallel::Pvm::unpack()) {
                        @pending{ Parallel::Pvm::unpack() } = (1) x $numURIs;
                    }
                }
            }
            elsif ($tag == SPIDER_ERROR) {
                printf STDERR "[t%x]: error retrieving [%s]\n", $tid, $uri;
            }

            # handle any messages that are waiting; otherwise fall back to
            # the short timed receive, which ends the loop on timeout
            $bufid = Parallel::Pvm::probe(-1, -1)
                ? Parallel::Pvm::recv(-1, -1)
                : Parallel::Pvm::trecv(-1, -1, 0, 1000);
        }

        # Finished when nothing is in flight, nothing new was reported and
        # nothing is queued.  Evaluated before the new URIs are queued.
        $done = keys %tid2uri == 0 && keys %pending == 0 && @uris == 0;
        push @uris, keys %pending;
    }

    # now send as many messages as we can: one URI per idle worker
    my $numMessages = @uris > @tasks ? scalar(@tasks) : scalar(@uris);
    next unless $numMessages;

    my @batch_tasks = splice(@tasks, 0, $numMessages);
    my @batch_uris  = splice(@uris,  0, $numMessages);
    for my $i (0 .. $numMessages - 1) {
        Parallel::Pvm::initsend();
        Parallel::Pvm::pack($batch_uris[$i]);
        Parallel::Pvm::send($batch_tasks[$i], SPIDER_REQUEST);
        $tid2uri{ $batch_tasks[$i] } = $batch_uris[$i];
    }
}

# Report one URI per distinct digest.
print "$_\n" for sort values %visited;

# Tell every worker to finish, then collect and print its timings.
Parallel::Pvm::initsend();
Parallel::Pvm::mcast(@tids, SPIDER_EXIT);
for my $tid (@tids) {
    Parallel::Pvm::recv($tid, SPIDER_TIMINGS);
    printf "[t%x]: user = %.2f, system = %.2f\n",
        $tid, Parallel::Pvm::unpack(), Parallel::Pvm::unpack();
}

# Runs on normal exit and on die(): kill every spawned worker and leave PVM.
END {
    Parallel::Pvm::kill($_) for @tids;
    Parallel::Pvm::exit();
}