epita-std
/
ACU
Archived
1
0
Fork 0
This repository has been archived on 2021-10-08. You can view files and clone it, but cannot push or open issues or pull requests.
ACU/ACU/Process.pm

484 lines
9.1 KiB
Perl

#! /usr/bin/env perl
package Process;
use v5.10.1;
use strict;
use warnings;
use Carp;
use Gearman::Worker;
use XML::LibXML;
use XML::SAX::ParserFactory;
use List::Util "reduce";
use Scalar::Util qw(looks_like_number);
use ACU::Log;
use constant COEFF_OLD => 25;
use constant WAITING_LOAD => 2;
# Number of logical CPUs on this host, counted from the "processor" lines of
# /proc/cpuinfo. check_load() divides by this value, so it is never left
# below 1: when /proc/cpuinfo cannot be opened or lists no processor, a single
# CPU is assumed instead of keeping 0.
our $nb_cpus = 0;
if (open(my $cpuinfo, "<", "/proc/cpuinfo")) {
    $nb_cpus = grep { /^processor\s/ } <$cpuinfo>;
    close $cpuinfo;
}
$nb_cpus = 1 if $nb_cpus < 1;

# Gearman job servers handed to job_servers() by the workers and clients
# created in this file.
our @servers = ("gearmand-srv:4730");
# Append the given job servers to @servers.
#   @_ : server addresses to add.
# Returns what push returns: the number of entries now in @servers.
sub add_server
{
    my @extra = @_;
    return push(@servers, @extra);
}
# Replace the whole content of @servers with the given job servers.
#   @_ : server addresses that become the new list (may be empty).
# Returns the value of the list assignment itself.
sub set_servers
{
    my @replacement = @_;
    return (@servers = @replacement);
}
# Tell whether the machine load is low enough to start working.
#   $priority : threshold; a larger value tolerates a higher load.
# The first load figure printed by `uptime` is scaled by 4 / $nb_cpus and
# compared with $priority.
# Returns true when the scaled load is strictly below $priority, false
# otherwise.
# Best effort: when `uptime` cannot be started or its output does not match,
# the load is taken to be 0.
sub check_load ($)
{
    my $priority = shift;
    my $load = 0;

    # Get load by parsing uptime command output
    if (open my $fh, '-|', 'uptime') {
        my $line = <$fh>;
        # $line is undefined when the command printed nothing
        if (defined $line && $line =~ /load average: (.+?),/) {
            $load = $1;
        }
        close $fh;
    }

    # Never divide by zero, even if the CPU count could not be determined
    my $cpus = ($nb_cpus && $nb_cpus > 0) ? $nb_cpus : 1;

    return (($load * 4 / $cpus) < $priority);
}
# Run one job: wait for an acceptable load, parse the XML payload, then call
# the user callback.
#   $subref     : code ref, called as $subref->($given_args, $args)
#   $given_args : value passed through unchanged as first callback argument
#   $priority   : base threshold given to check_load()
#   @_          : remaining arguments; $_[0]{argref} is dereferenced as the
#                 XML string to parse
# $args is a hash ref filled by ProcessHandler with the keys id, priority,
# auth, param, unamed, files and subtree.
# Returns a string: the callback result, preceded by every warning raised
# while it ran (each prefixed with ">>> ") and followed by the error message
# if the callback died. On XML parse failure, returns "Parse error: ..."
# without calling the callback.
sub do_work ($$$@)
{
    my $subref = shift;
    my $given_args = shift;
    my $priority = shift;

    log DEBUG, "Starting job";
    log TRACE, $_[0]{argref};

    my $old = 0;
    # Check the load isn't too high for this process. Each failed check raises
    # the threshold by 1 / COEFF_OLD, so a waiting job is eventually accepted.
    sleep WAITING_LOAD while ! check_load ($priority + (++$old / COEFF_OLD));

    # Parse arguments
    my $args = {
        id => undef,
        priority => 10,
        auth => undef,
        param => {},
        unamed => 0,
        files => {},
        subtree => undef
    };
    my $sax_handler = ProcessHandler->new($args);
    my $parser = XML::SAX::ParserFactory->parser( Handler => $sax_handler );
    eval {
        $parser->parse_string(${ $_[0]{argref} });
    };
    if ($@) {
        my $err = "Parse error: $@";
        log ERROR, $err;
        return $err;
    }

    my $ret = "";
    eval {
        # local: the handler closes over this job's $ret, so it must not stay
        # installed once the callback has returned or died.
        local $SIG{'__WARN__'} = sub { log WARN, $_[0]; $ret .= ">>> ".$_[0 ]; };
        $ret .= $subref->($given_args, $args);
    };
    if ($@) {
        my $err = $@;
        log ERROR, $err;
        $ret .= $err;
    }
    return $ret;
}
# Register a Gearman function whose callback receives the raw job argument,
# then serve jobs forever.
#   $funcname  : name under which the function is registered
#   $subref    : code ref, called as $subref->($given_arg, $argref) where
#                $argref is the {argref} entry of the job
#   $given_arg : optional value passed through as first callback argument
# The job result is the callback's return value, or the error message when
# the callback dies (the error is logged too).
# Does not return normally: it loops on $worker->work.
sub register_no_parse ($$;$)
{
    my ($funcname, $subref, $given_arg) = @_;

    my $worker = Gearman::Worker->new;
    log INFO, "Registering function $funcname on ", join(", ", @servers);
    $worker->job_servers( @servers );

    # Job callback: run the user code, turn an exception into the job result
    my $on_job = sub {
        my $ret = eval { $subref->($given_arg, $_[0]{argref}) };
        return $ret unless $@;

        my $err = $@;
        log ERROR, $err;
        return $err;
    };
    $worker->register_function($funcname => $on_job);

    # Disable exit on warning or error
    $ACU::Log::fatal_warn = 0;
    $ACU::Log::fatal_error = 0;

    log INFO, "$funcname registered";

    while (1) {
        $worker->work;
    }
}
# Register a Gearman function whose jobs go through do_work(), then serve
# jobs forever.
#   $funcname  : name under which the function is registered
#   $subref    : code ref forwarded to do_work()
#   $given_arg : optional value forwarded to do_work()
#   $priority  : optional load threshold forwarded to do_work(); 1 when
#                omitted or undefined
# Does not return normally: it loops on $worker->work.
sub register ($$;$$)
{
    my ($funcname, $subref, $given_arg, $priority) = @_;
    $priority //= 1;

    my $worker = Gearman::Worker->new;
    log INFO, "Registering function $funcname on ", join(", ", @servers);
    $worker->job_servers( @servers );

    # Every job argument received from Gearman is forwarded after the three
    # fixed ones.
    my $on_job = sub { return do_work($subref, $given_arg, $priority, @_); };
    $worker->register_function($funcname => $on_job);

    # Disable exit on warning or error
    $ACU::Log::fatal_warn = 0;
    $ACU::Log::fatal_error = 0;

    log INFO, "$funcname registered";

    while (1) {
        $worker->work;
    }
}
package Process::Client;
use v5.10.1;
use strict;
use warnings;
use Carp;
use Gearman::Client;
use MIME::Base64;
use XML::LibXML;
use ACU::Log;
# Serialize a task into the XML document sent to the workers.
#   $params : hash ref of parameters. A key of the form __<digits> marks an
#             unnamed (positional) parameter and is written without a "name"
#             attribute; any other key becomes the "name" attribute.
#   $files  : optional hash ref, file name => content; each content is
#             written base64-encoded in a <file> element.
# Returns the document as a string, rooted at a <process> element.
#
# Emission order is deterministic: named parameters sorted by name, then
# unnamed ones by increasing index, then files sorted by name. The order of
# unnamed parameters matters because ProcessHandler numbers them in the order
# it meets them in the document.
sub build_task_xml($;$)
{
    my $params = shift;
    my $files = shift;

    my $doc = XML::LibXML::Document->new('1.0');
    my $root = $doc->createElement("process");
    $doc->setDocumentElement( $root );

    log TRACE, $params;

    my @keys = keys %{ $params };
    my @named = sort grep { $_ !~ /^__[0-9]+$/ } @keys;
    # substr(..., 2) skips the leading "__" to compare the numeric indexes
    my @unnamed = sort { substr($a, 2) <=> substr($b, 2) }
                  grep { $_ =~ /^__[0-9]+$/ } @keys;

    for my $key (@named, @unnamed)
    {
        my $param = $doc->createElement("param");

        if ($key !~ /^__[0-9]+$/) {
            $param->addChild( $doc->createAttribute("name", $key) );
        }

        $param->appendText($params->{$key});
        $root->appendChild($param);
    }

    if ($files)
    {
        log TRACE, $files;

        for my $key (sort keys %{ $files })
        {
            my $file = $doc->createElement("file");
            $file->addChild( $doc->createAttribute("name", $key) );
            $file->addChild( $doc->createAttribute("encoding", "base64") );
            $file->appendText(encode_base64($files->{$key}));
            $root->appendChild($file);
        }
    }

    my $ret = $doc->toString();
    log TRACE, $ret;
    return $ret;
}
# Submit one task to a Gearman function.
#   $funcname   : name of the function to call
#   $params     : parameters hash ref, given to build_task_xml()
#   $files      : optional files hash ref, given to build_task_xml()
#   $background : optional flag; when true the task is sent with
#                 dispatch_background(), otherwise with do_task()
# Returns whatever the Gearman client call used returns.
sub launch ($$;$$)
{
    my ($funcname, $params, $files, $background) = @_;

    my $client = Gearman::Client->new;
    $client->job_servers( @servers );

    log DEBUG, "Launching $funcname...";

    my $xml = build_task_xml($params, $files);

    return $client->dispatch_background($funcname, $xml) if $background;
    return $client->do_task($funcname, $xml);
}
# Send the same task to several Gearman functions and wait for all of them.
#   $funcsname : array ref of function names
#   $params    : parameters hash ref, given to build_task_xml()
#   $files     : optional files hash ref, given to build_task_xml()
# Returns the string "Ok" once the task set wait has completed; task results
# are not collected.
sub paralaunch ($$;$)
{
    my ($funcsname, $params, $files) = @_;

    log TRACE, $funcsname;

    # Build XML
    my $xml = build_task_xml($params, $files);

    my $client = Gearman::Client->new;
    $client->job_servers( @servers );

    my $taskset = $client->new_task_set;
    foreach my $task (@{ $funcsname })
    {
        log DEBUG, "Launching $task...";
        $taskset->add_task($task => $xml);
    }

    $taskset->wait;

    return "Ok";
}
package SubtreeItem;

# In-memory element of a parsed <subtree>. Each item is a blessed hash with:
#   nodeName   : element name
#   attributes : hash ref, attribute name => value
#   nodeValue  : text content accumulated for the element
#   children   : array ref of child SubtreeItem objects, in document order

# Create an empty item.
#   $class : class name
#   next argument : element name, stored as nodeName
sub new ($$)
{
    my $class = shift;
    my $self = {
        nodeName => shift,
        attributes => {},
        nodeValue => "",
        children => []
    };

    bless $self, $class;

    return $self;
}

# Return the value of attribute $name, or undef when it is not set.
sub getAttribute ($$)
{
    my ($self, $name) = @_;

    return $self->{attributes}->{$name};
}

# Return true when attribute $name exists on this item.
sub hasAttribute ($$)
{
    my ($self, $name) = @_;

    return exists $self->{attributes}->{$name};
}

# Return every descendant of this item (not the item itself), visited
# breadth first: direct children come before grandchildren.
sub getAllChildren($)
{
    my $self = shift;

    my @queue = @{ $self->{children} };
    my @elements;

    while (@queue) {
        my $child = shift @queue;
        push @queue, @{ $child->{children} };
        push @elements, $child;
    }

    return @elements;
}

# Return true when this item has at least one child.
sub hasChildNodes ($)
{
    my $self = shift;

    return @{ $self->{children} } > 0;
}

# Return the text content of this item.
sub getData ($)
{
    my $self = shift;

    # nodeValue is a plain hash entry; there is no accessor method for it
    return $self->{nodeValue};
}

# Return the descendants whose nodeName equals $name; "*" matches all.
sub getElementsByTagName ($$)
{
    my ($self, $name) = @_;

    return grep { $name eq "*" or $_->{nodeName} eq $name } getAllChildren($self);
}

# Return the descendants whose "id" attribute equals $name. As with any grep,
# the result is the list of matches in list context and their count in scalar
# context.
sub getElementById ($$)
{
    my ($self, $name) = @_;

    # Elements without an id attribute are skipped instead of being compared
    # as undef, which would raise a warning for each of them.
    return grep {
        defined($_->{attributes}->{id}) and $_->{attributes}->{id} eq $name
    } getAllChildren($self);
}

# Return the first child, or undef when there is none.
sub getFirstChild ($)
{
    my $self = shift;

    return $self->{children}[0];
}

# Rebuild this item and its descendants as nodes of an XML document.
#   $doc    : document object used to create elements and attributes
#   $parent : node that receives the rebuilt element through appendChild
# Child elements are appended before the text content of the element.
# Returns the result of $parent->appendChild.
sub recreateNode
{
    my $self = shift;
    my $doc = shift;
    my $parent = shift;

    my $node = $doc->createElement($self->{nodeName});

    for my $attkey (keys %{ $self->{attributes} })
    {
        $node->addChild( $doc->createAttribute($attkey, $self->{attributes}{ $attkey }) );
    }

    for my $child (@{ $self->{children} })
    {
        $child->recreateNode($doc, $node);
    }

    # Test the length, not the truth, so that a text of "0" is kept
    if (defined($self->{nodeValue}) && length($self->{nodeValue})) {
        $node->appendText( $self->{nodeValue} );
    }

    $parent->appendChild($node);
}
package ProcessHandler;

use Carp;
use MIME::Base64;

# SAX handler that fills a caller-provided hash from a <process> document:
#   <process auth= id= priority=> : copied to the auth / id / priority keys
#   <param name=...>              : stored in {param}{name}; without a name,
#                                   stored under the next value of {unamed}
#   <file name=... encoding=...>  : decoded and stored in {files}{name}
#   <subtree>                     : converted to a tree of SubtreeItem
#                                   objects, stored in {subtree}

# Create a handler.
#   $class : class name
#   next argument : hash ref that receives the parsed data
sub new ($$)
{
    my $class = shift;
    my $self = {
        parsed => shift,
        fileEnc => "",
        inFile => "",
        inParam => "",
        subtreeStack => [],
        values => ""
    };

    bless $self, $class;

    return $self;
}

# SAX callback for an opening tag.
#   $element : hash ref with Name and Attributes; attribute keys carry a "{}"
#              prefix when the attribute has no namespace.
sub start_element
{
    my ($self, $element) = @_;

    if (@{ $self->{subtreeStack} } > 0 || $element->{Name} eq "subtree") {
        # Inside (or entering) a subtree: every element becomes a SubtreeItem
        my $item = SubtreeItem->new($element->{Name});

        for my $attribute (keys %{ $element->{Attributes} }) {
            my $attr = $attribute;
            $attr =~ s/\{\}//;
            $item->{attributes}->{$attr} = $element->{Attributes}{$attribute}{Value};
        }

        # Text seen so far belongs to the enclosing element
        if (@{ $self->{subtreeStack} } > 0) {
            $self->{subtreeStack}[-1]->{nodeValue} .= $self->{values};
        }

        push @{ $self->{subtreeStack} }, $item;
        $self->{values} = "";
    }
    elsif ($element->{Name} eq "process") {
        # Only attributes actually present overwrite the values already in the
        # parsed hash, so that defaults provided by the caller survive.
        for my $field (qw(auth id priority)) {
            my $attr = $element->{Attributes}{"{}$field"};
            if (defined($attr) && defined($attr->{Value})) {
                $self->{parsed}{$field} = $attr->{Value};
            }
        }
    }
    elsif ($element->{Name} eq "file") {
        $self->{inFile} = $element->{Attributes}{"{}name"}{Value};
        $self->{fileEnc} = $element->{Attributes}{"{}encoding"}{Value} // "base64";
        $self->{values} = "";
    }
    elsif ($element->{Name} eq "param") {
        if ($element->{Attributes}{"{}name"}{Value}) {
            $self->{inParam} = $element->{Attributes}{"{}name"}{Value};
        } else {
            # Unnamed parameter: keyed by its 1-based position among them
            $self->{inParam} = ++$self->{parsed}{unamed};
        }
        $self->{values} = "";
    }
}

# SAX callback for text: accumulate it while inside a file, a param or a
# subtree; ignore it elsewhere.
sub characters
{
    my ($self, $characters) = @_;

    if ($self->{inFile} || $self->{inParam} || @{ $self->{subtreeStack} } > 0) {
        $self->{values} .= $characters->{Data};
    }
}

# SAX callback for a closing tag: store what was accumulated for the element.
sub end_element
{
    my ($self, $element) = @_;

    if (@{ $self->{subtreeStack} } > 0)
    {
        my $item = pop @{ $self->{subtreeStack} };
        $item->{nodeValue} .= $self->{values};
        # Collapse runs of spaces into a single one
        $item->{nodeValue} =~ s/ +/ /g;

        if (@{ $self->{subtreeStack} } > 0) {
            push @{ $self->{subtreeStack}[-1]->{children} }, $item;
        }
        else {
            # Stack is empty again: this was the subtree root
            $self->{parsed}{subtree} = $item;
        }

        $self->{values} = "";
    }
    elsif ($element->{Name} eq "param") {
        $self->{parsed}{param}{ $self->{inParam} } = $self->{values};
        $self->{inParam} = "";
        $self->{values} = "";
    }
    elsif ($element->{Name} eq "file") {
        $self->{parsed}{files}{ $self->{inFile} } = decode_file($self->{values}, $self->{fileEnc});
        $self->{values} = "";
        $self->{inFile} = "";
        $self->{fileEnc} = "";
    }
}

# Decode the content of a <file> element.
#   $content  : encoded text
#   $encoding : encoding name; only "base64" is supported
# Returns the decoded content. Croaks on any other encoding.
sub decode_file ($$)
{
    my $content = shift;
    my $encoding = shift;

    if ($encoding eq "base64")
    {
        return decode_base64($content);
    }
    else {
        croak "$encoding is not a known encoding.";
    }
}
1;