20 de mayo de 2017

Colas (RabbitMQ) - Consumidores

En entradas anteriores vimos como trabajar con la librería de colas de python. La librería nos proporciona una funcionalidad bastante completa, pero de cara a desplegar en un entorno de producción un sistema de colas existen soluciones más completas en lo que s refiera a rendidmiento y funcionalidad.

Este es el caso de RabbitMQ. Se trata de un sistema de colas basado en el estándar AMQP. Podemos instalar RabbitMQ como un nodo aislado o bien como un cluster distribuido. En su web podemos encontrar toda la información detallada para su instalación y administración, así como una lista de plugins que ofrecen una funcionalidad adicional.
Cuenta con paquetes para prácticamente todas las plataformas: https://www.rabbitmq.com/download.html

RabbitMQ está escrito en Erlang, de este modo es necesario instalarlo previamente: https://www.erlang.org/downloads

Una vez instalado, recomiendo activar el plugin de administración (management), puesto que proporciona una interfaz web que permite administrarlo de forma cómoda: podemos crear nuevas colas, administrarlas, ver el número de mensajes en cada cola y la tasa de publicación/consumo de mensajes, etc...



Una vez instalado RabbitMQ vamos crear un código python que nos va a permitir insertar mensajes en una cola y por otro lado un código que va a extraer mensajes de la cola y va a procesarlos.

¿Para qué puede servirnos esto? Por ejemplo, si retomamos el ejemplo del código del socket-server podemos hacer que el socket-server meta en la cola cada mensaje que recibamos y posteriormente definir un pool de threads que vaya sacando los mensajes de la cola y los procese.

En la misma web de RabbitMQ podemos encontrar diferentes ejemplos empleando diferentes lenguajes. En el caso de python, veréis que se emplea la librería pika. Es una opción perfectamente válida, no obstante en mi caso después de algunas pruebas me he decantado por usar puka.

Bien, vamos pues a la tarea. Objetivo: crear un grupo de threads que lean mensajes de la cola y ejecuten un workflow para cada uno de ellos. El código quedaría del siguiente modo:


#!/usr/bin/env python

import puka
import time
import threading

BROKER_HOST = '192.168.1.45'
QUEUE = 'COLA-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'

def workflow(threadName, msg):
    """ Workflow for every message read from queue """
    try:
        print "[%s] Read from queue: %s" % (threadName, msg)
    except Exception as error:
        print "Error: %s" % str(error)
 

class Threaded_worker(threading.Thread):

    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE)
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(threadName, self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (threadName, str(error),BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

print 'Starting %d consumer threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()




Vamos con los detalles: los parámetros BROKER_HOST y QUEUE definen la IP del servidor donde corre RabbitMQ y el nombre de la cola de la que queremos leer los mensajes.

En caso de problema con la conexión al broker (RabbitMQ), ya sea en el instante inicial como durante la operativa, se establece un tiempo de espera para reestablecerla, el cual viene dado por el parámetro BROKER_RETRY_CONNECTION. El parámetro BROKER_CONNECTION nos sirve para almacenar el estado de la conexión.

El parámetro WORKER_NUMBER establece el número de threads o hilos. Cada uno de ellos establecerá una conexión con el broker y cuando haya mensajes los consumirán y para cada uno de ellos ejecutarán el código definido en el método workflow, en este caso simplemente imprimirán el mensaje leído.

¿Qué sucede si el número de hilos no es suficiente para consumir los mensajes? Sería bueno poder aumentar el número de consumidores en función del tamaño de la cola. Esto queda resuelto en la entrada https://codigo-python.blogspot.com.es/2017/12/colas-rabbitmq-numero-variable-de_11.html

En futuras entradas ampliaremos funcionalidad del método workflow, haciendo por ejemplo, que almacene en una BD mongo cada mensaje leído.