144 lines
5.1 KiB
Python
144 lines
5.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Nemubot is a smart and modulable IM bot.
|
|
# Copyright (C) 2012-2015 Mercier Pierre-Olivier
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
|
|
class ModuleEvent:

    """Representation of an event initiated by a bot module.

    An event periodically fetches a value (through ``func`` or directly from
    ``func_data``), decides whether it is "realized" (through ``cmp`` or by
    comparing with ``cmp_data``) and, if so, invokes ``call``.  Once ``times``
    or ``max_attempt`` reaches 0, ``end_call`` is invoked instead of
    scheduling the next occurrence.
    """

    def __init__(self, call=None, call_data=None, func=None, func_data=None,
                 cmp=None, cmp_data=None, end_call=None, end_data=None,
                 interval=60, offset=0, times=1, max_attempt=-1):
        """Initialize the event

        Keyword arguments:
        call -- Function to call when the event is realized
        call_data -- Argument(s) (single or dict) to pass as argument
        func -- Function called to check
        func_data -- Argument(s) (single or dict) to pass as argument OR if no func, initial data to watch
        cmp -- Boolean function called to check changes
        cmp_data -- Argument(s) (single or dict) to pass as argument OR if no cmp, data compared to previous
        end_call -- Function called when times or max_attempt reach 0 (mainly for interaction with the event manager)
        end_data -- Argument(s) (single or dict) to pass as argument
        interval -- Time in seconds between each check (default: 60)
        offset -- Time in seconds added to interval before the first check (default: 0)
        times -- Number of times the event has to be realized before being removed; -1 for no limit (default: 1)
        max_attempt -- Maximum number of times the event will be checked; -1 for no limit (default: -1)
        """

        # What have we to check?
        self.func = func
        self.func_data = func_data

        # How to detect a change?
        self.cmp = cmp
        if cmp_data is not None:
            self.cmp_data = cmp_data
        elif callable(self.func):
            # No reference value given: take a snapshot now so that later
            # checks have something to compare against.
            self.cmp_data = self._apply(self.func, self.func_data)
        else:
            self.cmp_data = None

        # What should we call when the event is realized?
        self.call = call
        # Without dedicated call_data, the checker's arguments are reused.
        self.call_data = call_data if call_data is not None else func_data

        # What should we call when the event is over?
        # (These must be stored: check() reads them on every run.)
        self.end_call = end_call
        self.end_data = end_data

        # Store time between each event
        self.interval = timedelta(seconds=interval)

        # How many times do this event?
        self.times = times
        # How many checks are still allowed?  A negative value never
        # reaches 0 by decrementing, hence means "no limit".
        self.max_attempt = max_attempt

        # Cache the time of the next occurrence
        self.next_occur = (datetime.now(timezone.utc)
                           + timedelta(seconds=offset)
                           + self.interval)

    @staticmethod
    def _apply(func, data, *leading):
        """Call func with the leading positional arguments, followed by data.

        data is skipped when None, expanded as keyword arguments when it is
        a dict, and passed as one extra positional argument otherwise.
        """

        if data is None:
            return func(*leading)
        if isinstance(data, dict):
            return func(*leading, **data)
        return func(*leading, data)

    @property
    def time_left(self):
        """Return the time left before (positive) or elapsed since (negative)
        the next scheduled check, as a timedelta"""

        return self.next_occur - datetime.now(timezone.utc)

    def check(self):
        """Run a check and realize the event if it is time

        Each run consumes one attempt.  If the event is realized, ``times``
        is decremented and ``call`` is invoked.  When ``times`` or
        ``max_attempt`` reaches 0, ``end_call`` is invoked (if callable);
        otherwise the next occurrence is pushed back by one interval.
        """

        self.max_attempt -= 1

        # Get initial data
        if callable(self.func):
            d_init = self._apply(self.func, self.func_data)
        else:
            d_init = self.func_data

        # then compare with current data
        if callable(self.cmp):
            rlz = self._apply(self.cmp, self.cmp_data, d_init)
        elif self.cmp_data is None:
            # Nothing to compare with: the event is always realized.
            rlz = True
        else:
            rlz = (d_init != self.cmp_data)

        if rlz:
            self.times -= 1

            # Call attended function; the fetched data is passed first,
            # except when there is neither fetched data nor call_data.
            if self.call_data is None and d_init is None:
                self.call()
            else:
                self._apply(self.call, self.call_data, d_init)

        # Is it finished?
        if self.times == 0 or self.max_attempt == 0:
            if callable(self.end_call):
                self._apply(self.end_call, self.end_data)
            # TODO: log a WARN when no end_call is available

        # Not finished, ready to next one!
        else:
            self.next_occur += self.interval
|