Mostrando entradas con la etiqueta threading. Mostrar todas las entradas
Mostrando entradas con la etiqueta threading. Mostrar todas las entradas

7 de diciembre de 2019

Hilos y procesos (II)

En la anterior entrada vimos diferentes implementaciones con threads y procesos en los que se ejecutaba una lógica simple.
Basándonos en el anterior código vamos a ver como podemos ampliar la lógica pasando parámetros para la ejecución.

En este caso los threads y procesos recibirán como parámetro un tiempo aleatorio de espera (entero comprendido entre 1 y 10), de forma que se iniciarán indicando el tiempo de espera asignado, se "dormirán" ese número de segundos y finalmente "despertarán" del letargo y finalizarán la ejecución.

import threading
import random
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

class Threaded_worker(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time
    def run(self):
        threadName = threading.currentThread().getName()
        print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, sleep_time))
        time.sleep(self.sleep_time)
        print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1,10)
    td = Threaded_worker(sleep_time)
    td.start()

Al ejecutar el código anterior obtendremos:
Starting 3 threads...
[18:16:12] Hello, I am Thread-1 and I will sleep 4 seconds.
[18:16:12] Hello, I am Thread-2 and I will sleep 10 seconds.
[18:16:12] Hello, I am Thread-3 and I will sleep 9 seconds.
[18:16:16] I am Thread-1 and I have waken up.
[18:16:21] I am Thread-3 and I have waken up.
[18:16:22] I am Thread-2 and I have waken up.

En este caso el parámetro que pasamos a cada hilo es sleep_time y la lógica se define en el método run().
Para tener un código mejor estructurado podemos sacar la lógica del método run() a un método externo fuera de la clase.
En el siguiente código la lógica del método run() se traslada al método externo workflow(), de modo que desde el método run() invocamos el método workflow()

import threading
import random
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

def workflow(threadName, seconds):
    print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, seconds))
    time.sleep(seconds)
    print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))

class Threaded_worker(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time
    def run(self):
        threadName = threading.currentThread().getName()
        workflow(threadName, self.sleep_time)

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1,10)
    td = Threaded_worker(sleep_time)
    td.start()

La ejecución del código tendrá una salida similar al anterior:

Starting 3 threads...
[18:57:40] Hello, I am Thread-1 and I will sleep 8 seconds.
[18:57:40] Hello, I am Thread-2 and I will sleep 9 seconds.
[18:57:40] Hello, I am Thread-3 and I will sleep 3 seconds.
[18:57:43] I am Thread-3 and I have waken up.
[18:57:48] I am Thread-1 and I have waken up.
[18:57:49] I am Thread-2 and I have waken up.

Análogamente, empleando procesos tenemos:

import multiprocessing
import os
import random
import time
from datetime import datetime

# how many processes we want to start
WORKER_NUMBER = 3

def worker(sleep_time):
    PID = os.getpid()
    print("[%s] Hello, I am the PID %d I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am the PID %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))

print('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    sleep_time = random.randint(1, 10)
    p = multiprocessing.Process(target=worker, args=(sleep_time,))
    jobs.append(p)
    p.start()

La ejecución del código generará una salida del siguiente tipo:
Starting 3 processes...
[14:53:48] Hello, I am the PID 2099 I will sleep 4 seconds.
[14:53:48] Hello, I am the PID 2098 I will sleep 5 seconds.
[14:53:48] Hello, I am the PID 2100 I will sleep 3 seconds.
[14:53:51] I am the PID 2100 and I have waken up.
[14:53:52] I am the PID 2099 and I have waken up.
[14:53:53] I am the PID 2098 and I have waken up

Y de nuevo, si queremos extraer la lógica a una función workflow() externa:

import multiprocessing
import os
import random
import time
from datetime import datetime

# how many processes we want to start
WORKER_NUMBER = 3

def workflow(PID, sleep_time):
    print("[%s] Hello, I am the PID %d I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am the PID %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))

def worker(sleep_time):
    PID = os.getpid()
    workflow(PID, sleep_time)

print('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    sleep_time = random.randint(1, 10)
    p = multiprocessing.Process(target=worker, args=(sleep_time,))
    jobs.append(p)
    p.start()


Los códigos anteriores son válidos para todas las versiones de python 3.

Si disponemos de una versión de Python mayor o igual que 3.5 podemos hacer uso de las subclases ThreadPoolExecutor y ProcessPoolExecutor adaptando fácilmente los códigos anteriores.

Para el caso de ThreadPoolExecutor tendríamos el siguiente código:

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import random
import threading
import time

THREADS_COUNT = 3

def run(sleep_time):
    threadName = threading.currentThread().getName()
    print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))


executor = ThreadPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1, 10)
    executor.submit(run, sleep_time)
En este caso la salida sería similar:

[20:48:20] Hello, I am ThreadPoolExecutor-0_0 and I will sleep 7 seconds.
[20:48:20] Hello, I am ThreadPoolExecutor-0_1 and I will sleep 10 seconds.
[20:48:20] Hello, I am ThreadPoolExecutor-0_2 and I will sleep 9 seconds.
[20:48:27] I am ThreadPoolExecutor-0_0 and I have waken up.
[20:48:29] I am ThreadPoolExecutor-0_2 and I have waken up.
[20:48:30] I am ThreadPoolExecutor-0_1 and I have waken up.


La adaptación a ProcessPoolExecutor es trivial a partir de la anterior:

from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
import random
import os
import time


THREADS_COUNT = 3


def run(sleep_time):
    PID = os.getpid()
    print("[%s] Hello, I am %d and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))


executor = ProcessPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1, 10)
    executor.submit(run, sleep_time)

La salida de la ejecución será similar a las anteriores:

[20:53:18] Hello, I am 7433 and I will sleep 5 seconds.
[20:53:18] Hello, I am 7434 and I will sleep 2 seconds.
[20:53:18] Hello, I am 7435 and I will sleep 5 seconds.
[20:53:20] I am 7434 and I have waken up.
[20:53:23] I am 7433 and I have waken up.
[20:53:23] I am 7435 and I have waken up.
En próximas entradas hablaremos de como trabajar con "colas" en las que podemos almacenar tareas, de forma que un grupo de hilos o procesos sean los encargados de "sacar" esas tareas de la cola y ejecutar la correspondiente lógica para cada una de ellas.

Hilos y procesos (I)

El siguiente ejemplo muestra una implementación de hilos o threads válido para cualquier versión de python 3 extendiendo la clase Thread del módulo threading

import threading
  
# how many threads we want to start  
THREADS_COUNT = 3  

class Threaded_worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        threadName = threading.currentThread().getName()
        print("Hello, I am the thread %s" % threadName)

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    td = Threaded_worker()
    td.start()

En este caso cada hilo se inicia y ejecuta el código del método run(), es decir, imprime el mensaje de saludo mostrando su nombre y finaliza.

Al ejecutar el código anterior obtendremos:
Starting 3 threads...
Hello, I am the thread Thread-1
Hello, I am the thread Thread-2
Hello, I am the thread Thread-3

Si en lugar de threads o hilos queremos usar procesos para saltarnos el GIL, podemos emplear el siguiente código, también válido para cualquier versión de python 3:

import multiprocessing
import os

# how many processes we want to start
WORKER_NUMBER = 3

def worker():
    PID = os.getpid()
    print ("Hello, I am the process with PID %d" % PID)

print ('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    p = multiprocessing.Process(target=worker, args=())
    jobs.append(p)
    p.start()

De modo análogo cada proceso se inicia y ejecuta el código del método worker(), y en este caso de nuevo nos saluda informando de su PID y finaliza.

Así, al ejecutar el código anterior obtendremos:
Starting 3 processes...
Hello, I am the process with PID 3486
Hello, I am the process with PID 3487
Hello, I am the process with PID 3488

En python, no siempre el empleo de hilos nos va a proporcionar mejores resultados. Vamos a verlo con un ejemplo.

Escribamos un código simple que ejecute una cuenta atrás de 500 millones:

def countdown(n):
    while n > 0:
        n -= 1

COUNT = 500000000
countdown(COUNT)
Vamos a ejecutar el código en un equipo con un procesador Intel(R) Core(TM) i5-3337U CPU @ 1.80GHz con 4 cpus, y 2 cores por cada cpu. La versión de python será la 3.7.

En este caso la ejecución tarda unos 29 segundos:

real    0m28,782s
user    0m28,776s
sys     0m0,004s
Ahora escribimos un código similar en el que vamos a realizar la misma cuenta atrás repartiendo la tarea entre 2 threads, de forma que cada thread ejecuta una cuenta atrás de 250 millones:

import threading

# how many threads we want to start
THREADS_COUNT = 2

class Threaded_worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        n = 250000000
        while n > 0:
            n -= 1

for i in range(THREADS_COUNT):
    td = Threaded_worker()
    td.start()
Y de nuevo medimos el tiempo de ejecución. En este caso tarda unos 30 segundos

real    0m30,147s
user    0m30,132s
sys     0m0,080s
A continuación escribimos un código similar empleando procesos:

import multiprocessing

WORKER_NUMBER = 2

def worker():
    n = 250000000
    while n > 0:
        n -= 1

jobs = []
for i in range(WORKER_NUMBER):
    p = multiprocessing.Process(target=worker, args=())
    jobs.append(p)
    p.start()
En este caso el tiempo de ejecución es de 15.7 segundos, es decir, hemos reducido el tiempo a la mitad respecto al primer caso.

real    0m15,767s
user    0m31,444s
sys     0m0,012s
El "responsable" de este comportamiento es el GIL que no permite la ejecución simultánea de threads.

En futuras entradas veremos que el GIL no es realmente "tan malo", y que en determinados casos es útil emplear threads.

Para terminar vamos a hacer una pequeña revisión del módulo concurrent.futures disponible a partir de la versión 3.5 de python.
Este módulo nos va a permitir ejecutar tareas asíncronas empleando threads o procesos mediante las subclases ThreadPoolExecutor y ProcessPoolExecutor. En los link podéis encontrar la documentación completa.

Para ver un ejemplo vamos a codificar los ejemplos anteriores empleando estas subclases, haciendo de este modo un pequeña introducción al concepto de programación asíncrona.

En el caso de threads tenemos el siguiente código:

from concurrent.futures import ThreadPoolExecutor

THREADS_COUNT = 2


def run():
    n = 250000000
    while n > 0:
        n -= 1


executor = ThreadPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    executor.submit(run)

En este caso el tiempo de ejecución es similar al ejemplo previo:

real    0m29,023s
user    0m28,936s
sys     0m0,012s


Para procesos el código equivalente sería:

from concurrent.futures import ProcessPoolExecutor

THREADS_COUNT = 2


def run():
    n = 250000000
    while n > 0:
        n -= 1


executor = ProcessPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    executor.submit(run)

Con el siguiente tiempo de ejecución:

real    0m16,774s
user    0m33,100s
sys     0m0,016s
Como vemos los tiempos son similares, aunque la sintaxis es quizá más sencilla. En futuras entradas profundizaremos en el concepto de programación asíncrona y hablaremos en detalle del Event Loop como core de la programación asíncrona en python.

11 de diciembre de 2017

Colas (RabbitMQ) - número variable de consumidores

En una entrada previa Colas (RabbitMQ) - consumidores mostrábamos como crear un pool de threads que iban a consumir mensajes de una cola y ejecutar un workflow para cada mensaje.

El número de consumidores (tamaño del pool de threads) era fijo. Pero ¿Qué sucede si el número de mensajes encolados se dispara? Sería bueno poder crear nuevos hilos que consumieran mensajes mientras el tamaño de la cola excediera un determinado tamaño y una vez que la cola volviera a tener un número de mensajes aceptable estos hilos terminasen.

Con este planteamiento vamos a tener un número de hilos fijos que vamos a denominar Main y por otro lados vamos a crear hilos Backing que solamente existirán cuando el tamaño de la cola exceda un determinado tamaño. Estos hilos Backing juegan el papel de refuerzo temporal a los hilos Main cuando éstos no son suficientes para consumir mensajes a un ritmo que mantenga los niveles de la cola en un determinado número de mensajes.

Para implementarlo vamos a añadir en el código una tarea encargada de comprobar de forma periódica el tamaño de la cola y en función del resultado creará nuevos hilos Backing. Podríamos realizar el chequeo usando la propia librería puka, pero en este caso vamos a aprovechar la api web que nos aporta el plugin de management de RabbitMQ del que ya hablamos en entradas previas.

El plugin nos proporciona un interfaz web que acepta diferentes métodos y operaciones: api-web-management
Basándonos en esa api y usando la librería requests de python (http://docs.python-requests.org) podemos escribir el siguiente código que va a devolvernos el número de mensajes que contiene una cola:

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

Usando el código anterior podemos implementar una lógica como la que representa el siguiente diagrama:

:

El diagrama anterior se traduce en el siguiente código:

#!/usr/bin/env python

import puka
import time
import threading
import requests

BROKER_HOST = 'broker'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
MAIN_WORKER_NUMBER = 4
MAX_BACKING_WORKER_NUMBER = 10
WATCHDOG_CHECK_INTERVAL = 30
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
QUEUE_SIZE_THRESHOLD = 10000

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

def workflow(thread_name, thread_type, msg):
    """ Workflow for every message read from queue """
    try:
        if thread_type == 'main':
            print "[%s] Main thread read from queue: %s" % (thread_name, msg)
        elif thread_type == 'backing':
            print "(%s) Backing thread read from queue: %s" % (thread_name, msg)
    except Exception as error:
        print "Error: %s" % str(error)

class Main_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[%s] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='main', msg=self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

class Backing_worker(threading.Thread):
    BROKER_CONNECTION = 0

    global queue_size
    global backing_threads_count

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while queue_size > QUEUE_SIZE_THRESHOLD:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[--%s--] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[--%s--] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise, timeout=2)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='backing', msg=self.result['body'])
                except puka.ResourceError as error:
                    print '[--%s--] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
        backing_threads_count -=1


print 'Starting %d main consumer threads...' % (MAIN_WORKER_NUMBER)
for i in range (MAIN_WORKER_NUMBER):
    td = Main_worker()
    td.start()

while True:
    backing_threads_count = 0
    queue_size = get_messages_count_in_queue(QUEUE)
    if queue_size > QUEUE_SIZE_THRESHOLD and backing_threads_count < MAX_BACKING_WORKER_NUMBER:
        print "Queue size exceeds the threshold value. Starting new backing thread"
        td = Backing_worker()
        td.start()
        backing_threads_count+=1
    time.sleep(WATCHDOG_CHECK_INTERVAL)

Vamos a analizar el código, comenzando por algunos de los nuevos parámetros:

MAIN_WORKER_NUMBER: representa el numero de hilos fijos que van a componer el pool Main de consumidores. Se inicializa al comienzo y en todo momento van a estar leyendo mensajes de la cola.
MAX_BACKING_WORKER_NUMBER: representa el número máximo de hilos "de refuerzo" ejecutándose que vamos a permitir. Dicho de otro modo el máximo tamaño del pool Backing
WATCHDOG_CHECK_INTERVAL: representa el periodo de tiempo entre cada chequeo para comprobar el número de mensajes almacenados en la cola.
QUEUE_SIZE_THRESHOLD: representa el límite máximo de mensajes encolados. Si el tamaño de la cola excede este valor al realizar la comprobación se creará un nuevo thread "de refuerzo" siempre que el número de threads de refuerzo no exceda el valor MAX_BACKING_WORKER_NUMBER

Por otro lado hemos definido 2 clases, una para los Main threads y otra para los Backing threads. Las clases son similares, salvo el detalle del método run():

- En el caso de los Main threads el bucle while se ejecuta indefinidamente, es decir, este tipo de hilos siempre van a estar "vivos".
- En el caso de los Backing threads el bucle while se ejecuta mientras el tamaño de la cola exceda del valor establecido como umbral (QUEUE_SIZE_THRESHOLD). Una vez que el tamaño de la cola sea menor de ese valor el hilo termina.

while True:
vs

while queue_size > QUEUE_SIZE_THRESHOLD

Para que los Backing threads puedan tener acceso a las variables queue_size y backing_threads_count las definimos como globales dentro de la clase:

global queue_size
global backing_threads_count
En los referente al workflow que va a ejecutar cada hilo, hacemos que cada hilo especifique al método qué clase de hilo es (Main o Backing simplemente para trazar de diferente la ejecución de uno y otro con corchetes o paréntesis.

Por último un detalle referente al parámetro prefetch_count de la librería puka:

self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
El parámetro prefetch_count representa el número de mensajes "reservados" para consumir en un canal de RabbitMQ, para los cuales no hemos enviado ack. Vamos a desarrollarlo un poco más a fondo.

Si ejecutamos el código de RabbitMQ consumidores y publicamos mensajes en una cola de RabbitMQ tendremos algo como lo siguiente:



Si nos fijamos tenemos 3 contadores:

- Ready: mensajes que pueden ser consumidos.
- Unacked: mensajes reservados para consumir en un canal, para los cuales no se ha recibido confirmación de haberlos consumido.
- Total: total de mensajes.

Si en la situación anterior con digamos 100k mensajes ejecutáramos nuestro código, si la ejecución del workflow tardara demasiado podría suceder que cada uno de los 4 Main threads reserve 25k mensajes para ser consumidos, de forma que tendríamos 100k mensajes Unacked y 0 mensajes Ready, con lo cual, aunque se creen nuevos threads Backing de refuerzo, estos no podrían consumir ningún mensajes, salvo que se publicaran nuevos mensajes en la cola. Es por ello que hacemos uso del prefecth_count

Podéis encontrar información más detallada al respecto en la propia página de RabbitMQ: https://www.rabbitmq.com/consumer-prefetch.html

Probad a ejecutar el código incluyendo algún time sleep de retardo en el método workflow() y "acelerad" la publicación de mensajes usando el código de https://codigo-python.blogspot.com.es/2017/11/colas-rabbitmq-threads-ii.html y comprobad como al crecer la cola se crean los hilos de refuerzo.

21 de noviembre de 2017

PyMongo - operaciones de lectura

Continuando con la entrada anterior vamos a ver algunas operaciones más que podemos llevar a cabo con la librería de pymongo.
Vamos a comenzar con las operaciones de lectura, lo que en términos de SQL serían las operaciones SELECT.

En este primer ejemplo vamos a ver como obtener todos los registros de una colección sin especificar ningún parámetro.

En este caso en la colección USERS de la base de datos TEST solo tenemos 3 registros que hemos guardado previamente utilizando el código que vimos en PyMongo - parte 1.

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000
MONGODB_DATABASE = 'TEST'

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=10)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with mongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    for entry_json in result:
        print "Document got from collection %s: %s" % (destination, entry_json)
except Exception as error:
    print "Error getting data: %s" % str(error)

Si ejecutamos el código obtendremos los 3 documentos de la colección, en este caso:

OK -- Connected to MongoDB at server 192.168.0.169
Document got from collection USERS: {u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Document got from collection USERS: {u'city': u'Paris', u'_id': ObjectId('5a11debd9ea4cbaa30bf01ff'), u'surname': u'Mouse', u'name': u'Mickey'}
Document got from collection USERS: {u'city': u'New York', u'_id': ObjectId('5a11df209ea4cbaa30bf0214'), u'surname': u'Kent', u'name': u'Clark'}
Lo primero que debe llamarnos la atención es el hecho de que nosotros hemos guardado en mongo un documento JSON con strings como valores y al recuperar estos datos con pymongo estamos obteniendo unicode.

La razón de esto es que mongo almacena los datos internamente en formato BSON (Binary JSON) y los string los codifica con UTF-8.

Python codifica unicode usando también UTF-8, de este modo para asegurar la compatibilidad pymongo devuelve unicode. Para obtener más información sobre unicode en python puede consultarse el siguiente link: https://docs.python.org/2/howto/unicode.html

La conversión de unicode a string y viceversa va a depender de la codificación empleada. En este caso es tan sencillo como convertir directamente (omitimos la primera parte del código en los posteriores ejemplos):

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    print ''
    for entry_json in result:
        print "Found document at %s collection with next values:" % destination
        for key in entry_json:
            print ' '  + key + ' : ' + str(entry_json[key])
        print ''
except Exception as error:
    print "Error getting data: %s" % str(error)
Al ejecutarlo ya podremos ver los datos guardados como strings:

OK -- Connected to MongoDB at server 192.168.0.169

Found document at USERS collection with next values:
 city : New York
 _id : 5a1174a5fbb7132f5c7d091c
 surname : Wick
 name : John

Found document at USERS collection with next values:
 city : Paris
 _id : 5a11debd9ea4cbaa30bf01ff
 surname : Mouse
 name : Mickey

Found document at USERS collection with next values:
 city : New York
 _id : 5a11df209ea4cbaa30bf0214
 surname : Kent
 name : Clark
El método find() de la librería pymongo que acabamos de emplear devuelve un objecto de tipo cursor que podemos recorrer con un bucle for según hemos visto en el código anterior.

Para saber cuantos documentos vamos a encontrarnos al recorrer un cursor podemos invocar result.count() como vemos en el siguiente código. En caso de que no exista ningún documento el resultado de result.count() será 0.

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    print "Documents found: %d" % result.count()
    print ''
    for entry_json in result:
        print "Found document at %s collection with next values:" % destination
        for key in entry_json:
            print ' '  + key + ' : ' + str(entry_json[key])
        print ''
except Exception as error:
    print "Error getting data: %s" % str(error)
Al ejecutarlo obtendremos el número de elementos encontrados (remarcado en negrita) antes de recorrer el cursor para mostrarlos:

OK -- Connected to MongoDB at server 192.168.0.169
Documents found: 3

Found document at USERS collection with next values:
 city : New York
 _id : 5a1174a5fbb7132f5c7d091c
 surname : Wick
 name : John

Found document at USERS collection with next values:
 city : Paris
 _id : 5a11debd9ea4cbaa30bf01ff
 surname : Mouse
 name : Mickey

Found document at USERS collection with next values:
 city : New York
 _id : 5a11df209ea4cbaa30bf0214
 surname : Kent
 name : Clark
El método find() acepta uno o varios parámetros de cara a llevar a cabo una búsqueda. En los ejemplos anterior teníamos condition={}, que viene a traducirse como "dame todos las entradas".

Para especificar una condición en la búsqueda lo haremos definiendo la condición como un diccionario indicando clave:valor. Así por ejemplo, si queremos sacar la lista de usuarios que viven en "New York" definiremos condition={'city':'New Yor'}:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {'city':'New York'}
    result = collection.find(condition)
    for entry_json in result:
        print entry_json
except Exception as error:
    print "Error getting data: %s" % str(error)
La salida sería:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
{u'city': u'New York', u'_id': ObjectId('5a11df209ea4cbaa30bf0214'), u'surname': u'Kent', u'name': u'Clark'}
Si queremos especificar más de un parámetro al ejecutar la búsqueda, basta incluir varios entradas clave:valor al diccionario. De este modo si definimos:

condition = {'city':'New York', 'name':'John'}
obtendremos un único documento JSON al recorrer el cursor result:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
En ocasiones puede que nos interese obtener un solo documento, bien porque sepamos que solamente hay uno que cumpla una cierta condición (clave única) o bien porque nos baste con obtener un solo documento.

En ese caso pymongo dispone del método find_one().

A diferencia de find() que siempre devolvía un cursor, la función find_one() va a devolver un solo documento o bien None si no existe ningún documento que cumpla las condiciones que le pasemos.

En caso de que lo invoquemos usando condition={} nos devolverá el primer documento de la colección:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find_one(condition)
    print result
except Exception as error:
    print "Error getting data: %s" % str(error)
Si ejecutamos el código obtendremos uno de los 3 elementos de la lista:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Si definimos condition = {'city':'New York'} igual que hicimos antes con el método find() donde obtuvimos 2 documentos, vemos que en este caso obtendremos solamente uno:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = condition = {'city':'New York', 'name':'John'}
    result = collection.find_one(condition)
    print result
except Exception as error:
    print "Error getting data: %s" % str(error)
Efectivamente obtenemos un solo documento:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Al emplear find_one() hay que contemplar la posibilidad de que nos devuelva un None:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = condition = {'city':'Tokio'}
    result = collection.find_one(condition)
    if result is None:
        print "None document found!!"
    else:
        print result
except Exception as error:
    print "Error getting data: %s" % str(error)
En este caso obtendremos el mensaje informando que no se ha encontrado ningún documento:

None document found!!

No vamos a entrar en más detalles, solo mencionar que también podemos usar las funciones find_one_and_delete(), find_one_and_replace() y find_one_and_update() cuya documentación podemos encontrar en http://api.mongodb.com/python/current/api/pymongo/collection.html

19 de noviembre de 2017

PyMongo - conexión

En esta nueva entrada vamos a ver como interactuar con la base de datos MongoDB usando python.

MongoDB es una base de datos NoSQL y gracias a su buen rendimiento y escalado podemos considerarla como una alternativa posible dentro del mundo del Big Data. Tiene una amplia documentación disponible en su web

Para poder trabajar contra MongoDB vamos a emplear la librería pymongo: https://pypi.python.org/pypi/pymongo

Pymongo tiene varias virtudes, una de ellas es que es "thread-safe", es decir, que es apta para trabajar con aplicaciones multi-hilo.
En su lista de faq podéis encontrar esta junto a otras muchas features.

El primer paso para trabajar con pymongo es instalar la librería.
La forma más cómoda de instalarla es a través de pip ejecutando pip install pymongo, pero como siempre podemos optar por descargar los paquetes de la web y hacerlo de forma manual tanto en sistemas linux como windows: https://pypi.python.org/pypi/pymongo

Una vez instalada la librería ya podemos comenzar a escribir código.
Vamos a suponer que contamos con una instalación básica de mongo que no requiere autenticación para conectarse. El código para conectarse a MongoDB quedaría del siguiente modo:

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
    client.close()
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with MongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

Con el código anterior simplemente nos conectamos a la base de datos TEST (en caso de que no exista se crea de forma automática) y desconectamos.

Los usuarios de windows es posible que obtengan errores al incorporar el parámetro
serverSelectionTimeoutMS=MONGODB_TIMEOUT
dependiendo de la versión de la librería. En ese caso basta con eliminarlo y dejar simplemente
client = pymongo.MongoClient(URI_CONNECTION)

Al ejecutar el código, si todo va bien obtendremos un mensaje de confirmación:

OK -- Connected to MongoDB at server 192.168.0.169
En caso de error, obtendremos también un mensaje que nos dará la pista de qué está fallando.

Un dato importante a tener en cuenta es que cuando invocamos
pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
la librería en realidad está creando un pool de conexiones basado en threads con un tamaño máximo determinado por el parámetro maxPoolSize, que tiene un valor por defecto de 100.

Es decir, la línea:

client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
es equivalente a

client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=100)

Bien, ya tenemos casi todo listo para realizar operaciones sobre la base de datos. Antes de continuar vamos a hacer un inciso para apuntar algunas características de MongoDB, sobre todo de cara a la gente que viene del mundo SQL.

En primer lugar mongo almacena los datos en colecciones, que vienen a ser el equivalente a las tablas del mundo SQL. Dentro de esas colecciones MongoDB va a almacenar documentos, que serían el equivalente a los registros del mundo SQL. Estos documentos son en realidad estructuras JSON, que en términos de python, podemos traducir a diccionarios.

Otra de las caracteríticas de MongoDB es que no es necesario definir de forma previa la estructura que van a tener los documentos de una colección, de hecho, ni siquiera es necesario crear la colección, al guardar un documento JSON se creará automáticamente la colección, de modo análogo a lo que sucede con la propia base de datos, que también se creará de forma automática si no existiera.
Dicho de otro modo, al crear el primer documento de una colección dentro de una base de datos se crearían de forma automática tanto la colección como la base de datos en caso de que no existieran,

Teniendo en cuenta esto, para guardar algo en nuestro MongoDB lo único que tenemos que hacer es crear un diccionario, obtener una conexión del pool y ejecutar el insert correspondiente.

Vamos a crear un diccionario con una serie de claves y valores y lo guardaremos. En este caso, vamos a salvar en la colección 'USERS' de la base de datos 'TEST' un registro y vamos a comprobar cómo efectivamente se crean de forma automática tanto la base de datos como la colección.

Suponiendo que partimos de una instalación limpia de mongo si nos conectamos a la consola de mongo y le decimos que nos muestre las bases de datos tendremos:

> show databases;
admin  0.000GB
local  0.000GB
Vamos a ejecutar el código siguiente, el cual creará un documento en la colección USERS de la base de datos TEST:

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000
MONGODB_DATABASE = 'TEST'

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=10)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with mongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

database_entry = {}
database_entry['name'] = 'John'
database_entry['surname'] = 'Wick'
database_entry['city'] = 'New York'

# or equivalent database_entry={'name':'John', 'surname':'Wick', 'city':'New York'}

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    collection.insert(database_entry)
    print "Data saved at %s collection in %s database: %s" % (destination, MONGODB_DATABASE, database_entry)
except Exception as error:
    print "Error saving data: %s" % str(error)
Al ejecutar el código, si todo va bien obtendremos algo como lo siguiente:

OK -- Connected to MongoDB at server 192.168.0.169
Data saved at USERS collection in TEST database: {'city': 'New York', '_id': ObjectId('5a1174a5fbb7132f5c7d091c'), 'surname': 'Wick', 'name': 'John'}
Si vamos a la consola de mongo, comprobamos que efectivamente se han creado la base de datos y la colección y que además tenemos el registro guardado:

> show databases;
TEST   0.000GB
admin  0.000GB
local  0.000GB

> use TEST;
switched to db TEST

> show collections;
USERS

> db.USERS.find({}).pretty();
{
 "_id" : ObjectId("5a1174a5fbb7132f5c7d091c"),
 "city" : "New York",
 "surname" : "Wick",
 "name" : "John"
}
Lo primero que nos llamará la atención es el campo '_id'. Es un campo que crea de forma automática mongo y que juega el papel de índice actuando como clave única.

En futuras entradas veremos como explotar la librería pymongo ejecutando diferentes operaciones sobre MongoDB.

18 de noviembre de 2017

Colas (RabbitMQ) - Publicando mensajes

Siguiendo con la interacción entre RabbitMQ y python usando la librería puka, en esta entrada vamos a ver como publicar mensajes.

En la anterior entrada vimos como crear un pool de threads que se conectaba a un broker de RabbitMQ, consumía mensajes y ejecutaba un workflow para cada mensaje consumido. En esta entrada veremos como crear un pool de threads que van a publicar mensajes.

En este caso lo que vamos a publicar son strings aleatorios de cierta longitud que vamos a generar con la función genrandom que acepta como parámetro la longitud con la que queramos generar los mensajes.

Estableceremos un tiempo de espera entre la publicación de mensaje que vendrá determinado por el valor de PUBLISHER_SLEEP_TIME en segundos. Este parámetro junto con el número de threads WORKER_NUMBER van a determinar el rate de publicación.

Igual que en el ejemplo de los consumidores, en caso de problemas con la conexión al broker se establece una política de reconexión con reintentos de conexión cada BROKER_RETRY_CONNECTION segundos.

De este modo el código quedaría del siguiente modo:

#!/usr/bin/env python

import puka
import time
import threading
import random
import string

BROKER_HOST = '192.168.0.169'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
PUBLISHER_SLEEP_TIME = 0.2

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

class Threaded_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.client.wait(self.client.connect())
                    self.client.wait(self.client.queue_declare(QUEUE))
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    msgToPublish = genrandom(40)
                    self.client.wait(self.client.basic_publish(exchange ='', routing_key=QUEUE, body=msgToPublish))
                    time.sleep(PUBLISHER_SLEEP_TIME)
                    print '[%s] Message published at queue: %s' % (threadName, msgToPublish)
                except Exception as error:
                    print '[%s] Error at publisher: %s.....retrying connection after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)


print 'Starting %d publisher threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()


Al ejecutarlo obtendremos lo siguiente:

Starting 4 publisher threads...
[Thread-1] Succesfully Connected at broker 192.168.0.169
[Thread-3] Succesfully Connected at broker 192.168.0.169
[Thread-2] Succesfully Connected at broker 192.168.0.169
[Thread-4] Succesfully Connected at broker 192.168.0.169
[Thread-1] Message publish at queue: YKHYRBRJWQLOLSREOGONKZICBVOQBXHUALGXCWSIT
[Thread-3] Message publish at queue: AXCLEQMGONKZUPNNOIFFZTLZBMAPRRNCELSWVORAU
[Thread-2] Message publish at queue: HAXNHDXBVJVUVOLTSYRTGEZMXGSWLOTIAMDWIQCQZ
[Thread-4] Message publish at queue: MDRJVZBHFHHCIPNXHFATUHEQWBQSJQWVOVTVIAEMA
[Thread-1] Message publish at queue: SBIRZMHIOGQDTQVSZKGGZICXJIEEXVLCCGFHONQIE
[Thread-3] Message publish at queue: WPLUWHCJWEBYETTDWFIYGBEERADWFVGPQTHPRJIKR
[Thread-2] Message publish at queue: JXGJBYEIUQRZRLFKUSKRPAMKSTZDZGOJIBFGKVLEM
[Thread-4] Message publish at queue: OELBGLBPYFTEBZVKGCGUTBCLQDXRCQLPLPDJDRRRM
........
Con el valor PUBLISHER_SLEEP_TIME = 0.2 deberíamos tener un rate de publicación de 5 mensajes por cada hilo, de forma que si fijamos WORKER_NUMBER = 4, al ejecutarse el código deberían publicarse 20 mensajes por segundo.

Efectivamente, si lo lanzamos y vamos a la web de administración de RabbitMQ vemos que el rate de publicación es el esperado:


20 de mayo de 2017

Colas (RabbitMQ) - Consumidores

En entradas anteriores vimos como trabajar con la librería de colas de python. La librería nos proporciona una funcionalidad bastante completa, pero de cara a desplegar en un entorno de producción un sistema de colas existen soluciones más completas en lo que s refiera a rendidmiento y funcionalidad.

Este es el caso de RabbitMQ. Se trata de un sistema de colas basado en el estándar AMQP. Podemos instalar RabbitMQ como un nodo aislado o bien como un cluster distribuido. En su web podemos encontrar toda la información detallada para su instalación y administración, así como una lista de plugins que ofrecen una funcionalidad adicional.
Cuenta con paquetes para prácticamente todas las plataformas: https://www.rabbitmq.com/download.html

RabbitMQ está escrito en Erlang, de este modo es necesario instalarlo previamente: https://www.erlang.org/downloads

Una vez instalado, recomiendo activar el plugin de administración (management), puesto que proporciona una interfaz web que permite administrarlo de forma cómoda: podemos crear nuevas colas, administrarlas, ver el número de mensajes en cada cola y la tasa de publicación/consumo de mensajes, etc...



Una vez instalado RabbitMQ vamos crear un código python que nos va a permitir insertar mensajes en una cola y por otro lado un código que va a extraer mensajes de la cola y va a procesarlos.

¿Para qué puede servirnos esto? Por ejemplo, si retomamos el ejemplo del código del socket-server podemos hacer que el socket-server meta en la cola cada mensaje que recibamos y posteriormente definir un pool de threads que vaya sacando los mensajes de la cola y los procese.

En la misma web de RabbitMQ podemos encontrar diferentes ejemplos empleando diferentes lenguajes. En el caso de python, veréis que se emplea la librería pika. Es una opción perfectamente válida, no obstante en mi caso después de algunas pruebas me he decantado por usar puka.

Bien, vamos pues a la tarea. Objetivo: crear un grupo de threads que lean mensajes de la cola y ejecuten un workflow para cada uno de ellos. El código quedaría del siguiente modo:


#!/usr/bin/env python

import puka
import time
import threading

BROKER_HOST = '192.168.1.45'
QUEUE = 'COLA-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'

def workflow(threadName, msg):
    """ Workflow for every message read from queue """
    try:
        print "[%s] Read from queue: %s" % (threadName, msg)
    except Exception as error:
        print "Error: %s" % str(error)
 

class Threaded_worker(threading.Thread):

    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE)
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(threadName, self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (threadName, str(error),BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

print 'Starting %d consumer threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()




Vamos con los detalles: los parámetros BROKER_HOST y QUEUE definen la IP del servidor donde corre RabbitMQ y el nombre de la cola de la que queremos leer los mensajes.

En caso de problema con la conexión al broker (RabbitMQ), ya sea en el instante inicial como durante la operativa, se establece un tiempo de espera para reestablecerla, el cual viene dado por el parámetro BROKER_RETRY_CONNECTION. El parámetro BROKER_CONNECTION nos sirve para almacenar el estado de la conexión.

El parámetro WORKER_NUMBER establece el número de threads o hilos. Cada uno de ellos establecerá una conexión con el broker y cuando haya mensajes los consumirán y para cada uno de ellos ejecutarán el código definido en el método workflow, en este caso simplemente imprimirán el mensaje leído.

¿Qué sucede si el número de hilos no es suficiente para consumir los mensajes? Sería bueno poder aumentar el número de consumidores en función del tamaño de la cola. Esto queda resuelto en la entrada https://codigo-python.blogspot.com.es/2017/12/colas-rabbitmq-numero-variable-de_11.html

En futuras entradas ampliaremos funcionalidad del método workflow, haciendo por ejemplo, que almacene en una BD mongo cada mensaje leído.

2 de febrero de 2017

Socket server TCP multi-thread - logger

Ampliando el código de la entrada anterior vamos a extender la funcionalidad del socket server.
Como ya vimos en la entrada previa, construíamos la lógica del socket server extendiendo varias clases del módulo SocketServer y sobrescribiendo métodos según el diagrama siguiente:

En este caso, apoyándonos en la misma estructura de clases y métodos vamos a ampliar la funcionalidad guardando en un fichero de logs lo recibido a través del socket server, estableciendo además un límite máximo de clientes simultáneos a los que el socket server atenderá.

Vayamos paso a paso.

Logs:
Emplearemos el módulo logging el cual nos va a facilitar el trabajo.
Los parámetros principales parámetros que debemos indicar son el directorio donde almacenarlos y el nombre del fichero. Aprovechando la potencia del módulo vamos a hacer que los el fichero de logs rote de forma automática cada noche y manteniendo un máximo número de ficheros, de forma que el propio módulo borre aquellos más antiguos de x días.
De este modo tendremos los siguientes parámetros en nuestro código:
LOG_FOLDER = '/var/log/socket-server/'
LOG_FILE = 'socket-server.log'
ROTATE_TIME = 'midnight'
LOG_COUNT = 10

Con los parámetros anteriores ya podemos "armar" nuestro logs:
import os
import logging, logging.handlers

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
 try:
  os.makedirs(log_folder)
 except Exception as error:
  print 'Error creating the log folder: %s' % error

try:
 logger = logging.getLogger('socket-server')
 loggerHandler = logging.handlers.TimedRotatingFileHandler(LOG_FOLDER + LOG_FILE , ROTATE_TIME, 1, backupCount=LOG_COUNT)
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 loggerHandler.setFormatter(formatter)
 logger.addHandler(loggerHandler)
 logger.setLevel(logging.DEBUG)
except:
 print '------------------------------------------------------------------'
 print '[ERROR] Error writing log at %s' % INTERNAL_LOG
 print '[ERROR] Please verify path folder exits and write permissions'
 print '------------------------------------------------------------------'
 exit() 

Lo que hacemos con el código anterior es comprobar si el directorio existe y en caso de no existir lo intentamos crear, trazando cualquier error que pudiera producirse al crearlo.
Por ejemplo, en sistemas unix, si el usuario que ejecuta el código no tiene permiso de escritura sobre el directorio que contendrá al directorio de logs obtendremos un error como el siguiente:
Error creating the log folder: [Errno 13] Permission denied: '/var/log/socket-server'

Por ello debemos asegurarnos que corremos el código con usuario con permisos suficientes.
El resto del código aportan la lógica comentada antes de rotación, número de ficheros de logs, etc..

Veamos a continuación como incluirlo en el código del socket server incluyendo además el límite de hilos máximos en ejecución o dicho de otro modo el número de clientes simultáneos que queremos atender:

#!/usr/bin/env python

import threading
import SocketServer, socket
import sys
import os
import logging, logging.handlers

TIMEOUT = 10
HOST = '0.0.0.0'
PORT = 3456

MAX_THREADS = 50

LOG_FOLDER = '/var/log/socket-server/'
LOG_FILE = 'socket-server.log'
ROTATE_TIME = 'midnight'
LOG_COUNT = 10

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
 try:
  os.makedirs(log_folder)
 except Exception as error:
  print 'Error creating the log folder: %s' % error
        exit()

try:
 logger = logging.getLogger('socket-server')
 loggerHandler = logging.handlers.TimedRotatingFileHandler(LOG_FOLDER + LOG_FILE , ROTATE_TIME, 1, backupCount=LOG_COUNT)
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 loggerHandler.setFormatter(formatter)
 logger.addHandler(loggerHandler)
 logger.setLevel(logging.DEBUG)
except Exception as error:
 print '------------------------------------------------------------------'
 print '[ERROR] Error writing log at %s: %s' % (LOG_FOLDER, error)
 print '[ERROR] Please verify path folder exits and write permissions'
 print '------------------------------------------------------------------'
 exit()


class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            # chequeamos el numero de threads activos. Si es mayor que el limite establecido cerramos la conexion y no atendemos al cliente. Lo trazamos
            if threading.activeCount() > MAX_THREADS:
                logger.warn('%s -- Execution threads number: %d', threading.currentThread().getName(),
                            threading.activeCount() - 1)
                logger.warn('Max threads number as been reached.')
                self.closed()
            # si no hemos alcanzado el limite lo atendemos
            else:
                threadName = threading.currentThread().getName()
                activeThreads = threading.activeCount() - 1
                clientIP = self.client_address[0]
                logger.info('[%s] -- New connection from %s -- Active threads: %d' , threadName, clientIP, activeThreads)
                data = self.request.recv(1024)
                logger.info('[%s] -- %s -- Received: %s' , threadName, clientIP, data)
                response = 'Thanks %s, message received!!' % clientIP
                self.request.send(response)
        except Exception, error:
            if str(error) == "timed out":
                logger.error ('[%s] -- %s -- Timeout on data transmission ocurred after %d seconds.' ,threadName, clientIP, TIMEOUT)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()

Las etiquetas empleadas son bastante descriptivas:
logger.info() ---> entradas con etiqueta INFO en el fichero de logs
logger.warn() ---> entradas con etiqueta WARN en el fichero de logs
logger.error() ---> entradas con etiqueta ERROR en el fichero de logs


En el fichero /var/log/socket-server/socket-server.log tendremos contenido del siguiente tipo a medida que vayan conectando diferentes clientes:
2017-02-02 21:24:29,701 INFO [Thread-1] -- New connection from 192.168.0.143 -- Active threads: 1
2017-02-02 21:24:35,704 INFO [Thread-1] -- 192.168.0.143 -- Received: hola, que tal?
2017-02-02 21:24:46,697 INFO [Thread-2] -- New connection from 192.168.0.145 -- Active threads: 1
2017-02-02 21:24:51,331 INFO [Thread-2] -- 192.168.0.145 -- Received: buenos dias
2017-02-02 21:31:30,555 INFO [Thread-3] -- New connection from 192.168.0.89 -- Active threads: 1
2017-02-02 21:31:40,561 ERROR [Thread-3] -- 192.168.0.89 -- Timeout on data transmission ocurred after 10 seconds.


Si dejáramos la aplicación corriendo varios días como servicio (más adelante veremos como hacerlo) veríamos como el fichero de logs rota cada noche añadiéndose al nombre del fichero al rotar la fecha para tenerlos de este modo clasificados. Al tener más de 10 ficheros de logs al rotar se borrarían automáticamente los más antiguos.

23 de noviembre de 2016

Socket server TCP multi-thread

En las entradas anteriores creábamos un número fijo de hilos o threads encargados de ejecutar tareas simples.
También vimos cómo crear un número fijo de hilos o threads productores y consumidores de colas y un ejemplo en el que se creaban un número fijo de hilos o threads asignados a diferentes tareas.

El escenario que se plantea en este caso es el de un socket server TCP que va a recibir conexiones TCP y que debe crear "al vuelo" un thread o hilo para atender cada petición. En esta primera parte simplemente veremos como crear ese hilo y trazaremos el número de hilos vivos (al tratarse de un socket server pueden conectar varios clientes de forma simultánea), las IPs de los clientes conectados, y los datos recibidos.
El código sería el siguiente:
#!/usr/bin/env python

import threading
import SocketServer, socket
import sys

TIMEOUT = 10
HOST = '192.168.1.37'
PORT = 3456

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            threadName = threading.currentThread().getName()
            activeThreads = threading.activeCount() - 1
            clientIP = self.client_address[0]
            print '[%s] -- New connection from %s -- Active threads: %d' % (threadName, clientIP, activeThreads)
            data = self.request.recv(1024)
            print '[%s] -- %s -- Received: %s' % (threadName, clientIP, data)
            response = 'Thanks %s, message received!!' % clientIP
            self.request.send(response)
        except Exception, error:
            if str(error) == "timed out":
                print '[%s] -- %s -- Timeout on data transmission ocurred after %d seconds.' % (threadName, clientIP, TIMEOUT)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()

Veamos el detalle paso a paso. En primer lugar tenemos los parámetros de la conexión TCP:
TIMEOUT = 10  --> timeout para que el cliente transmita los datos una vez establecida la conexión.
HOST = '0.0.0.0'  --> IP en la que el socket server va a escuchar. En este caso 192.168.1.37.
PORT = 3456  --> puerto en el que el socket server va a escuchar. Escoged cualquier puerto libre en el equipo.

La lógica se encuentra en el método handle() de la clase RequestHandler : obtenemos el nombre del thread que creamos para atender la conexión, el número de threads en ejecución (en próximas entradas lo usaremos para limitar el número máximo de threads en ejecución) y la IP del cliente. Leemos los datos que nos envía el cliente, los pintamos, le contestamos indicando que hemos recibido el mensaje y cerramos la conexión.
Para probarlo ejecutamos el código y desde la propia máquina o bien desde otro equipo abrimos una conexión. En mi caso el socket server corre en la IP 192.168.1.37 y el cliente conectará desde la IP 192.168.1.254
Arrancamos el socket server ejecutando el código anterior:
Starting server TCP at IP 192.168.1.37 and port 3456...

A continuación desde el cliente hacemos un telnet al puerto 3456 de la 192.168.1.37 y mandamos un "hola":
pi@raspberrypi:~ $telnet 192.168.1.37 3456
Trying 192.168.1.37...
Connected to 192.168.1.37.
Escape character is '^]'.
hola
Thanks 192.168.1.254, message received!!Connection closed by foreign host.
En el lado servidor tendremos:
Starting server TCP at IP 192.168.1.37 and port 3456...
[Thread-1] -- New connection from 192.168.1.254 -- Active threads: 1
[Thread-1] -- 192.168.1.254 -- Received: hola

Si abrimos nuevas conexiones se crearán nuevos threads para atenderlas. Podéis comprobarlo.

Vamos a intentar explicar con detalle el código anterior.
A primera vista vemos que buena parte de la lógica está incluida en el módulo SocketServer que importamos inicialmente:

import SocketServer
El fichero SocketServer.py (es un módulo de python) forma parte de la instalación estándar de python y podéis acceder a su contenido completo. En el caso de un sistemas unix lo encontraremos en el directorio de instalación, habitualmente en /usr/lib/python2.7/SocketServer.py.

Todo se inicia invocando a la clase ThreadedTCPServer:

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()
En este caso ThreadedTCPServer es una clase que hemos definido, y que mediante herencia múltiple, hereda de las clases ThreadingMixIn y TCPServer del módulo SocketServer, lo que se traduce en:

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

Si accedemos al contenido del fichero SocketServer.py vemos que la clase TCPServer a su vez hereda de la clase BaseServer, que también pertenece al módulo SocketServer. Es decir, a nivel de módulos y clases tenemos los siguientes elementos:
Y atendiendo a la jerarquía de clases tendríamos:
Además estamos sobrescribiendo los métodos server_bind() y finish_request(). De nuevo, revisando el fichero SocketServer.py podemos ubicar estos métodos dentro de sus correspondientes clases:
Las razones para sobrescribir estos métodos son, en el caso de server_bind, permitir poner el puerto a la escucha aunque tengamos conexiones en modo WAIT_TIME, después de haber parado el servidor y en el caso de finish_request poder fijar un TIMEOUT para el socket. Si no quisiéramos modificar nuestro código para incluir no sería necesario sobrescribir los métodos.

El siguiente elemento en el que debemos reparar es el la clase RequestHandler que como vemos en su definición extiende la clase BaseRequestHandler del módulo SocketServer. Además, vemos que estamos sobrescribiendo el método handle() de la clase BaseRequestHandler que va a ser el que contenga la lógica a ejecutar con cada nueva conexión. Con lo cual tenemos un elemento más en nuestro diagrama:
Por tanto vemos que el módulo SocketServer realmente nos aporta prácticamente todos los ingredientes.
Nosotros únicamente vamos a definir nuestras propias clases ThreadedTCPServer y RequestHandler extendiendo clases existentes existentes en SocketServer para poder añadir la lógica deseada.
De este modo, con todos los "ingredientes" la receta quedaría del siguiente modo:
Siguiendo con esta "autopsia" que estamos haciendo a nuestro socket server multi-hilo, vamos a detenernos en la clase ThreadingMixIn del módulo SocketServer.
El código completo de la clase es el siguiente (copiado del fichero SocketServer.py):

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
Como vemos esta clase va a ser la clase encargada de crear un nuevo hilo o thread para conexión recibida.
La tarea de creación de un nuevo hilo no es demasiado costosa, pero ¿no sería mejor tener un pool de hilos creados, de forma que al recibir una conexión ésta sea atendida por uno de esos hilos? Con esto nos ahorraríamos la tarea de estar creando hilos......en futuras entradas veremos como abordar esta tarea modificando la clase y haciendo uso del módulo Queue. En la entrada socket-server-logger hemos añadido la funcionalidad de establecer un sistema de logs con rotado automático cada noche. También incluye la funcionalidad de establecer un número máximo de hilos activos.