diff --git a/bot.py b/bot.py index 0bdc98d..dd98a2e 100644 --- a/bot.py +++ b/bot.py @@ -20,7 +20,7 @@ from datetime import datetime from queue import Queue import threading -from consumer import Consumer +import consumer import event import hooks from networkbot import NetworkBot @@ -54,9 +54,9 @@ class Bot: self.hooks_cache = dict() # Messages to be treated - self.msg_queue = Queue() - self.msg_thrd = list() - self.msg_thrd_size = -1 + self.cnsr_queue = Queue() + self.cnsr_thrd = list() + self.cnsr_thrd_size = -1 def add_event(self, evt, eid=None): @@ -122,9 +122,9 @@ class Bot: while len(self.events)>0 and datetime.now() >= self.events[0].current: #print ("end timer: while") evt = self.events.pop(0) - evt.launch_check() - if evt.next is not None: - self.add_event(evt, evt.id) + self.cnsr_queue.put_nowait(consumer.EventConsumer(self, evt)) + self.update_consumers() + self.update_timer() @@ -169,6 +169,7 @@ class Bot: def unload_module(self, name, verb=False): """Unload a module""" if name in self.modules: + print (name) self.modules[name].save() if hasattr(self.modules[name], "unload"): self.modules[name].unload(self) @@ -180,17 +181,22 @@ class Bot: return True return False + def update_consumers(self): + """Launch new consumer thread if necessary""" + if self.cnsr_queue.qsize() > self.cnsr_thrd_size: + c = consumer.Consumer(self) + self.cnsr_thrd.append(c) + c.start() + self.cnsr_thrd_size += 2 + def receive_message(self, srv, raw_msg, private=False, data=None): """Queued the message for treatment""" - self.msg_queue.put_nowait((srv, raw_msg, datetime.now(), private, data)) + #print (raw_msg) + self.cnsr_queue.put_nowait(consumer.MessageConsumer(srv, raw_msg, datetime.now(), private, data)) # Launch a new thread if necessary - if self.msg_queue.qsize() > self.msg_thrd_size: - c = Consumer(self) - self.msg_thrd.append(c) - c.start() - self.msg_thrd_size += 2 + self.update_consumers() def add_networkbot(self, srv, dest, dcc=None): diff --git a/consumer.py b/consumer.py index 391e156..0f08727 100644 --- a/consumer.py +++ b/consumer.py @@ -24,7 +24,63 @@ import sys from message import Message from response import Response +class MessageConsumer: + """Store a message before treating""" + def __init__(self, srv, raw, time, prvt, data): + self.srv = srv + self.raw = raw + self.time = time + self.prvt = prvt + self.data = data + + def run(self): + # Create, parse and treat the message + try: + msg = Message(self.srv, self.raw, self.time, self.prvt) + res = msg.treat() + except: + print ("\033[1;31mERROR:\033[0m occurred during the " + "processing of the message: %s" % self.raw) + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_exception(exc_type, exc_value, + exc_traceback) + return + + # Send message + if res is not None: + if isinstance(res, list): + for r in res: + if isinstance(r, Response): + self.srv.send_response(r, self.data) + elif isinstance(res, Response): + self.srv.send_response(res, self.data) + + # Inform that the message has been treated + self.srv.msg_treated(self.data) + + +class EventConsumer: + """Store a event before treating""" + def __init__(self, context, evt, timeout=20): + self.context = context + self.evt = evt + self.timeout = timeout + + def run(self): + try: + self.evt.launch_check() + except: + print ("\033[1;31mError:.\033[0 during event end") + exc_type, exc_value, exc_traceback = sys.exc_info() + sys.stderr.write (traceback.format_exception_only(exc_type, + exc_value)[0]) + if self.evt.next is not None: + self.context.add_event(self.evt, self.evt.id) + + + class Consumer(threading.Thread): + """Dequeue and exec requested action""" def __init__(self, context): self.context = context self.stop = False @@ -33,33 +89,10 @@ class Consumer(threading.Thread): def run(self): try: while not self.stop: - (srv, raw, time, prvt, data) = self.context.msg_queue.get(True, 20) - - # Create, parse and treat the message - try: - msg = Message(srv, raw, time, prvt) - res = msg.treat() - except: - print ("\033[1;31mERROR:\033[0m occurred during the " - "processing of the message: %s" % raw) - exc_type, exc_value, exc_traceback = sys.exc_info() - traceback.print_exception(exc_type, exc_value, - exc_traceback) - return - - # Send message - if res is not None: - if isinstance(res, list): - for r in res: - if isinstance(r, Response): - srv.send_response(r, data) - elif isinstance(res, Response): - srv.send_response(res, data) - - # Inform that the message has been treated - srv.msg_treated(data) + stm = self.context.cnsr_queue.get(True, 20) + stm.run() except queue.Empty: pass finally: - self.context.msg_thrd_size -= 2 + self.context.cnsr_thrd_size -= 2