484 lines
9.1 KiB
Perl
484 lines
9.1 KiB
Perl
#! /usr/bin/env perl
|
|
|
|
package Process;
|
|
|
|
use v5.10.1;
|
|
use strict;
|
|
use warnings;
|
|
use Carp;
|
|
use Gearman::Worker;
|
|
use XML::LibXML;
|
|
use XML::SAX::ParserFactory;
|
|
use List::Util "reduce";
|
|
use Scalar::Util qw(looks_like_number);
|
|
|
|
use ACU::Log;
|
|
|
|
use constant COEFF_OLD => 25;
|
|
use constant WAITING_LOAD => 2;
|
|
|
|
open(my $cpuinfo, "<", "/proc/cpuinfo");
|
|
our $nb_cpus = 0;
|
|
$nb_cpus = grep {/^processor\s/} <$cpuinfo>;
|
|
close $cpuinfo;
|
|
|
|
our @servers = ("gearmand-srv:4730");
|
|
|
|
sub add_server
|
|
{
|
|
push @servers, @_;
|
|
}
|
|
|
|
sub set_servers
|
|
{
|
|
@servers = @_;
|
|
}
|
|
|
|
sub check_load ($)
|
|
{
|
|
my $priority = shift;
|
|
my $load = 0;
|
|
|
|
# Get load by parsing uptime command output
|
|
open my $fh, '-|', 'uptime';
|
|
$load = $1 if <$fh> =~ /load average: (.+?),/;
|
|
close $fh;
|
|
|
|
return (($load * 4 / $nb_cpus) < $priority);
|
|
}
|
|
|
|
sub do_work ($$$@)
|
|
{
|
|
my $subref = shift;
|
|
my $given_args = shift;
|
|
my $priority = shift;
|
|
|
|
log DEBUG, "Starting job";
|
|
log TRACE, $_[0]{argref};
|
|
|
|
my $old = 0;
|
|
# Check the load isn't to high for this process
|
|
sleep WAITING_LOAD while ! check_load ($priority + (++$old / COEFF_OLD));
|
|
|
|
# Parse arguments
|
|
my $args = {
|
|
id => undef,
|
|
priority => 10,
|
|
auth => undef,
|
|
param => {},
|
|
unamed => 0,
|
|
files => {},
|
|
subtree => undef
|
|
};
|
|
|
|
my $sax_handler = ProcessHandler->new($args);
|
|
my $parser = XML::SAX::ParserFactory->parser( Handler => $sax_handler );
|
|
|
|
eval {
|
|
$parser->parse_string(${ $_[0]{argref} });
|
|
};
|
|
if ($@) {
|
|
my $err = "Parse error: $@";
|
|
log ERROR, $err;
|
|
return $err;
|
|
}
|
|
|
|
my $ret = "";
|
|
eval {
|
|
$SIG{'__WARN__'} = sub { log WARN, $_[0]; $ret .= ">>> ".$_[0 ]; };
|
|
|
|
$ret .= $subref->($given_args, $args);
|
|
};
|
|
if ($@) {
|
|
my $err = $@;
|
|
log ERROR, $err;
|
|
$ret .= $err;
|
|
}
|
|
|
|
return $ret;
|
|
}
|
|
|
|
sub register_no_parse ($$;$)
|
|
{
|
|
my $funcname = shift;
|
|
my $subref = shift;
|
|
my $given_arg = shift;
|
|
|
|
my $worker = Gearman::Worker->new;
|
|
|
|
log INFO, "Registering function $funcname on ", join(", ", @servers);
|
|
|
|
$worker->job_servers( @servers );
|
|
$worker->register_function($funcname => sub
|
|
{
|
|
my $ret;
|
|
eval {
|
|
$ret = $subref->($given_arg, $_[0]{argref});
|
|
};
|
|
if ($@) {
|
|
my $err = $@;
|
|
log ERROR, $err;
|
|
return $err;
|
|
}
|
|
return $ret;
|
|
});
|
|
|
|
# Disable exit on warning or error
|
|
$ACU::Log::fatal_warn = 0;
|
|
$ACU::Log::fatal_error = 0;
|
|
|
|
log INFO, "$funcname registered";
|
|
|
|
$worker->work while 1;
|
|
}
|
|
|
|
sub register ($$;$$)
|
|
{
|
|
my $funcname = shift;
|
|
my $subref = shift;
|
|
my $given_arg = shift;
|
|
my $priority = shift // 1;
|
|
|
|
my $worker = Gearman::Worker->new;
|
|
|
|
log INFO, "Registering function $funcname on ", join(", ", @servers);
|
|
|
|
$worker->job_servers( @servers );
|
|
$worker->register_function($funcname => sub { return do_work($subref, $given_arg, $priority, @_); });
|
|
|
|
# Disable exit on warning or error
|
|
$ACU::Log::fatal_warn = 0;
|
|
$ACU::Log::fatal_error = 0;
|
|
|
|
log INFO, "$funcname registered";
|
|
|
|
$worker->work while 1;
|
|
}
|
|
|
|
package Process::Client;
|
|
|
|
use v5.10.1;
|
|
use strict;
|
|
use warnings;
|
|
use Carp;
|
|
use Gearman::Client;
|
|
use MIME::Base64;
|
|
use XML::LibXML;
|
|
|
|
use ACU::Log;
|
|
|
|
sub build_task_xml($;$)
|
|
{
|
|
my $params = shift;
|
|
my $files = shift;
|
|
|
|
my $doc = XML::LibXML::Document->new('1.0');
|
|
my $root = $doc->createElement("process");
|
|
$doc->setDocumentElement( $root );
|
|
|
|
log TRACE, $params;
|
|
|
|
for my $key (keys %{ $params })
|
|
{
|
|
my $param = $doc->createElement("param");
|
|
if ($key !~ /^__[0-9]+$/) {
|
|
$param->addChild( $doc->createAttribute("name", $key) );
|
|
}
|
|
$param->appendText($params->{$key});
|
|
$root->appendChild($param);
|
|
}
|
|
|
|
if ($files)
|
|
{
|
|
log TRACE, $files;
|
|
|
|
for my $key (keys %{ $files })
|
|
{
|
|
my $file = $doc->createElement("file");
|
|
$file->addChild( $doc->createAttribute("name", $key) );
|
|
$file->addChild( $doc->createAttribute("encoding", "base64") );
|
|
$file->appendText(encode_base64($files->{$key}));
|
|
$root->appendChild($file);
|
|
}
|
|
}
|
|
|
|
my $ret = $doc->toString();
|
|
log TRACE, $ret;
|
|
return $ret;
|
|
}
|
|
|
|
sub launch ($$;$$)
|
|
{
|
|
my $funcname = shift;
|
|
|
|
my $client = Gearman::Client->new;
|
|
$client->job_servers( @servers );
|
|
|
|
log DEBUG, "Launching $funcname...";
|
|
|
|
my $xml = build_task_xml(shift, shift);
|
|
|
|
if (shift) {
|
|
return $client->dispatch_background($funcname, $xml);
|
|
}
|
|
else {
|
|
return $client->do_task($funcname, $xml);
|
|
}
|
|
}
|
|
|
|
sub paralaunch ($$;$)
|
|
{
|
|
my $funcsname = shift;
|
|
log TRACE, $funcsname;
|
|
|
|
# Build XML
|
|
my $xml = build_task_xml(shift, shift);
|
|
|
|
my $client = Gearman::Client->new;
|
|
$client->job_servers( @servers );
|
|
|
|
my $taskset = $client->new_task_set;
|
|
for my $task (@{ $funcsname })
|
|
{
|
|
log DEBUG, "Launching $task...";
|
|
|
|
$taskset->add_task($task => $xml);
|
|
}
|
|
$taskset->wait;
|
|
|
|
return "Ok";
|
|
}
|
|
|
|
package SubtreeItem;
|
|
|
|
sub new ($$)
|
|
{
|
|
my $class = shift;
|
|
my $self = {
|
|
nodeName => shift,
|
|
attributes => {},
|
|
nodeValue => "",
|
|
children => []
|
|
};
|
|
|
|
bless $self, $class;
|
|
|
|
return $self;
|
|
}
|
|
|
|
sub getAttribute ($$)
|
|
{
|
|
my ($self, $name) = @_;
|
|
|
|
return $self->{attributes}->{$name};
|
|
}
|
|
|
|
sub hasAttribute ($$)
|
|
{
|
|
my ($self, $name) = @_;
|
|
|
|
return exists $self->{attributes}->{$name};
|
|
}
|
|
|
|
sub getAllChildren($)
|
|
{
|
|
my $self = shift;
|
|
|
|
my @queue;
|
|
my @elements;
|
|
|
|
for my $child (@{ $self->{children} }) {
|
|
push @queue, $child;
|
|
}
|
|
|
|
while (@queue) {
|
|
my $child = shift @queue;
|
|
|
|
for my $child2 (@{ $child->{children} }) {
|
|
push @queue, $child2;
|
|
}
|
|
|
|
push @elements, $child;
|
|
}
|
|
|
|
return @elements;
|
|
}
|
|
|
|
sub hasChildNodes ($)
|
|
{
|
|
my $self = shift;
|
|
|
|
return @{ $self->{children} } > 0;
|
|
}
|
|
|
|
sub getData ($)
|
|
{
|
|
my $self = shift;
|
|
|
|
return $self->nodeValue;
|
|
}
|
|
|
|
sub getElementsByTagName ($$)
|
|
{
|
|
my ($self, $name) = @_;
|
|
|
|
return grep { $name eq "*" or $_->{nodeName} eq $name } getAllChildren($self);
|
|
}
|
|
|
|
sub getElementById ($$)
|
|
{
|
|
my ($self, $name) = @_;
|
|
|
|
return grep { $_->{attributes}->{id} eq $name } getAllChildren($self);
|
|
}
|
|
|
|
sub getFirstChild ($)
|
|
{
|
|
my $self = shift;
|
|
|
|
return $self->{children}[0];
|
|
}
|
|
|
|
sub recreateNode
|
|
{
|
|
my $self = shift;
|
|
my $doc = shift;
|
|
my $parent = shift;
|
|
|
|
my $node = $doc->createElement($self->{nodeName});
|
|
for my $attkey (keys %{ $self->{attributes} })
|
|
{
|
|
$node->addChild( $doc->createAttribute($attkey, $self->{attributes}{ $attkey }) );
|
|
}
|
|
|
|
for my $child (@{ $self->{children} })
|
|
{
|
|
$child->recreateNode($doc, $node);
|
|
}
|
|
|
|
if ($self->{nodeValue}) {
|
|
$node->appendText( $self->{nodeValue} );
|
|
}
|
|
|
|
$parent->appendChild($node);
|
|
}
|
|
|
|
|
|
package ProcessHandler;
|
|
|
|
use Carp;
|
|
|
|
sub new ($$)
|
|
{
|
|
my $class = shift;
|
|
my $self = {
|
|
parsed => shift,
|
|
fileEnc => "",
|
|
inFile => "",
|
|
inParam => "",
|
|
subtreeStack => [],
|
|
values => ""
|
|
};
|
|
|
|
bless $self, $class;
|
|
|
|
return $self;
|
|
}
|
|
|
|
sub start_element
|
|
{
|
|
my ($self, $element) = @_;
|
|
|
|
if (@{ $self->{subtreeStack} } > 0 || $element->{Name} eq "subtree") {
|
|
my $item = SubtreeItem->new($element->{Name});
|
|
|
|
for my $attribute (keys %{ $element->{Attributes} }) {
|
|
my $attr = $attribute;
|
|
$attr =~ s/{}//;
|
|
$item->{attributes}->{$attr} = $element->{Attributes}{$attribute}{Value};
|
|
}
|
|
|
|
if (@{ $self->{subtreeStack} } > 0) {
|
|
$self->{subtreeStack}[-1]->{nodeValue} .= $self->{values};
|
|
}
|
|
push @{ $self->{subtreeStack} }, $item;
|
|
$self->{values} = "";
|
|
}
|
|
elsif ($element->{Name} eq "process") {
|
|
$self->{parsed}{auth} = $element->{Attributes}{"{}auth"}{Value};
|
|
$self->{parsed}{id} = $element->{Attributes}{"{}id"}{Value};
|
|
$self->{parsed}{priority} = $element->{Attributes}{"{}priority"}{Value};
|
|
}
|
|
elsif ($element->{Name} eq "file") {
|
|
$self->{inFile} = $element->{Attributes}{"{}name"}{Value};
|
|
$self->{fileEnc} = $element->{Attributes}{"{}encoding"}{Value} // "base64";
|
|
$self->{values} = "";
|
|
}
|
|
elsif ($element->{Name} eq "param") {
|
|
if ($element->{Attributes}{"{}name"}{Value}) {
|
|
$self->{inParam} = $element->{Attributes}{"{}name"}{Value};
|
|
} else {
|
|
$self->{inParam} = ++$self->{parsed}{unamed};
|
|
}
|
|
|
|
$self->{values} = "";
|
|
}
|
|
}
|
|
|
|
sub characters
|
|
{
|
|
my ($self, $characters) = @_;
|
|
|
|
if ($self->{inFile} || $self->{inParam} || @{ $self->{subtreeStack} } > 0) {
|
|
$self->{values} .= $characters->{Data};
|
|
}
|
|
}
|
|
|
|
sub end_element
|
|
{
|
|
my ($self, $element) = @_;
|
|
|
|
if (@{ $self->{subtreeStack} } > 0)
|
|
{
|
|
my $item = pop @{ $self->{subtreeStack} };
|
|
$item->{nodeValue} .= $self->{values};
|
|
$item->{nodeValue} =~ s/ +/ /g;
|
|
if (@{ $self->{subtreeStack} } > 0) {
|
|
push @{ $self->{subtreeStack}[-1]->{children} }, $item;
|
|
}
|
|
else {
|
|
$self->{parsed}{subtree} = $item;
|
|
}
|
|
$self->{values} = "";
|
|
}
|
|
elsif ($element->{Name} eq "param") {
|
|
$self->{parsed}{param}{ $self->{inParam} } = $self->{values};
|
|
$self->{inParam} = "";
|
|
$self->{values} = "";
|
|
}
|
|
elsif ($element->{Name} eq "file") {
|
|
$self->{parsed}{files}{ $self->{inFile} } = decode_file($self->{values}, $self->{fileEnc});
|
|
$self->{values} = "";
|
|
$self->{inFile} = "";
|
|
$self->{fileEnc} = "";
|
|
}
|
|
}
|
|
|
|
sub decode_file ($$)
|
|
{
|
|
my $content = shift;
|
|
my $encoding = shift;
|
|
|
|
if ($encoding eq "base64")
|
|
{
|
|
use MIME::Base64;
|
|
|
|
return decode_base64($content);
|
|
}
|
|
else {
|
|
croak "$encoding is not a known encoding."
|
|
}
|
|
}
|
|
|
|
1;
|