Start a huge refactor of events

This commit is contained in:
nemunaire 2017-07-20 23:34:20 +02:00 committed by nemunaire
commit 69dcd53937
5 changed files with 85 additions and 177 deletions

View file

@ -26,10 +26,8 @@ def load(context):
for evt in context.data.index.keys():
if context.data.index[evt].hasAttribute("end"):
event = ModuleEvent(call=fini, call_data=dict(strend=context.data.index[evt]))
event._end = context.data.index[evt].getDate("end")
idt = context.add_event(event)
if idt is not None:
context.data.index[evt]["_id"] = idt
event.schedule(context.data.index[evt].getDate("end"))
context.add_event(event)
def fini(d, strend):
@ -100,8 +98,8 @@ def start_countdown(msg):
strnd["end"] = datetime(now.year, now.month, now.day, hou, minu, sec, timezone.utc)
else:
strnd["end"] = datetime(now.year, now.month, now.day + 1, hou, minu, sec, timezone.utc)
evt._end = strnd.getDate("end")
strnd["_id"] = context.add_event(evt)
evt.schedule(strnd.getDate("end"))
context.add_event(evt)
except:
context.data.delChild(strnd)
raise IMException("Mauvais format de date pour l'événement %s. Il n'a pas été créé." % msg.args[0])
@ -121,10 +119,8 @@ def start_countdown(msg):
strnd["end"] += timedelta(days=int(t)*365)
else:
strnd["end"] += timedelta(seconds=int(t))
evt._end = strnd.getDate("end")
eid = context.add_event(evt)
if eid is not None:
strnd["_id"] = eid
evt.schedule(strnd.getDate("end"))
context.add_event(evt)
context.save()
if "end" in strnd:
@ -147,7 +143,7 @@ def end_countdown(msg):
if msg.args[0] in context.data.index:
if context.data.index[msg.args[0]]["proprio"] == msg.frm or (msg.cmd == "forceend" and msg.frm_owner):
duration = countdown(msg.date - context.data.index[msg.args[0]].getDate("start"))
context.del_event(context.data.index[msg.args[0]]["_id"])
context.del_event(context.data.index[msg.args[0]])
context.data.delChild(context.data.index[msg.args[0]])
context.save()
return Response("%s a duré %s." % (msg.args[0], duration),

View file

@ -19,6 +19,7 @@ from datetime import datetime, timezone
import logging
from multiprocessing import JoinableQueue
import threading
import traceback
import select
import sys
import weakref
@ -78,10 +79,6 @@ class Bot(threading.Thread):
self.modules = dict()
self.modules_configuration = dict()
# Events
self.events = list()
self.event_timer = None
# Own hooks
from nemubot.treatment import MessageTreater
self.treater = MessageTreater()
@ -245,7 +242,7 @@ class Bot(threading.Thread):
# Events methods
def add_event(self, evt, eid=None, module_src=None):
def add_event(self, evt):
"""Register an event and return its identifiant for futur update
Return:
@ -254,129 +251,31 @@ class Bot(threading.Thread):
Argument:
evt -- The event object to add
Keyword arguments:
eid -- The desired event ID (object or string UUID)
module_src -- The module to which the event is attached to
"""
if hasattr(self, "stop") and self.stop:
logger.warn("The bot is stopped, can't register new events")
return
if hasattr(evt, "handle") and evt.handle is not None:
raise Exception("Try to launch an already launched event.")
import uuid
def _end_event_timer(event):
"""Function called at the end of the event timer"""
# Generate the event id if no given
if eid is None:
eid = uuid.uuid1()
# Fill the id field of the event
if type(eid) is uuid.UUID:
evt.id = str(eid)
else:
# Ok, this is quiet useless...
try:
evt.id = str(uuid.UUID(eid))
except ValueError:
evt.id = eid
# TODO: mutex here plz
# Add the event in its place
t = evt.current
i = 0 # sentinel
for i in range(0, len(self.events)):
if self.events[i].current > t:
break
self.events.insert(i, evt)
if i == 0:
# First event changed, reset timer
self._update_event_timer()
if len(self.events) <= 0 or self.events[i] != evt:
# Our event has been executed and removed from queue
return None
# Register the event in the source module
if module_src is not None:
module_src.__nemubot_context__.events.append(evt.id)
evt.module_src = module_src
logger.info("New event registered in %d position: %s", i, t)
return evt.id
def del_event(self, evt, module_src=None):
"""Find and remove an event from list
Return:
True if the event has been found and removed, False else
Argument:
evt -- The ModuleEvent object to remove or just the event identifier
Keyword arguments:
module_src -- The module to which the event is attached to (ignored if evt is a ModuleEvent)
"""
logger.info("Removing event: %s from %s", evt, module_src)
from nemubot.event import ModuleEvent
if type(evt) is ModuleEvent:
id = evt.id
module_src = evt.module_src
else:
id = evt
if len(self.events) > 0 and id == self.events[0].id:
self.events.remove(self.events[0])
self._update_event_timer()
if module_src is not None:
module_src.__nemubot_context__.events.remove(id)
return True
for evt in self.events:
if evt.id == id:
self.events.remove(evt)
if module_src is not None:
module_src.__nemubot_context__.events.remove(evt.id)
return True
return False
def _update_event_timer(self):
"""(Re)launch the timer to end with the closest event"""
# Reset the timer if this is the first item
if self.event_timer is not None:
self.event_timer.cancel()
if len(self.events):
try:
remaining = self.events[0].time_left.total_seconds()
except:
logger.exception("An error occurs during event time calculation:")
self.events.pop(0)
return self._update_event_timer()
logger.debug("Update timer: next event in %d seconds", remaining)
self.event_timer = threading.Timer(remaining if remaining > 0 else 0, self._end_event_timer)
self.event_timer.start()
else:
logger.debug("Update timer: no timer left")
def _end_event_timer(self):
"""Function called at the end of the event timer"""
while len(self.events) > 0 and datetime.now(timezone.utc) >= self.events[0].current:
evt = self.events.pop(0)
self.cnsr_queue.put_nowait(EventConsumer(evt))
logger.debug("Trigering event")
event.handle = None
self.cnsr_queue.put_nowait(EventConsumer(event))
sync_act("launch_consumer")
self._update_event_timer()
evt.start(self.loop)
@asyncio.coroutine
def _add_event():
return self.loop.call_at(evt._next, _end_event_timer, evt)
future = asyncio.run_coroutine_threadsafe(_add_event(), loop=self.loop)
evt.handle = future.result()
logger.debug("New event registered in %ss", evt._next - self.loop.time())
return evt.handle
# Consumers methods
@ -509,10 +408,6 @@ class Bot(threading.Thread):
def quit(self):
"""Save and unload modules and disconnect servers"""
if self.event_timer is not None:
logger.info("Stop the event timer...")
self.event_timer.cancel()
logger.info("Save and unload all modules...")
for mod in [m for m in self.modules.keys()]:
self.unload_module(mod)

View file

@ -88,13 +88,8 @@ class EventConsumer:
logger.exception("Error during event end")
# Reappend the event in the queue if it has next iteration
if self.evt.next is not None:
context.add_event(self.evt, eid=self.evt.id)
# Or remove reference of this event
elif (hasattr(self.evt, "module_src") and
self.evt.module_src is not None):
self.evt.module_src.__nemubot_context__.events.remove(self.evt.id)
if self.evt.next():
context.add_event(self.evt)

View file

@ -65,37 +65,28 @@ class ModuleEvent:
# Store times
self.offset = timedelta(seconds=offset) # Time to wait before the first check
self.interval = timedelta(seconds=interval)
self._end = None # Cache
self._next = None # Cache
# How many times do this event?
self.times = times
@property
def current(self):
"""Return the date of the near check"""
if self.times != 0:
if self._end is None:
self._end = datetime.now(timezone.utc) + self.offset + self.interval
return self._end
return None
@property
def start(self, loop):
if self._next is None:
self._next = loop.time() + self.offset.total_seconds() + self.interval.total_seconds()
def schedule(self, end):
self.interval = timedelta(seconds=0)
self.offset = end - datetime.now(timezone.utc)
def next(self):
"""Return the date of the next check"""
if self.times != 0:
if self._end is None:
return self.current
elif self._end < datetime.now(timezone.utc):
self._end += self.interval
return self._end
return None
self._next += self.interval.total_seconds()
return True
return False
@property
def time_left(self):
"""Return the time left before/after the near check"""
if self.current is not None:
return self.current - datetime.now(timezone.utc)
return timedelta.max
def check(self):
"""Run a check and realized the event if this is time"""

View file

@ -14,6 +14,19 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
class _FakeHandle:
def __init__(self, true_handle, callback):
self.handle = true_handle
self.callback = callback
def cancel(self):
self.handle.cancel()
if self.callback:
return self.callback()
class _ModuleContext:
def __init__(self, module=None):
@ -24,8 +37,8 @@ class _ModuleContext:
else:
self.module_name = ""
self.hooks = list()
self.events = list()
self.hooks = list()
self.debug = False
from nemubot.config.module import Module
@ -36,6 +49,7 @@ class _ModuleContext:
from nemubot.tools.xmlparser import module_state
return module_state.ModuleState("nemubotstate")
def add_hook(self, hook, *triggers):
from nemubot.hooks import Abstract as AbstractHook
assert isinstance(hook, AbstractHook), hook
@ -46,19 +60,17 @@ class _ModuleContext:
assert isinstance(hook, AbstractHook), hook
self.hooks.remove((triggers, hook))
def subtreat(self, msg):
return None
def add_event(self, evt, eid=None):
return self.events.append((evt, eid))
def add_event(self, evt):
return self.events.append(evt)
def del_event(self, evt):
for i in self.events:
e, eid = i
if e == evt:
self.events.remove(i)
return True
return False
return self.events.remove(evt)
def send_response(self, server, res):
self.module.logger.info("Send response: %s", res)
@ -114,6 +126,10 @@ class ModuleContext(_ModuleContext):
def load_data(self):
return self.context.datastore.load(self.module_name)
def save(self):
self.context.datastore.save(self.module_name, self.data)
def add_hook(self, hook, *triggers):
from nemubot.hooks import Abstract as AbstractHook
assert isinstance(hook, AbstractHook), hook
@ -126,14 +142,29 @@ class ModuleContext(_ModuleContext):
self.hooks.remove((triggers, hook))
return self.context.treater.hm.del_hooks(*triggers, hook=hook)
def subtreat(self, msg):
yield from self.context.treater.treat_msg(msg)
def add_event(self, evt, eid=None):
return self.context.add_event(evt, eid, module_src=self.module)
def add_event(self, evt):
if evt in self.events:
return None
def _cancel_event():
logger.debug("Cancel event")
evt.handle = None
return super().del_event(evt)
hd = self.context.add_event(evt)
evt.handle = _FakeHandle(hd, _cancel_event)
return super().add_event(evt)
def del_event(self, evt):
return self.context.del_event(evt, module_src=self.module)
# Call to super().del_event is done in the _FakeHandle.cancel
return evt.handle.cancel()
def send_response(self, server, res):
if server in self.context.servers: