bot: Fix sequential message processing with proper consumer pool
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
Replace the flawed cnsr_thrd_size threshold with cnsr_active, which tracks the number of consumers currently executing a task. A new consumer thread is now spawned the moment the queue is non-empty and all existing consumers are busy, enabling true parallel execution of slow and fast commands. The pool is capped at os.cpu_count() threads. - bot.py: replace cnsr_thrd_size with cnsr_active + cnsr_lock + cnsr_max - consumer.py: increment/decrement cnsr_active around stm.run(), remove itself from cnsr_thrd under the lock, mark thread as daemon Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
310f933091
commit
9d7c278d1a
2 changed files with 36 additions and 22 deletions
|
|
@ -143,11 +143,16 @@ class Bot(threading.Thread):
|
|||
return res
|
||||
self.treater.hm.add_hook(nemubot.hooks.Command(_help_msg, "help"), "in", "Command")
|
||||
|
||||
import os
|
||||
from queue import Queue
|
||||
# Messages to be treated
|
||||
self.cnsr_queue = Queue()
|
||||
self.cnsr_thrd = list()
|
||||
self.cnsr_thrd_size = -1
|
||||
# Messages to be treated — shared across all server connections.
|
||||
# cnsr_active tracks consumers currently inside stm.run() (not idle),
|
||||
# which lets us spawn a new thread the moment all existing ones are busy.
|
||||
self.cnsr_queue = Queue()
|
||||
self.cnsr_thrd = list()
|
||||
self.cnsr_lock = threading.Lock()
|
||||
self.cnsr_active = 0 # consumers currently executing a task
|
||||
self.cnsr_max = os.cpu_count() or 4 # upper bound on concurrent consumer threads
|
||||
|
||||
|
||||
def __del__(self):
|
||||
|
|
@ -234,14 +239,15 @@ class Bot(threading.Thread):
|
|||
sync_queue.task_done()
|
||||
|
||||
|
||||
# Launch new consumer threads if necessary
|
||||
while self.cnsr_queue.qsize() > self.cnsr_thrd_size:
|
||||
# Next launch if two more items in queue
|
||||
self.cnsr_thrd_size += 2
|
||||
|
||||
c = Consumer(self)
|
||||
self.cnsr_thrd.append(c)
|
||||
c.start()
|
||||
# Spawn a new consumer whenever the queue has work and every
|
||||
# existing consumer is already busy executing a task.
|
||||
with self.cnsr_lock:
|
||||
while (not self.cnsr_queue.empty()
|
||||
and self.cnsr_active >= len(self.cnsr_thrd)
|
||||
and len(self.cnsr_thrd) < self.cnsr_max):
|
||||
c = Consumer(self)
|
||||
self.cnsr_thrd.append(c)
|
||||
c.start()
|
||||
sync_queue = None
|
||||
logger.info("Ending main loop")
|
||||
|
||||
|
|
@ -518,7 +524,8 @@ class Bot(threading.Thread):
|
|||
srv.close()
|
||||
|
||||
logger.info("Stop consumers")
|
||||
k = self.cnsr_thrd
|
||||
with self.cnsr_lock:
|
||||
k = list(self.cnsr_thrd)
|
||||
for cnsr in k:
|
||||
cnsr.stop = True
|
||||
|
||||
|
|
|
|||
|
|
@ -105,18 +105,25 @@ class Consumer(threading.Thread):
|
|||
def __init__(self, context):
|
||||
self.context = context
|
||||
self.stop = False
|
||||
super().__init__(name="Nemubot consumer")
|
||||
super().__init__(name="Nemubot consumer", daemon=True)
|
||||
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
while not self.stop:
|
||||
stm = self.context.cnsr_queue.get(True, 1)
|
||||
stm.run(self.context)
|
||||
self.context.cnsr_queue.task_done()
|
||||
try:
|
||||
stm = self.context.cnsr_queue.get(True, 1)
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
except queue.Empty:
|
||||
pass
|
||||
with self.context.cnsr_lock:
|
||||
self.context.cnsr_active += 1
|
||||
try:
|
||||
stm.run(self.context)
|
||||
finally:
|
||||
self.context.cnsr_queue.task_done()
|
||||
with self.context.cnsr_lock:
|
||||
self.context.cnsr_active -= 1
|
||||
finally:
|
||||
self.context.cnsr_thrd_size -= 2
|
||||
self.context.cnsr_thrd.remove(self)
|
||||
with self.context.cnsr_lock:
|
||||
self.context.cnsr_thrd.remove(self)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue