December 8, 2019

Queues

The Queue class in the multiprocessing module provides an implementation of FIFO ("First In, First Out") queues that we can use to exchange data safely between threads or processes.

The following example shows a FIFO queue in which we store five random integers between 1 and 100 and then read them back sequentially. It works on any Python version from 3.5 onwards.

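from multiprocessing import Queue
import random

ELEMENT_COUNT = 5

queue_time = Queue()

print("Saving elements at queue...")
for i in range(ELEMENT_COUNT):
    random_time = random.randint(1, 100)
    queue_time.put(random_time)
    print("%d added at queue" % random_time)

print("Reading elements from queue...")
# empty() is not reliable on a multiprocessing Queue (a background thread
# flushes items to the underlying pipe), so we read back exactly as many
# items as we saved; get() blocks until each one is available.
for i in range(ELEMENT_COUNT):
    time_read = queue_time.get()
    print("%d read from queue" % time_read)
print("Queue is empty!!")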

Running the code, we can see that the elements are consumed in the same order in which they were stored:
Saving elements at queue...
48 added at queue
47 added at queue
64 added at queue
10 added at queue
100 added at queue
Reading elements from queue...
48 read from queue
47 read from queue
64 read from queue
10 read from queue
100 read from queue
Queue is empty!!
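The multiprocessing documentation warns that empty() is not reliable for these queues, because items reach the underlying pipe through a background feeder thread. A minimal sketch of another way to read everything back when we do not know how many items there are, assuming it is acceptable to stop once no item arrives within a short timeout (the `drain` helper and its 0.5-second default are hypothetical, not part of the module):

```python
from multiprocessing import Queue
import queue  # provides the queue.Empty exception raised on timeout
import random


def drain(q, timeout=0.5):
    """Return every item that arrives in q, in FIFO order.

    Hypothetical helper: instead of polling q.empty(), call get() with a
    timeout and stop as soon as queue.Empty is raised.
    """
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


if __name__ == '__main__':
    queue_time = Queue()
    saved = [random.randint(1, 100) for _ in range(5)]
    for value in saved:
        queue_time.put(value)
    read = drain(queue_time)
    print(read == saved)  # True: items come out in the order they went in
```

The timeout trades latency for safety: a producer slower than `timeout` would make `drain` return early, so this fits only when every item has already been put.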


Combining what we saw in previous posts (1 and 2), we are going to build a group of threads that put messages into the queue (producers) and a group of threads that take messages out of it (consumers). The odd-numbered threads (Thread-1, Thread-3, ...) will be the consumers and the even-numbered ones the producers.
The producers put into the queue a random message made of a fixed-length string and then sleep for a random time between 1 and 30 seconds. The consumers check whether the queue is empty and, if it is not, read a message; they also sleep for a random time between 1 and 30 seconds. The code would look something like this:

from multiprocessing import Queue
import queue  # only needed for the queue.Empty exception
import threading
import random
import time
import string

# how many threads we want to start
THREAD_COUNT = 6

def genrandom(length):
    # random string of exactly `length` uppercase letters
    return ''.join(random.choice(string.ascii_uppercase) for _ in range(length))

def workflow(threadName, client_type):
    if client_type == 'consumer':
        while True:
            time_sleep = random.randint(1, 30)
            try:
                # get_nowait() avoids the gap between checking empty() and calling get(),
                # during which another consumer could take the last message
                chain = queue_chains.get_nowait()
                print("<--[%s] msg %s got from queue. Sleeping %d seconds..." % (threadName, chain, time_sleep))
            except queue.Empty:
                print("<--[%s] Queue is empty. Sleeping %d seconds..." % (threadName, time_sleep))
            time.sleep(time_sleep)
    elif client_type == 'producer':
        while True:
            chain = genrandom(20)
            queue_chains.put(chain)
            time_sleep = random.randint(1, 30)
            print("-->[%s] msg %s put at queue. Sleeping %d seconds..." % (threadName, chain, time_sleep))
            time.sleep(time_sleep)

class Queue_client(threading.Thread):
    def __init__(self, client_type):
        super().__init__()
        self.client_type = client_type

    def run(self):
        # use this thread's own name and type, not module-level globals
        workflow(self.name, self.client_type)

queue_chains = Queue()

print('Starting %d threads...' % THREAD_COUNT)
for i in range(THREAD_COUNT):
    # i = 0, 2, 4 (Thread-1, Thread-3, Thread-5) consume; the others produce
    client_type = 'consumer' if i % 2 == 0 else 'producer'
    queue_client = Queue_client(client_type)
    queue_client.start()

If we run the code, it runs indefinitely: the producer threads keep putting messages into the queue and the consumers keep taking them out.
Starting 6 threads...
<--[Thread-1] Queue is empty. Sleeping 17 seconds...
-->[Thread-2] msg IYXJWRCYKYITLEQRXWSBD put at queue. Sleeping  23 seconds...
<--[Thread-3] msg IYXJWRCYKYITLEQRXWSBD got from queue. Sleeping 17 seconds...
-->[Thread-4] msg AWWRXODUXVZGTHFQGLUOM put at queue. Sleeping  17 seconds...
<--[Thread-5] msg AWWRXODUXVZGTHFQGLUOM got from queue. Sleeping 19 seconds...
-->[Thread-6] msg LUCXYDAXOUWSXDCQCUJNL put at queue. Sleeping  9 seconds...
-->[Thread-6] msg LHDCSFMMFKWBUWZGMVGRD put at queue. Sleeping  30 seconds...
<--[Thread-1] msg LUCXYDAXOUWSXDCQCUJNL got from queue. Sleeping 17 seconds...
-->[Thread-4] msg LZPYFVRCKUHWLPZDGPJUJ put at queue. Sleeping  23 seconds...
<--[Thread-3] msg LHDCSFMMFKWBUWZGMVGRD got from queue. Sleeping 5 seconds...
<--[Thread-5] msg LZPYFVRCKUHWLPZDGPJUJ got from queue. Sleeping 11 seconds...
<--[Thread-3] Queue is empty. Sleeping 8 seconds...
-->[Thread-2] msg WGGBXXZDWSTQYPMRPCIYI put at queue. Sleeping  30 seconds...
<--[Thread-5] msg WGGBXXZDWSTQYPMRPCIYI got from queue. Sleeping 30 seconds...
<--[Thread-3] Queue is empty. Sleeping 8 seconds...
<--[Thread-1] Queue is empty. Sleeping 12 seconds...
<--[Thread-3] Queue is empty. Sleeping 22 seconds...
-->[Thread-6] msg PALJHDAYQRAAUHZEYZOMD put at queue. Sleeping  19 seconds...
-->[Thread-4] msg NHRLYNQMXLDWKQFRVKLIJ put at queue. Sleeping  16 seconds...
<--[Thread-1] msg PALJHDAYQRAAUHZEYZOMD got from queue. Sleeping 27 seconds...
-->[Thread-2] msg VTHSBMTDYPPRPIOPZGBOJ put at queue. Sleeping  6 seconds...
-->[Thread-4] msg WHXTSXVPNMWXNHZAROCNT put at queue. Sleeping  24 seconds...
-->[Thread-6] msg RDDGHFPHCROQMFBMCNAHI put at queue. Sleeping  3 seconds...
-->[Thread-2] msg JGGLRBEZCGWRIYTYUOOPL put at queue. Sleeping  26 seconds...
<--[Thread-5] msg NHRLYNQMXLDWKQFRVKLIJ got from queue. Sleeping 14 seconds...
<--[Thread-3] msg VTHSBMTDYPPRPIOPZGBOJ got from queue. Sleeping 24 seconds...
-->[Thread-6] msg SRMYDOHNMAKALGGOFPWIQ put at queue. Sleeping  30 seconds...
<--[Thread-1] msg WHXTSXVPNMWXNHZAROCNT got from queue. Sleeping 9 seconds...
................
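Because every thread above loops forever, the program never ends on its own. A minimal sketch of one common way to stop such a group of threads, assuming each producer sends a fixed number of messages: consumers block on get() and exit when they read a sentinel value. It uses the standard library's queue.Queue, which is meant for threads inside a single process; `SENTINEL`, `producer`, `consumer` and `run` are hypothetical names chosen for this example.

```python
import queue
import random
import string
import threading

SENTINEL = None  # hypothetical "no more messages" marker


def genrandom(length):
    # random string of exactly `length` uppercase letters
    return ''.join(random.choice(string.ascii_uppercase) for _ in range(length))


def producer(q, count):
    for _ in range(count):
        q.put(genrandom(20))


def consumer(q, results, lock):
    while True:
        msg = q.get()  # blocks until a message (or the sentinel) arrives
        if msg is SENTINEL:
            break
        with lock:
            results.append(msg)


def run(n_producers=3, n_consumers=3, per_producer=4):
    q = queue.Queue()
    results, lock = [], threading.Lock()
    producers = [threading.Thread(target=producer, args=(q, per_producer))
                 for _ in range(n_producers)]
    consumers = [threading.Thread(target=consumer, args=(q, results, lock))
                 for _ in range(n_consumers)]
    for t in producers + consumers:
        t.start()
    for t in producers:
        t.join()  # every real message is now in the queue
    for _ in consumers:
        q.put(SENTINEL)  # one stop marker per consumer
    for t in consumers:
        t.join()
    return results


if __name__ == '__main__':
    print(len(run()))  # 12 = 3 producers x 4 messages
```

The sentinels are put only after all producers have been joined, so FIFO ordering guarantees that every real message is read before any consumer stops.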

We could say that we have built (crudely) a pool of consumer threads and a pool of producer threads.
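The standard library also ships a ready-made thread pool in concurrent.futures. A minimal sketch of the same producer/consumer idea on top of ThreadPoolExecutor, assuming consumers may stop after one second without new messages (the `producer`, `consumer` and `run` names, the message counts and the timeout are illustrative choices):

```python
from concurrent.futures import ThreadPoolExecutor
import queue  # queue.Queue and the queue.Empty exception
import random
import string


def genrandom(length):
    # random string of exactly `length` uppercase letters
    return ''.join(random.choice(string.ascii_uppercase) for _ in range(length))


def producer(q, count):
    for _ in range(count):
        q.put(genrandom(20))
    return count


def consumer(q, timeout=1.0):
    # keep reading until no message arrives for `timeout` seconds
    received = []
    while True:
        try:
            received.append(q.get(timeout=timeout))
        except queue.Empty:
            return received


def run(workers=6, per_producer=4):
    q = queue.Queue()
    # one pool thread per task: half consumers, half producers
    with ThreadPoolExecutor(max_workers=workers) as pool:
        consumers = [pool.submit(consumer, q) for _ in range(workers // 2)]
        producers = [pool.submit(producer, q, per_producer)
                     for _ in range(workers // 2)]
        produced = sum(f.result() for f in producers)
        consumed = sum(len(f.result()) for f in consumers)
    return produced, consumed


if __name__ == '__main__':
    print(run())  # (12, 12)
```

`max_workers` must leave room for the producers: if the pool only had as many threads as consumers, the producers would wait until every consumer had timed out.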

If we wanted to use processes instead of threads, in this case with ProcessPoolExecutor, the code would look like this:

from multiprocessing import Queue
from concurrent.futures import ProcessPoolExecutor
import os
import queue  # only needed for the queue.Empty exception
import random
import time
import string

# how many processes we want to start
PROCESS_COUNT = 6

def genrandom(length):
    # random string of exactly `length` uppercase letters
    return ''.join(random.choice(string.ascii_uppercase) for _ in range(length))

def init_worker(shared_queue):
    # runs once in every worker process (initializer needs Python 3.7+):
    # a multiprocessing.Queue may only be shared at process creation
    global queue_chains
    queue_chains = shared_queue

def workflow(client_type):
    PID = os.getpid()
    if client_type == 'consumer':
        while True:
            time_sleep = random.randint(1, 30)
            try:
                chain = queue_chains.get_nowait()
                print("<--[%d] msg %s got from queue. Sleeping %d seconds..." % (PID, chain, time_sleep))
            except queue.Empty:
                print("<--[%d] Queue is empty. Sleeping %d seconds..." % (PID, time_sleep))
            time.sleep(time_sleep)
    elif client_type == 'producer':
        while True:
            chain = genrandom(20)
            queue_chains.put(chain)
            time_sleep = random.randint(1, 30)
            print("-->[%d] msg %s put at queue. Sleeping %d seconds..." % (PID, chain, time_sleep))
            time.sleep(time_sleep)

# the guard is required where workers are started with "spawn" (Windows, macOS)
if __name__ == '__main__':
    queue_chains = Queue()

    print('Starting %d processes...' % PROCESS_COUNT)
    executor = ProcessPoolExecutor(max_workers=PROCESS_COUNT,
                                   initializer=init_worker,
                                   initargs=(queue_chains,))
    for i in range(PROCESS_COUNT):
        client_type = 'consumer' if i % 2 == 0 else 'producer'
        executor.submit(workflow, client_type)
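A multiprocessing.Queue cannot be passed as an argument to executor.submit(): pickling it outside process creation raises a RuntimeError, because such queues may only be shared through inheritance. A minimal sketch of another option, assuming it is acceptable to use a proxy queue from multiprocessing.Manager() (which starts an extra server process that holds the queue); the `producer` and `consumer` functions and the worker counts are illustrative:

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import os
import queue  # provides the queue.Empty exception


def producer(q, count):
    # put `count` messages tagged with this worker's PID
    for n in range(count):
        q.put('%d-%d' % (os.getpid(), n))
    return count


def consumer(q, count, timeout=10):
    # read up to `count` messages; give up if none arrives for `timeout` seconds
    msgs = []
    for _ in range(count):
        try:
            msgs.append(q.get(timeout=timeout))
        except queue.Empty:
            break
    return msgs


if __name__ == '__main__':
    with Manager() as manager:
        q = manager.Queue()  # a picklable proxy, so it can go through submit()
        with ProcessPoolExecutor(max_workers=3) as executor:
            consumed = executor.submit(consumer, q, 6)
            produced = [executor.submit(producer, q, 3) for _ in range(2)]
            total = sum(f.result() for f in produced)
            msgs = consumed.result()
    print(total, len(msgs))  # 6 6
```

The consumer's timeout keeps a worker from blocking forever if a producer fails; the 10-second value is arbitrary.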


The problem with this kind of queue is that it has no persistence: it exists only while the Python process itself is running. For larger projects that need more capabilities we would turn to other solutions, such as RabbitMQ, Kafka or Mosquitto, which we can interact with from Python.
