11 de diciembre de 2017

Colas (RabbitMQ) - número variable de consumidores

En una entrada previa Colas (RabbitMQ) - consumidores mostrábamos como crear un pool de threads que iban a consumir mensajes de una cola y ejecutar un workflow para cada mensaje.

El número de consumidores (tamaño del pool de threads) era fijo. Pero ¿Qué sucede si el número de mensajes encolados se dispara? Sería bueno poder crear nuevos hilos que consumieran mensajes mientras el tamaño de la cola excediera un determinado tamaño y una vez que la cola volviera a tener un número de mensajes aceptable estos hilos terminasen.

Con este planteamiento vamos a tener un número de hilos fijos que vamos a denominar Main y por otro lados vamos a crear hilos Backing que solamente existirán cuando el tamaño de la cola exceda un determinado tamaño. Estos hilos Backing juegan el papel de refuerzo temporal a los hilos Main cuando éstos no son suficientes para consumir mensajes a un ritmo que mantenga los niveles de la cola en un determinado número de mensajes.

Para implementarlo vamos a añadir en el código una tarea encargada de comprobar de forma periódica el tamaño de la cola y en función del resultado creará nuevos hilos Backing. Podríamos realizar el chequeo usando la propia librería puka, pero en este caso vamos a aprovechar la api web que nos aporta el plugin de management de RabbitMQ del que ya hablamos en entradas previas.

El plugin nos proporciona un interfaz web que acepta diferentes métodos y operaciones: api-web-management
Basándonos en esa api y usando la librería requests de python (http://docs.python-requests.org) podemos escribir el siguiente código que va a devolvernos el número de mensajes que contiene una cola:

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

Usando el código anterior podemos implementar una lógica como la que representa el siguiente diagrama:

:

El diagrama anterior se traduce en el siguiente código:

#!/usr/bin/env python

import puka
import time
import threading
import requests

BROKER_HOST = 'broker'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
MAIN_WORKER_NUMBER = 4
MAX_BACKING_WORKER_NUMBER = 10
WATCHDOG_CHECK_INTERVAL = 30
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
QUEUE_SIZE_THRESHOLD = 10000

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

def workflow(thread_name, thread_type, msg):
    """ Workflow for every message read from queue """
    try:
        if thread_type == 'main':
            print "[%s] Main thread read from queue: %s" % (thread_name, msg)
        elif thread_type == 'backing':
            print "(%s) Backing thread read from queue: %s" % (thread_name, msg)
    except Exception as error:
        print "Error: %s" % str(error)

class Main_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[%s] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='main', msg=self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

class Backing_worker(threading.Thread):
    BROKER_CONNECTION = 0

    global queue_size
    global backing_threads_count

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while queue_size > QUEUE_SIZE_THRESHOLD:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[--%s--] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[--%s--] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise, timeout=2)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='backing', msg=self.result['body'])
                except puka.ResourceError as error:
                    print '[--%s--] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
        backing_threads_count -=1


print 'Starting %d main consumer threads...' % (MAIN_WORKER_NUMBER)
for i in range (MAIN_WORKER_NUMBER):
    td = Main_worker()
    td.start()

while True:
    backing_threads_count = 0
    queue_size = get_messages_count_in_queue(QUEUE)
    if queue_size > QUEUE_SIZE_THRESHOLD and backing_threads_count < MAX_BACKING_WORKER_NUMBER:
        print "Queue size exceeds the threshold value. Starting new backing thread"
        td = Backing_worker()
        td.start()
        backing_threads_count+=1
    time.sleep(WATCHDOG_CHECK_INTERVAL)

Vamos a analizar el código, comenzando por algunos de los nuevos parámetros:

MAIN_WORKER_NUMBER: representa el numero de hilos fijos que van a componer el pool Main de consumidores. Se inicializa al comienzo y en todo momento van a estar leyendo mensajes de la cola.
MAX_BACKING_WORKER_NUMBER: representa el número máximo de hilos "de refuerzo" ejecutándose que vamos a permitir. Dicho de otro modo el máximo tamaño del pool Backing
WATCHDOG_CHECK_INTERVAL: representa el periodo de tiempo entre cada chequeo para comprobar el número de mensajes almacenados en la cola.
QUEUE_SIZE_THRESHOLD: representa el límite máximo de mensajes encolados. Si el tamaño de la cola excede este valor al realizar la comprobación se creará un nuevo thread "de refuerzo" siempre que el número de threads de refuerzo no exceda el valor MAX_BACKING_WORKER_NUMBER

Por otro lado hemos definido 2 clases, una para los Main threads y otra para los Backing threads. Las clases son similares, salvo el detalle del método run():

- En el caso de los Main threads el bucle while se ejecuta indefinidamente, es decir, este tipo de hilos siempre van a estar "vivos".
- En el caso de los Backing threads el bucle while se ejecuta mientras el tamaño de la cola exceda del valor establecido como umbral (QUEUE_SIZE_THRESHOLD). Una vez que el tamaño de la cola sea menor de ese valor el hilo termina.

while True:
vs

while queue_size > QUEUE_SIZE_THRESHOLD

Para que los Backing threads puedan tener acceso a las variables queue_size y backing_threads_count las definimos como globales dentro de la clase:

global queue_size
global backing_threads_count
En los referente al workflow que va a ejecutar cada hilo, hacemos que cada hilo especifique al método qué clase de hilo es (Main o Backing simplemente para trazar de diferente la ejecución de uno y otro con corchetes o paréntesis.

Por último un detalle referente al parámetro prefetch_count de la librería puka:

self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
El parámetro prefetch_count representa el número de mensajes "reservados" para consumir en un canal de RabbitMQ, para los cuales no hemos enviado ack. Vamos a desarrollarlo un poco más a fondo.

Si ejecutamos el código de RabbitMQ consumidores y publicamos mensajes en una cola de RabbitMQ tendremos algo como lo siguiente:



Si nos fijamos tenemos 3 contadores:

- Ready: mensajes que pueden ser consumidos.
- Unacked: mensajes reservados para consumir en un canal, para los cuales no se ha recibido confirmación de haberlos consumido.
- Total: total de mensajes.

Si en la situación anterior con digamos 100k mensajes ejecutáramos nuestro código, si la ejecución del workflow tardara demasiado podría suceder que cada uno de los 4 Main threads reserve 25k mensajes para ser consumidos, de forma que tendríamos 100k mensajes Unacked y 0 mensajes Ready, con lo cual, aunque se creen nuevos threads Backing de refuerzo, estos no podrían consumir ningún mensajes, salvo que se publicaran nuevos mensajes en la cola. Es por ello que hacemos uso del prefecth_count

Podéis encontrar información más detallada al respecto en la propia página de RabbitMQ: https://www.rabbitmq.com/consumer-prefetch.html

Probad a ejecutar el código incluyendo algún time sleep de retardo en el método workflow() y "acelerad" la publicación de mensajes usando el código de https://codigo-python.blogspot.com.es/2017/11/colas-rabbitmq-threads-ii.html y comprobad como al crecer la cola se crean los hilos de refuerzo.

1 comentario:

  1. Fantantico post!
    Ingenioso y muy ilustrativo. Muy interesante tambien el parámetro prefecth de RabbitMQ

    ResponderEliminar