Use a queue to treat incoming messages instead of using server thread to answer

This commit is contained in:
Némunaire 2012-08-23 18:20:45 +02:00
parent c92160f041
commit fce491552b
5 changed files with 77 additions and 17 deletions

18
bot.py
View File

@ -17,8 +17,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime from datetime import datetime
from queue import Queue
import threading import threading
from consumer import Consumer
import event import event
import hooks import hooks
from server import Server from server import Server
@ -38,6 +40,10 @@ class Bot:
self.events = list() self.events = list()
self.event_timer = None self.event_timer = None
self.msg_queue = Queue()
self.msg_thrd = list()
self.msg_thrd_size = -1
def add_event(self, evt): def add_event(self, evt):
"""Register an event""" """Register an event"""
@ -131,6 +137,18 @@ class Bot:
return False return False
def receive_message(self, srv, raw_msg, private = False):
"""Queued the message for treatment"""
self.msg_queue.put_nowait((srv, raw_msg, private))
# Launch a new thread if necessary
if self.msg_queue.qsize() > self.msg_thrd_size:
c = Consumer(self)
self.msg_thrd.append(c)
c.start()
self.msg_thrd_size += 2
def quit(self, verb=False): def quit(self, verb=False):
"""Save and unload modules and disconnect servers""" """Save and unload modules and disconnect servers"""
if self.event_timer is not None: if self.event_timer is not None:

49
consumer.py Normal file
View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Nemubot is a modulable IRC bot, built around XML configuration files.
# Copyright (C) 2012 Mercier Pierre-Olivier
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import queue
import threading
from message import Message
class Consumer(threading.Thread):
def __init__(self, context):
self.context = context
self.stop = False
threading.Thread.__init__(self)
def run(self):
try:
while not self.stop:
(srv, raw, prvt) = self.context.msg_queue.get(True, 20)
# Create the message
try:
msg = Message(srv, raw, prvt)
msg.treat(self.context.hooks)
except:
print ("\033[1;31mERROR:\033[0m occurred during the "
"processing of the message: %s" % line)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value,
exc_traceback)
except queue.Empty:
pass
finally:
self.context.msg_thrd_size -= 2

View File

@ -13,10 +13,10 @@ def load(context):
global DATAS global DATAS
DATAS.setIndex("name", "station") DATAS.setIndex("name", "station")
evt = ModuleEvent(station_available, "42706", # evt = ModuleEvent(station_available, "42706",
(lambda a, b: a != b), None, 60, # (lambda a, b: a != b), None, 60,
station_status) # station_status)
context.add_event(evt) # context.add_event(evt)
def help_tiny (): def help_tiny ():
"""Line inserted in the response to the command !help""" """Line inserted in the response to the command !help"""

View File

@ -72,5 +72,5 @@ if __name__ == "__main__":
sys.stderr.write (traceback.format_exception_only(exc_type, sys.stderr.write (traceback.format_exception_only(exc_type,
exc_value)[0]) exc_value)[0])
print ("Bye") print ("\nWaiting for other threads shuts down...")
sys.exit(0) sys.exit(0)

View File

@ -243,14 +243,7 @@ class Server(threading.Thread):
print (" Already connected.") print (" Already connected.")
def treat_msg(self, line, private = False): def treat_msg(self, line, private = False):
try: self.context.receive_message(self, line, private)
msg = message.Message (self, line, private)
msg.treat(self.context.hooks)
except:
print ("\033[1;31mERROR:\033[0m occurred during the processing of"
" the message: %s" % line)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_traceback)
def run(self): def run(self):
if not self.connected: if not self.connected: