8 de diciembre de 2019

Colas

La clase Queue del módulo multiprocessing nos proporciona una implementación de colas FIFO ("First In, First Out") que podemos usar para intercambiar información entre threads o procesos de forma segura.

El siguiente ejemplo muestra una implementación de cola FIFO en la que almacenamos valores enteros comprendidos entre 1 y 100 y posteriormente los leemos de forma secuencial. Funciona en cualquier versión de Python igual o superior a 3.5

from multiprocessing import Queue
import random

queue_time = Queue()

print("Saving elements at queue...")
for i in range (5):
    random_time = random.randint(1, 100)
    queue_time.put(random_time)
    print("%d added at queue" % random_time)

print("Reading elements from queue...")
while not queue_time.empty():
    time_read = queue_time.get()
    print("%d read from queue" % time_read)

Al ejecutar el código comprobamos que los elementos de la cola se consumen en el mismo orden en el que son salvados:
Saving elements at queue...
48 added at queue
47 added at queue
64 added at queue
10 added at queue
100 added at queue
Reading elements from queue...
48 read from queue
47 read from queue
64 read from queue
10 read from queue
100 read from queue
Queue is empty!!


Combinando lo visto en entradas anteriores (1 y 2) vamos a construir un grupo de threads que van a meter mensajes en la cola (productores) y un grupo de threads que van a sacar mensajes de la cola (consumidores). Los threads impares serán los consumidores y los pares los productores.
Los productores meterán en la cola un mensaje aleatorio formado por un string de longitud fija y dormirán un tiempo aleatorio entre 1 y 30 segundos. Los consumidores comprobarán si la cola está vacía y en caso contraría leerán un mensaje. El código sería algo del siguiente tipo:

from multiprocessing import Queue
import threading
import random
import time
import string

# how many threads we want to start
THREAD_COUNT = 6

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

def workflow(threadName, client_type):
    if client_type == 'consumer':
        while True:
            time_sleep = random.randint(1, 30)
            if not queue_chains.empty():
                chain = queue_chains.get()
                print("<--[%s] msg %s read queue. Sleeping %d seconds..." % (threadName, chain, time_sleep))
            else:
                print("<--[%s] Queue is empty. Sleeping %d seconds..." % (threadName,time_sleep))
            time.sleep(time_sleep)
    elif client_type == 'producer':
        while True:
            chain = genrandom(20)
            queue_chains.put(chain)
            time_sleep = random.randint(1, 30)
            print("-->[%s] msg %s put at queue. Sleeping % d seconds..." % (threadName, chain, time_sleep))
            time.sleep(time_sleep)

class Queue_client(threading.Thread):
    def __init__(self, client_type):
        threading.Thread.__init__(self)
        self.client_type = client_type
    def run(self):
        threadName = threading.currentThread().getName()
        workflow(threadName, client_type)

queue_chains = Queue()

print('Starting %d threads...' % THREAD_COUNT)
for i in range(THREAD_COUNT):
    if i % 2 == 0:
        client_type = 'consumer'
        queue_client = Queue_client(client_type)
    else:
        client_type = 'producer'
        queue_client = Queue_client(client_type)
    queue_client.start()

Si ejecutamos el código éste se ejecutará indefinidamente: los threads productores meterán mensajes en la cola y los consumidores los sacarán.
Starting 6 threads...
<--[Thread-1] Queue is empty. Sleeping 17 seconds...
-->[Thread-2] msg IYXJWRCYKYITLEQRXWSBD put at queue. Sleeping  23 seconds...
<--[Thread-3] msg IYXJWRCYKYITLEQRXWSBD got from queue. Sleeping 17 seconds...
-->[Thread-4] msg AWWRXODUXVZGTHFQGLUOM put at queue. Sleeping  17 seconds...
<--[Thread-5] msg AWWRXODUXVZGTHFQGLUOM got from queue. Sleeping 19 seconds...
-->[Thread-6] msg LUCXYDAXOUWSXDCQCUJNL put at queue. Sleeping  9 seconds...
-->[Thread-6] msg LHDCSFMMFKWBUWZGMVGRD put at queue. Sleeping  30 seconds...
<--[Thread-1] msg LUCXYDAXOUWSXDCQCUJNL got from queue. Sleeping 17 seconds...
-->[Thread-4] msg LZPYFVRCKUHWLPZDGPJUJ put at queue. Sleeping  23 seconds...
<--[Thread-3] msg LHDCSFMMFKWBUWZGMVGRD got from queue. Sleeping 5 seconds...
<--[Thread-5] msg LZPYFVRCKUHWLPZDGPJUJ got from queue. Sleeping 11 seconds...
<--[Thread-3] Queue is empty. Sleeping 8 seconds...
-->[Thread-2] msg WGGBXXZDWSTQYPMRPCIYI put at queue. Sleeping  30 seconds...
<--[Thread-5] msg WGGBXXZDWSTQYPMRPCIYI got from queue. Sleeping 30 seconds...
<--[Thread-3] Queue is empty. Sleeping 8 seconds...
<--[Thread-1] Queue is empty. Sleeping 12 seconds...
<--[Thread-3] Queue is empty. Sleeping 22 seconds...
-->[Thread-6] msg PALJHDAYQRAAUHZEYZOMD put at queue. Sleeping  19 seconds...
-->[Thread-4] msg NHRLYNQMXLDWKQFRVKLIJ put at queue. Sleeping  16 seconds...
<--[Thread-1] msg PALJHDAYQRAAUHZEYZOMD got from queue. Sleeping 27 seconds...
-->[Thread-2] msg VTHSBMTDYPPRPIOPZGBOJ put at queue. Sleeping  6 seconds...
-->[Thread-4] msg WHXTSXVPNMWXNHZAROCNT put at queue. Sleeping  24 seconds...
-->[Thread-6] msg RDDGHFPHCROQMFBMCNAHI put at queue. Sleeping  3 seconds...
-->[Thread-2] msg JGGLRBEZCGWRIYTYUOOPL put at queue. Sleeping  26 seconds...
<--[Thread-5] msg NHRLYNQMXLDWKQFRVKLIJ got from queue. Sleeping 14 seconds...
<--[Thread-3] msg VTHSBMTDYPPRPIOPZGBOJ got from queue. Sleeping 24 seconds...
-->[Thread-6] msg SRMYDOHNMAKALGGOFPWIQ put at queue. Sleeping  30 seconds...
<--[Thread-1] msg WHXTSXVPNMWXNHZAROCNT got from queue. Sleeping 9 seconds...
................

Podemos decir que hemos construido (de forma burda) un pool de threads consumidores y un pool de threads productores.

Si quisiéramos cambiar hilos por procesos, en este caso empleando ProcessPoolExecutor el código quedaría del siguiente modo:

from multiprocessing import Queue
from concurrent.futures import ProcessPoolExecutor
import os
import random
import time
import string

# how many threads we want to start
PROCESS_COUNT = 6

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

def workflow(client_type):
    PID = os.getpid()
    if client_type == 'consumer':
        while True:
            time_sleep = random.randint(1, 30)
            if not queue_chains.empty():
                chain = queue_chains.get()
                print("<--[%d] msg %s read queue. Sleeping %d seconds..." % (PID, chain, time_sleep))
            else:
                print("<--[%d] Queue is empty. Sleeping %d seconds..." % (PID,time_sleep))
            time.sleep(time_sleep)
    elif client_type == 'producer':
        while True:
            chain = genrandom(20)
            queue_chains.put(chain)
            time_sleep = random.randint(1, 30)
            print("-->[%d] msg %s put at queue. Sleeping % d seconds..." % (PID, chain, time_sleep))
            time.sleep(time_sleep)

queue_chains = Queue()

print('Starting %d processes...' % PROCESS_COUNT)
executor = ProcessPoolExecutor(max_workers=PROCESS_COUNT)
for i in range(PROCESS_COUNT):
    if i % 2 == 0:
        client_type = 'consumer'
        executor.submit(workflow, client_type)
    else:
        client_type = 'producer'
        executor.submit(workflow, client_type)


El problema de este tipo de colas es que no tienen persistencia, existen solamente mientras el propio proceso de python está corriendo. Para llevar a cabo proyectos de envergadura que requieran mayores capacidades emplearemos otro tipo de soluciones como RabbitMQ, Kafka o Mosquito con los que podremos interactuar desde Python.

7 de diciembre de 2019

Hilos y procesos (II)

En la anterior entrada vimos diferentes implementaciones con threads y procesos en los que se ejecutaba una lógica simple.
Basándonos en el anterior código vamos a ver como podemos ampliar la lógica pasando parámetros para la ejecución.

En este caso los threads y procesos recibirán como parámetro un tiempo aleatorio de espera (entero comprendido entre 1 y 10), de forma que se iniciarán indicando el tiempo de espera asignado, se "dormirán" ese número de segundos y finalmente "despertarán" del letargo y finalizarán la ejecución.

import threading
import random
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

class Threaded_worker(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time
    def run(self):
        threadName = threading.currentThread().getName()
        print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, sleep_time))
        time.sleep(self.sleep_time)
        print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1,10)
    td = Threaded_worker(sleep_time)
    td.start()

Al ejecutar el código anterior obtendremos:
Starting 3 threads...
[18:16:12] Hello, I am Thread-1 and I will sleep 4 seconds.
[18:16:12] Hello, I am Thread-2 and I will sleep 10 seconds.
[18:16:12] Hello, I am Thread-3 and I will sleep 9 seconds.
[18:16:16] I am Thread-1 and I have waken up.
[18:16:21] I am Thread-3 and I have waken up.
[18:16:22] I am Thread-2 and I have waken up.

En este caso el parámetro que pasamos a cada hilo es sleep_time y la lógica se define en el método run().
Para tener un código mejor estructurado podemos sacar la lógica del método run() a un método externo fuera de la clase.
En el siguiente código la lógica del método run() se traslada al método externo workflow(), de modo que desde el método run() invocamos el método workflow()

import threading
import random
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

def workflow(threadName, seconds):
    print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, seconds))
    time.sleep(seconds)
    print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))

class Threaded_worker(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time
    def run(self):
        threadName = threading.currentThread().getName()
        workflow(threadName, self.sleep_time)

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1,10)
    td = Threaded_worker(sleep_time)
    td.start()

La ejecución del código tendrá una salida similar al anterior:

Starting 3 threads...
[18:57:40] Hello, I am Thread-1 and I will sleep 8 seconds.
[18:57:40] Hello, I am Thread-2 and I will sleep 9 seconds.
[18:57:40] Hello, I am Thread-3 and I will sleep 3 seconds.
[18:57:43] I am Thread-3 and I have waken up.
[18:57:48] I am Thread-1 and I have waken up.
[18:57:49] I am Thread-2 and I have waken up.

Análogamente, empleando procesos tenemos:

import multiprocessing
import os
import random
import time
from datetime import datetime

# how many processes we want to start
WORKER_NUMBER = 3

def worker(sleep_time):
    PID = os.getpid()
    print("[%s] Hello, I am the PID %d I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am the PID %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))

print('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    sleep_time = random.randint(1, 10)
    p = multiprocessing.Process(target=worker, args=(sleep_time,))
    jobs.append(p)
    p.start()

La ejecución del código generará una salida del siguiente tipo:
Starting 3 processes...
[14:53:48] Hello, I am the PID 2099 I will sleep 4 seconds.
[14:53:48] Hello, I am the PID 2098 I will sleep 5 seconds.
[14:53:48] Hello, I am the PID 2100 I will sleep 3 seconds.
[14:53:51] I am the PID 2100 and I have waken up.
[14:53:52] I am the PID 2099 and I have waken up.
[14:53:53] I am the PID 2098 and I have waken up

Y de nuevo, si queremos extraer la lógica a una función workflow() externa:

import multiprocessing
import os
import random
import time
from datetime import datetime

# how many processes we want to start
WORKER_NUMBER = 3

def workflow(PID, sleep_time):
    print("[%s] Hello, I am the PID %d I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am the PID %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))

def worker(sleep_time):
    PID = os.getpid()
    workflow(PID, sleep_time)

print('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    sleep_time = random.randint(1, 10)
    p = multiprocessing.Process(target=worker, args=(sleep_time,))
    jobs.append(p)
    p.start()


Los códigos anteriores son válidos para todas las versiones de python 3.

Si disponemos de una versión de Python mayor o igual que 3.5 podemos hacer uso de las subclases ThreadPoolExecutor y ProcessPoolExecutor adaptando fácilmente los códigos anteriores.

Para el caso de ThreadPoolExecutor tendríamos el siguiente código:

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import random
import threading
import time

THREADS_COUNT = 3

def run(sleep_time):
    threadName = threading.currentThread().getName()
    print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))


executor = ThreadPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1, 10)
    executor.submit(run, sleep_time)
En este caso la salida sería similar:

[20:48:20] Hello, I am ThreadPoolExecutor-0_0 and I will sleep 7 seconds.
[20:48:20] Hello, I am ThreadPoolExecutor-0_1 and I will sleep 10 seconds.
[20:48:20] Hello, I am ThreadPoolExecutor-0_2 and I will sleep 9 seconds.
[20:48:27] I am ThreadPoolExecutor-0_0 and I have waken up.
[20:48:29] I am ThreadPoolExecutor-0_2 and I have waken up.
[20:48:30] I am ThreadPoolExecutor-0_1 and I have waken up.


La adaptación a ProcessPoolExecutor es trivial a partir de la anterior:

from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
import random
import os
import time


THREADS_COUNT = 3


def run(sleep_time):
    PID = os.getpid()
    print("[%s] Hello, I am %d and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))


executor = ProcessPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1, 10)
    executor.submit(run, sleep_time)

La salida de la ejecución será similar a las anteriores:

[20:53:18] Hello, I am 7433 and I will sleep 5 seconds.
[20:53:18] Hello, I am 7434 and I will sleep 2 seconds.
[20:53:18] Hello, I am 7435 and I will sleep 5 seconds.
[20:53:20] I am 7434 and I have waken up.
[20:53:23] I am 7433 and I have waken up.
[20:53:23] I am 7435 and I have waken up.
En próximas entradas hablaremos de como trabajar con "colas" en las que podemos almacenar tareas, de forma que un grupo de hilos o procesos sean los encargados de "sacar" esas tareas de la cola y ejecutar la correspondiente lógica para cada una de ellas.

Hilos y procesos (I)

El siguiente ejemplo muestra una implementación de hilos o threads válido para cualquier versión de python 3 extendiendo la clase Thread del módulo threading

import threading
  
# how many threads we want to start  
THREADS_COUNT = 3  

class Threaded_worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        threadName = threading.currentThread().getName()
        print("Hello, I am the thread %s" % threadName)

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    td = Threaded_worker()
    td.start()

En este caso cada hilo se inicia y ejecuta el código del método run(), es decir, imprime el mensaje de saludo mostrando su nombre y finaliza.

Al ejecutar el código anterior obtendremos:
Starting 3 threads...
Hello, I am the thread Thread-1
Hello, I am the thread Thread-2
Hello, I am the thread Thread-3

Si en lugar de threads o hilos queremos usar procesos para saltarnos el GIL, podemos emplear el siguiente código, también válido para cualquier versión de python 3:

import multiprocessing
import os

# how many processes we want to start
WORKER_NUMBER = 3

def worker():
    PID = os.getpid()
    print ("Hello, I am the process with PID %d" % PID)

print ('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    p = multiprocessing.Process(target=worker, args=())
    jobs.append(p)
    p.start()

De modo análogo cada proceso se inicia y ejecuta el código del método worker(), y en este caso de nuevo nos saluda informando de su PID y finaliza.

Así, al ejecutar el código anterior obtendremos:
Starting 3 processes...
Hello, I am the process with PID 3486
Hello, I am the process with PID 3487
Hello, I am the process with PID 3488

En python, no siempre el empleo de hilos nos va a proporcionar mejores resultados. Vamos a verlo con un ejemplo.

Escribamos un código simple que ejecute una cuenta atrás de 500 millones:

def countdown(n):
    while n > 0:
        n -= 1

COUNT = 500000000
countdown(COUNT)
Vamos a ejecutar el código en un equipo con un procesador Intel(R) Core(TM) i5-3337U CPU @ 1.80GHz con 4 cpus, y 2 cores por cada cpu. La versión de python será la 3.7.

En este caso la ejecución tarda unos 29 segundos:

real    0m28,782s
user    0m28,776s
sys     0m0,004s
Ahora escribimos un código similar en el que vamos a realizar la misma cuenta atrás repartiendo la tarea entre 2 threads, de forma que cada thread ejecuta una cuenta atrás de 250 millones:

import threading

# how many threads we want to start
THREADS_COUNT = 2

class Threaded_worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        n = 250000000
        while n > 0:
            n -= 1

for i in range(THREADS_COUNT):
    td = Threaded_worker()
    td.start()
Y de nuevo medimos el tiempo de ejecución. En este caso tarda unos 30 segundos

real    0m30,147s
user    0m30,132s
sys     0m0,080s
A continuación escribimos un código similar empleando procesos:

import multiprocessing

WORKER_NUMBER = 2

def worker():
    n = 250000000
    while n > 0:
        n -= 1

jobs = []
for i in range(WORKER_NUMBER):
    p = multiprocessing.Process(target=worker, args=())
    jobs.append(p)
    p.start()
En este caso el tiempo de ejecución es de 15.7 segundos, es decir, hemos reducido el tiempo a la mitad respecto al primer caso.

real    0m15,767s
user    0m31,444s
sys     0m0,012s
El "responsable" de este comportamiento es el GIL que no permite la ejecución simultánea de threads.

En futuras entradas veremos que el GIL no es realmente "tan malo", y que en determinados casos es útil emplear threads.

Para terminar vamos a hacer una pequeña revisión del módulo concurrent.futures disponible a partir de la versión 3.5 de python.
Este módulo nos va a permitir ejecutar tareas asíncronas empleando threads o procesos mediante las subclases ThreadPoolExecutor y ProcessPoolExecutor. En los link podéis encontrar la documentación completa.

Para ver un ejemplo vamos a codificar los ejemplos anteriores empleando estas subclases, haciendo de este modo un pequeña introducción al concepto de programación asíncrona.

En el caso de threads tenemos el siguiente código:

from concurrent.futures import ThreadPoolExecutor

THREADS_COUNT = 2


def run():
    n = 250000000
    while n > 0:
        n -= 1


executor = ThreadPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    executor.submit(run)

En este caso el tiempo de ejecución es similar al ejemplo previo:

real    0m29,023s
user    0m28,936s
sys     0m0,012s


Para procesos el código equivalente sería:

from concurrent.futures import ProcessPoolExecutor

THREADS_COUNT = 2


def run():
    n = 250000000
    while n > 0:
        n -= 1


executor = ProcessPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    executor.submit(run)

Con el siguiente tiempo de ejecución:

real    0m16,774s
user    0m33,100s
sys     0m0,016s
Como vemos los tiempos son similares, aunque la sintaxis es quizá más sencilla. En futuras entradas profundizaremos en el concepto de programación asíncrona y hablaremos en detalle del Event Loop como core de la programación asíncrona en python.