diff --git a/bot.py b/bot.py index 01e3130..9e48ce3 100644 --- a/bot.py +++ b/bot.py @@ -17,27 +17,33 @@ # along with this program. If not, see . from datetime import datetime +from queue import Queue import threading +from consumer import Consumer import event import hooks from server import Server class Bot: def __init__(self, servers=dict(), modules=dict(), mp=list()): - self.version = 3.2 + self.version = 3.2 self.version_txt = "3.2" self.servers = servers self.modules = modules self.modules_path = mp - self.datas_path = './datas/' + self.datas_path = './datas/' - self.hooks = hooks.MessagesHook() - self.events = list() + self.hooks = hooks.MessagesHook() + self.events = list() self.event_timer = None + self.msg_queue = Queue() + self.msg_thrd = list() + self.msg_thrd_size = -1 + def add_event(self, evt): """Register an event""" @@ -131,6 +137,18 @@ class Bot: return False + def receive_message(self, srv, raw_msg, private = False): + """Queued the message for treatment""" + self.msg_queue.put_nowait((srv, raw_msg, private)) + + # Launch a new thread if necessary + if self.msg_queue.qsize() > self.msg_thrd_size: + c = Consumer(self) + self.msg_thrd.append(c) + c.start() + self.msg_thrd_size += 2 + + def quit(self, verb=False): """Save and unload modules and disconnect servers""" if self.event_timer is not None: diff --git a/consumer.py b/consumer.py new file mode 100644 index 0000000..98322c2 --- /dev/null +++ b/consumer.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- + +# Nemubot is a modulable IRC bot, built around XML configuration files. +# Copyright (C) 2012 Mercier Pierre-Olivier +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import queue +import threading + +from message import Message + +class Consumer(threading.Thread): + def __init__(self, context): + self.context = context + self.stop = False + threading.Thread.__init__(self) + + def run(self): + try: + while not self.stop: + (srv, raw, prvt) = self.context.msg_queue.get(True, 20) + + # Create the message + try: + msg = Message(srv, raw, prvt) + msg.treat(self.context.hooks) + except: + print ("\033[1;31mERROR:\033[0m occurred during the " + "processing of the message: %s" % line) + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_exception(exc_type, exc_value, + exc_traceback) + + except queue.Empty: + pass + finally: + self.context.msg_thrd_size -= 2 diff --git a/modules/velib.py b/modules/velib.py index 5c79fb3..dfa2530 100644 --- a/modules/velib.py +++ b/modules/velib.py @@ -13,10 +13,10 @@ def load(context): global DATAS DATAS.setIndex("name", "station") - evt = ModuleEvent(station_available, "42706", - (lambda a, b: a != b), None, 60, - station_status) - context.add_event(evt) +# evt = ModuleEvent(station_available, "42706", +# (lambda a, b: a != b), None, 60, +# station_status) +# context.add_event(evt) def help_tiny (): """Line inserted in the response to the command !help""" diff --git a/nemubot.py b/nemubot.py index 4f07b6b..f984e25 100755 --- a/nemubot.py +++ b/nemubot.py @@ -72,5 +72,5 @@ if __name__ == "__main__": sys.stderr.write (traceback.format_exception_only(exc_type, exc_value)[0]) - print ("Bye") + print ("\nWaiting for other threads shuts down...") sys.exit(0) diff --git a/server.py b/server.py index 94d76c1..f01ff74 100644 --- a/server.py +++ b/server.py @@ -243,14 +243,7 @@ class Server(threading.Thread): print (" Already connected.") def treat_msg(self, line, private = False): - try: - msg = message.Message (self, line, private) - msg.treat(self.context.hooks) - except: - print ("\033[1;31mERROR:\033[0m occurred during the processing of" - " the message: %s" % line) - exc_type, exc_value, exc_traceback = sys.exc_info() - traceback.print_exception(exc_type, exc_value, exc_traceback) + self.context.receive_message(self, line, private) def run(self): if not self.connected: