Messages queue is now use to execute events

This commit is contained in:
Némunaire 2012-10-04 13:36:25 +02:00
parent 46d2854955
commit 5d2d218708
2 changed files with 78 additions and 39 deletions

32
bot.py
View File

@ -20,7 +20,7 @@ from datetime import datetime
from queue import Queue
import threading
from consumer import Consumer
import consumer
import event
import hooks
from networkbot import NetworkBot
@ -54,9 +54,9 @@ class Bot:
self.hooks_cache = dict()
# Messages to be treated
self.msg_queue = Queue()
self.msg_thrd = list()
self.msg_thrd_size = -1
self.cnsr_queue = Queue()
self.cnsr_thrd = list()
self.cnsr_thrd_size = -1
def add_event(self, evt, eid=None):
@ -122,9 +122,9 @@ class Bot:
while len(self.events)>0 and datetime.now() >= self.events[0].current:
#print ("end timer: while")
evt = self.events.pop(0)
evt.launch_check()
if evt.next is not None:
self.add_event(evt, evt.id)
self.cnsr_queue.put_nowait(consumer.EventConsumer(self, evt))
self.update_consumers()
self.update_timer()
@ -169,6 +169,7 @@ class Bot:
def unload_module(self, name, verb=False):
"""Unload a module"""
if name in self.modules:
print (name)
self.modules[name].save()
if hasattr(self.modules[name], "unload"):
self.modules[name].unload(self)
@ -180,17 +181,22 @@ class Bot:
return True
return False
def update_consumers(self):
"""Launch new consumer thread if necessary"""
if self.cnsr_queue.qsize() > self.cnsr_thrd_size:
c = consumer.Consumer(self)
self.cnsr_thrd.append(c)
c.start()
self.cnsr_thrd_size += 2
def receive_message(self, srv, raw_msg, private=False, data=None):
"""Queued the message for treatment"""
self.msg_queue.put_nowait((srv, raw_msg, datetime.now(), private, data))
#print (raw_msg)
self.cnsr_queue.put_nowait(consumer.MessageConsumer(srv, raw_msg, datetime.now(), private, data))
# Launch a new thread if necessary
if self.msg_queue.qsize() > self.msg_thrd_size:
c = Consumer(self)
self.msg_thrd.append(c)
c.start()
self.msg_thrd_size += 2
self.update_consumers()
def add_networkbot(self, srv, dest, dcc=None):

View File

@ -24,7 +24,63 @@ import sys
from message import Message
from response import Response
class MessageConsumer:
"""Store a message before treating"""
def __init__(self, srv, raw, time, prvt, data):
self.srv = srv
self.raw = raw
self.time = time
self.prvt = prvt
self.data = data
def run(self):
# Create, parse and treat the message
try:
msg = Message(self.srv, self.raw, self.time, self.prvt)
res = msg.treat()
except:
print ("\033[1;31mERROR:\033[0m occurred during the "
"processing of the message: %s" % self.raw)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value,
exc_traceback)
return
# Send message
if res is not None:
if isinstance(res, list):
for r in res:
if isinstance(r, Response):
self.srv.send_response(r, self.data)
elif isinstance(res, Response):
self.srv.send_response(res, self.data)
# Inform that the message has been treated
self.srv.msg_treated(self.data)
class EventConsumer:
"""Store a event before treating"""
def __init__(self, context, evt, timeout=20):
self.context = context
self.evt = evt
self.timeout = timeout
def run(self):
try:
self.evt.launch_check()
except:
print ("\033[1;31mError:.\033[0 during event end")
exc_type, exc_value, exc_traceback = sys.exc_info()
sys.stderr.write (traceback.format_exception_only(exc_type,
exc_value)[0])
if self.evt.next is not None:
self.context.add_event(self.evt, self.evt.id)
class Consumer(threading.Thread):
"""Dequeue and exec requested action"""
def __init__(self, context):
self.context = context
self.stop = False
@ -33,33 +89,10 @@ class Consumer(threading.Thread):
def run(self):
try:
while not self.stop:
(srv, raw, time, prvt, data) = self.context.msg_queue.get(True, 20)
# Create, parse and treat the message
try:
msg = Message(srv, raw, time, prvt)
res = msg.treat()
except:
print ("\033[1;31mERROR:\033[0m occurred during the "
"processing of the message: %s" % raw)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value,
exc_traceback)
return
# Send message
if res is not None:
if isinstance(res, list):
for r in res:
if isinstance(r, Response):
srv.send_response(r, data)
elif isinstance(res, Response):
srv.send_response(res, data)
# Inform that the message has been treated
srv.msg_treated(data)
stm = self.context.cnsr_queue.get(True, 20)
stm.run()
except queue.Empty:
pass
finally:
self.context.msg_thrd_size -= 2
self.context.cnsr_thrd_size -= 2