fix slow consumer

This commit is contained in:
Alexis Delhaie
2020-10-07 18:37:42 +02:00
parent 696818d56a
commit 9954bf6961
4 changed files with 51 additions and 98 deletions

View File

@@ -1,76 +0,0 @@
import threading
import time
import queue
import os
import sys
console = None
def get_available_threads():
if sys.platform == 'win32':
return (int)(os.environ['NUMBER_OF_PROCESSORS'])
else:
return (int)(os.popen('grep -c cores /proc/cpuinfo').read())
SIZE = get_available_threads()
q = queue.Queue()
class ProducerThread(threading.Thread):
def __init__(self, threads):
super(ProducerThread,self).__init__()
self.threads = threads
def run(self):
try:
i = 0
while i < len(self.threads):
if q.qsize() < SIZE:
q.put(self.threads[i])
i += 1
while q.qsize() > 0:
time.sleep(0.1)
except Exception as e:
console.print("[red]Error[/]: {}".format(e), style="bold")
class ConsumerThread(threading.Thread):
def __init__(self):
super(ConsumerThread,self).__init__()
self.alive = True
def stop(self):
self.alive = False
def run(self):
while self.alive:
try:
if q.qsize() > 0:
t = q.get(False)
t.start()
if t.is_alive():
t.join(t.timeout)
except queue.Empty as e:
pass
except Exception as e:
console.print("[orange3]Warning[/]: {}".format(e), style="bold")
def start(threads):
process(threads, get_available_threads())
def start_custom_limit(threads, limit):
process(threads, limit)
def process(threads, limit):
p = ProducerThread(threads)
p.start()
consumers = []
for _ in range(limit):
c = ConsumerThread()
consumers.append(c)
c.start()
p.join()
for c in consumers:
c.stop()
for c in consumers:
if c.is_alive():
c.join()

32
src/core.py Normal file
View File

@@ -0,0 +1,32 @@
import os
import sys
def get_available_threads():
if sys.platform == 'win32':
return (int)(os.environ['NUMBER_OF_PROCESSORS'])
else:
return (int)(os.popen('grep -c cores /proc/cpuinfo').read())
def start_one_by_one(threads):
for t in threads:
t.run()
def start(threads):
process(threads, get_available_threads())
def start_custom_limit(threads, limit):
process(threads, limit)
def process(threads, limit):
started = []
for t in threads:
t.start()
started.append(t)
if len(started) >= limit:
for s in started:
if s.is_alive():
s.join()
started.clear()
for s in started:
if s.is_alive():
s.join()

View File

@@ -1,21 +1,22 @@
#!/usr/bin/python3
import os
import platform
import sys
import time
import platform
import os
from rich import box
from rich.console import Console
from rich.table import Table
from thread import StressThread
from parameters import Options
import core
import thread
import basic_producer_consumer
from parameters import Options
from thread import StressThread
console = Console(highlight=False)
VERSION = "1.4.1"
VERSION = "1.4.2"
thread.console = console
basic_producer_consumer.console = console
def main():
if ((len(sys.argv) >= 2) and (("--help" in sys.argv) or ("/?" in sys.argv))):
@@ -54,18 +55,13 @@ def start(options):
for i in range(0, options.request_number):
thread_array.append(StressThread(options, i, VERSION))
if options.one_by_one:
start_one_by_one(thread_array)
core.start_one_by_one(thread_array)
elif options.no_limit:
basic_producer_consumer.start_custom_limit(thread_array, options.limit)
core.start_custom_limit(thread_array, options.limit)
else:
basic_producer_consumer.start(thread_array)
core.start(thread_array)
show_stat(thread_array, (options.timeout * 1000))
# Run requests in one thread
def start_one_by_one(threads):
for t in threads:
t.run()
def show_stat(tArray, timeoutInMs):
total, succeeded = [0, 0]
Tmax, Tmin, Tavg = [0, timeoutInMs, 0]

View File

@@ -1,10 +1,11 @@
import threading
import http.client
import platform
import time
import os
import ssl
import encodings.idna
import http.client
import os
import platform
import ssl
import threading
import time
from rich.console import Console
console = None