Lock select lists to avoid invalid states (particularly on closing)
This commit is contained in:
parent
ae7526dd96
commit
f66ed07496
@ -136,10 +136,11 @@ class Bot(threading.Thread):
|
||||
|
||||
def run(self):
|
||||
from select import select
|
||||
from nemubot.server import _rlist, _wlist, _xlist
|
||||
from nemubot.server import _lock, _rlist, _wlist, _xlist
|
||||
|
||||
self.stop = False
|
||||
while not self.stop:
|
||||
_lock.acquire()
|
||||
try:
|
||||
rl, wl, xl = select(_rlist, _wlist, _xlist, 0.1)
|
||||
except:
|
||||
@ -164,6 +165,7 @@ class Bot(threading.Thread):
|
||||
if not fnd_smth:
|
||||
logger.exception("Can't continue, sorry")
|
||||
self.quit()
|
||||
_lock.release()
|
||||
continue
|
||||
|
||||
for x in xl:
|
||||
@ -183,6 +185,8 @@ class Bot(threading.Thread):
|
||||
except:
|
||||
logger.exception("Uncatched exception on server read")
|
||||
|
||||
_lock.release()
|
||||
|
||||
# Launch new consumer threads if necessary
|
||||
while self.cnsr_queue.qsize() > self.cnsr_thrd_size:
|
||||
# Next launch if two more items in queue
|
||||
|
@ -231,7 +231,7 @@ class Consumer(threading.Thread):
|
||||
def run(self):
|
||||
try:
|
||||
while not self.stop:
|
||||
stm = self.context.cnsr_queue.get(True, 20)
|
||||
stm = self.context.cnsr_queue.get(True, 10)
|
||||
stm.run(self.context)
|
||||
self.context.cnsr_queue.task_done()
|
||||
|
||||
|
@ -16,6 +16,9 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import threading
|
||||
|
||||
_lock = threading.Lock()
|
||||
|
||||
# Lists for select
|
||||
_rlist = []
|
||||
|
@ -20,7 +20,7 @@ import io
|
||||
import logging
|
||||
import queue
|
||||
|
||||
from nemubot.server import _rlist, _wlist, _xlist
|
||||
from nemubot.server import _lock, _rlist, _wlist, _xlist
|
||||
|
||||
# Extends from IOBase in order to be compatible with select function
|
||||
class AbstractServer(io.IOBase):
|
||||
@ -71,6 +71,7 @@ class AbstractServer(io.IOBase):
|
||||
"""Generic close function that register the server un _{r,w,x}list in
|
||||
case of successful _close"""
|
||||
self.logger.info("Closing connection to %s", self.id)
|
||||
_lock.acquire()
|
||||
if not hasattr(self, "_close") or self._close():
|
||||
if self in _rlist:
|
||||
_rlist.remove(self)
|
||||
@ -78,7 +79,9 @@ class AbstractServer(io.IOBase):
|
||||
_wlist.remove(self)
|
||||
if self in _xlist:
|
||||
_xlist.remove(self)
|
||||
_lock.release()
|
||||
return True
|
||||
_lock.release()
|
||||
return False
|
||||
|
||||
|
||||
|
@ -73,14 +73,19 @@ class SocketServer(AbstractServer):
|
||||
def _close(self):
|
||||
import socket
|
||||
|
||||
from nemubot.server import _lock
|
||||
_lock.release()
|
||||
self._sending_queue.join()
|
||||
_lock.acquire()
|
||||
if self.connected:
|
||||
try:
|
||||
self.socket.shutdown(socket.SHUT_RDWR)
|
||||
self.socket.close()
|
||||
except socket.error:
|
||||
pass
|
||||
|
||||
self.socket = None
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user