December 7, 2019

Threads and processes (II)

In the previous post we saw several implementations with threads and processes that ran some simple logic.
Building on that code, let's see how to extend the logic by passing parameters to each execution.

In this case the threads and processes will receive a random wait time as a parameter (an integer between 1 and 10). Each one starts by reporting its assigned wait time, "sleeps" for that many seconds, and finally "wakes up" and finishes.

import threading
import random
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

class Threaded_worker(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time
    def run(self):
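        # self.name is the name assigned to this thread (e.g. "Thread-1")
        threadName = self.name
        # use the value stored on the instance, not the global loop variable
        print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, self.sleep_time))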
        time.sleep(self.sleep_time)
        print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1,10)
    td = Threaded_worker(sleep_time)
    td.start()

Running the code above produces output like this:
Starting 3 threads...
[18:16:12] Hello, I am Thread-1 and I will sleep 4 seconds.
[18:16:12] Hello, I am Thread-2 and I will sleep 10 seconds.
[18:16:12] Hello, I am Thread-3 and I will sleep 9 seconds.
[18:16:16] I am Thread-1 and I have waken up.
[18:16:21] I am Thread-3 and I have waken up.
[18:16:22] I am Thread-2 and I have waken up.

In this case the parameter we pass to each thread is sleep_time, and the logic is defined in the run() method.
To structure the code better, we can move the logic out of run() into a function outside the class.
In the following code the logic of run() moves to the external function workflow(), which run() then calls:

import threading
import random
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

def workflow(threadName, seconds):
    print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, seconds))
    time.sleep(seconds)
    print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))

class Threaded_worker(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time
    def run(self):
        # self.name replaces the deprecated currentThread().getName() call
        threadName = self.name
        workflow(threadName, self.sleep_time)

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1,10)
    td = Threaded_worker(sleep_time)
    td.start()

Running this code produces output similar to the previous one:

Starting 3 threads...
[18:57:40] Hello, I am Thread-1 and I will sleep 8 seconds.
[18:57:40] Hello, I am Thread-2 and I will sleep 9 seconds.
[18:57:40] Hello, I am Thread-3 and I will sleep 3 seconds.
[18:57:43] I am Thread-3 and I have waken up.
[18:57:48] I am Thread-1 and I have waken up.
[18:57:49] I am Thread-2 and I have waken up.
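
Because workflow() is now a plain function, the Thread subclass is not strictly required. The following is a minimal sketch, assuming we pass the function and its argument directly to threading.Thread through target and args (the same pattern the process examples use). Here workflow() takes only the sleep time and looks up the thread name itself, start_workers() is a hypothetical helper name, and the sleep times are divided by 10 so the example finishes quickly.

```python
import random
import threading
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

def workflow(seconds):
    # inside the target function, current_thread() is the worker thread itself
    thread_name = threading.current_thread().name
    print("[%s] Hello, I am %s and I will sleep %.1f seconds." % (datetime.now().strftime('%H:%M:%S'), thread_name, seconds))
    time.sleep(seconds)
    print("[%s] I am %s and I have woken up." % (datetime.now().strftime('%H:%M:%S'), thread_name))

def start_workers(sleep_times):
    # hypothetical helper: one thread per sleep time, then wait for all of them
    threads = [threading.Thread(target=workflow, args=(s,)) for s in sleep_times]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return threads

if __name__ == '__main__':
    print('Starting %d threads...' % THREADS_COUNT)
    # shortened waits (0.1 to 1.0 seconds) just for this sketch
    start_workers([random.randint(1, 10) / 10 for _ in range(THREADS_COUNT)])
```

The join() calls make the main thread wait until every worker has finished, which the subclass version leaves implicit.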

Similarly, using processes we have:

import multiprocessing
import os
import random
import time
from datetime import datetime

# how many processes we want to start
WORKER_NUMBER = 3

def worker(sleep_time):
    PID = os.getpid()
    print("[%s] Hello, I am the PID %d I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am the PID %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))

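# the guard keeps child processes from re-running this block on platforms
# that start them by importing the main module (e.g. Windows)
if __name__ == '__main__':
    print('Starting %d processes...' % WORKER_NUMBER)

    jobs = []
    for i in range(WORKER_NUMBER):
        sleep_time = random.randint(1, 10)
        p = multiprocessing.Process(target=worker, args=(sleep_time,))
        jobs.append(p)
        p.start()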

Running the code produces output like the following:
Starting 3 processes...
[14:53:48] Hello, I am the PID 2099 I will sleep 4 seconds.
[14:53:48] Hello, I am the PID 2098 I will sleep 5 seconds.
[14:53:48] Hello, I am the PID 2100 I will sleep 3 seconds.
[14:53:51] I am the PID 2100 and I have waken up.
[14:53:52] I am the PID 2099 and I have waken up.
[14:53:53] I am the PID 2098 and I have waken up.

And again, if we want to extract the logic into an external workflow() function:

import multiprocessing
import os
import random
import time
from datetime import datetime

# how many processes we want to start
WORKER_NUMBER = 3

def workflow(PID, sleep_time):
    print("[%s] Hello, I am the PID %d I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am the PID %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))

def worker(sleep_time):
    PID = os.getpid()
    workflow(PID, sleep_time)

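# the guard keeps child processes from re-running this block on platforms
# that start them by importing the main module (e.g. Windows)
if __name__ == '__main__':
    print('Starting %d processes...' % WORKER_NUMBER)

    jobs = []
    for i in range(WORKER_NUMBER):
        sleep_time = random.randint(1, 10)
        p = multiprocessing.Process(target=worker, args=(sleep_time,))
        jobs.append(p)
        p.start()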


The code above is valid for all versions of Python 3.

If we have Python 3.5 or later, we can use the ThreadPoolExecutor and ProcessPoolExecutor classes from the concurrent.futures module and adapt the code above with little effort.

For ThreadPoolExecutor we would have the following code:

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import random
import threading
import time

THREADS_COUNT = 3

def run(sleep_time):
    # current_thread().name replaces the deprecated currentThread().getName()
    threadName = threading.current_thread().name
    print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))


# the with block waits for all submitted tasks and then shuts the pool down
with ThreadPoolExecutor(max_workers=THREADS_COUNT) as executor:
    for i in range(THREADS_COUNT):
        sleep_time = random.randint(1, 10)
        executor.submit(run, sleep_time)

In this case the output would be similar:

[20:48:20] Hello, I am ThreadPoolExecutor-0_0 and I will sleep 7 seconds.
[20:48:20] Hello, I am ThreadPoolExecutor-0_1 and I will sleep 10 seconds.
[20:48:20] Hello, I am ThreadPoolExecutor-0_2 and I will sleep 9 seconds.
[20:48:27] I am ThreadPoolExecutor-0_0 and I have waken up.
[20:48:29] I am ThreadPoolExecutor-0_2 and I have waken up.
[20:48:30] I am ThreadPoolExecutor-0_1 and I have waken up.
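
The examples in this post discard the value returned by submit(), which is a Future object. As a rough sketch, assuming we wanted each task to report back how long it slept, the results could be collected as the tasks finish with as_completed(). The names nap() and run_naps() are hypothetical, and the sleep times are fractions of a second only to keep the example fast.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time

def nap(sleep_time):
    # sleep, then return which pool thread ran the task and for how long
    time.sleep(sleep_time)
    return threading.current_thread().name, sleep_time

def run_naps(sleep_times):
    # hypothetical helper: submit one task per sleep time and gather the
    # sleep times in the order in which the tasks complete
    finished = []
    with ThreadPoolExecutor(max_workers=len(sleep_times)) as executor:
        futures = [executor.submit(nap, s) for s in sleep_times]
        for future in as_completed(futures):
            thread_name, slept = future.result()
            print("%s slept %.2f seconds." % (thread_name, slept))
            finished.append(slept)
    return finished

if __name__ == '__main__':
    run_naps([0.3, 0.1, 0.2])
```

Calling result() on a Future also re-raises any exception raised inside the task, so errors that would otherwise go unnoticed become visible.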


Adapting this to ProcessPoolExecutor is straightforward:

from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
import random
import os
import time


THREADS_COUNT = 3


def run(sleep_time):
    PID = os.getpid()
    print("[%s] Hello, I am %d and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))


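# the guard keeps worker processes from re-running this block on platforms
# that start them by importing the main module (e.g. Windows)
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=THREADS_COUNT) as executor:
        for i in range(THREADS_COUNT):
            sleep_time = random.randint(1, 10)
            executor.submit(run, sleep_time)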

The output will be similar to the previous ones:

[20:53:18] Hello, I am 7433 and I will sleep 5 seconds.
[20:53:18] Hello, I am 7434 and I will sleep 2 seconds.
[20:53:18] Hello, I am 7435 and I will sleep 5 seconds.
[20:53:20] I am 7434 and I have waken up.
[20:53:23] I am 7433 and I have waken up.
[20:53:23] I am 7435 and I have waken up.
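
The adaptation is this easy because both classes implement the same Executor interface. The sketch below illustrates that idea under a few assumptions: nap() and run_all() are hypothetical names, the executor class is passed in as a parameter, and map() is used instead of submit() so the results come back in input order. The demo uses ThreadPoolExecutor; passing ProcessPoolExecutor instead should work the same way as long as the call stays under the __main__ guard.

```python
from concurrent.futures import ThreadPoolExecutor
import os
import time

def nap(sleep_time):
    # sleep, then return the PID of the process that ran the task
    time.sleep(sleep_time)
    return os.getpid(), sleep_time

def run_all(executor_cls, sleep_times):
    # hypothetical helper: works with any Executor subclass that accepts
    # max_workers; map() yields results in the order of the inputs
    with executor_cls(max_workers=len(sleep_times)) as executor:
        return list(executor.map(nap, sleep_times))

if __name__ == '__main__':
    for pid, slept in run_all(ThreadPoolExecutor, [0.2, 0.1, 0.3]):
        print("PID %d slept %.1f seconds." % (pid, slept))
```

With threads every task reports the same PID, since they all run inside one process; with a process pool each worker would report its own.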

In upcoming posts we will talk about working with "queues" that store tasks, so that a group of threads or processes takes those tasks off the queue and runs the corresponding logic for each one.
