15 de diciembre de 2017

logs con rotado en python como modulo

En esta ocasión vamos a incorporar un sistema de logs a nuestra aplicación python. El sistema nos va a permitir definir diferentes niveles de logs (info, debug, error, etc...) y establecer una política de rotado para evitar que el fichero de logs crezca indefinidamente.

Para ello vamos a hacer uso de la librería logging: https://docs.python.org/2/library/logging.html

Los parámetros que vamos a usar en nuestro código son:
- LOG_NAME: nombre (string) que identifique a la instancia del logger.
- LOG_FOLDER: directorio que va a contener los ficheros de logs.
- LOG_FILE: nombre del fichero de logs.
- ROTATE_TIME: momento del día en el que queremos llevar a cabo el rotado de logs. Podemos optar por los siguientes valores:
- LOG_COUNT: número total de ficheros que queremos mantener. Una vez superado este valor, se borrarán los ficheros más antiguos.
- LOG_LEVEL: level de logs. Los valores posibles son los siguientes:

- LOG_FORMAT: formato de los logs. Podemos definir una plantilla incluyendo el orden de los elementos, gravedad y el timestamp.

Un ejemplo, en el caso de un sistema unix o linux, podría ser:

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG_FILE = 'my-app.log'
ROTATE_TIME = 'midnight'
LOG_LEVEL = 'DEBUG'
LOG_COUNT = 10
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

Lógicamente tenemos que asegurarnos que el el directorio LOG_FOLDER exista y que el usuario con el que va a correr nuestro código python tenga permisos de escritura en ese directorio.

También podemos añadir en el propio código del logger la opción de chequear si el directorio existe, y en caso contrario intentar crearlo, aunque de nuevo el usuario debe tener permisos para poder crear el directorio. El código sería algo como lo siguiente:

import os

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
    try:
        os.makedirs(log_folder)
    except Exception as error:
        print 'Error creating the log folder: %s' % (str(error))

En nuestro caso vamos a suponer que el directorio LOG_FOLDER existe y que el usuario con el que vamos a ejecutar el código tiene permisos de escritura sobre el directorio.

De este modo el código quedaría del siguiente modo:

import logging.handlers
import sys

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG_FILE = 'my-app.log'
LOG = LOG_FOLDER + LOG_FILE
ROTATE_TIME = 'midnight'
LOG_LEVEL = logging.DEBUG
LOG_COUNT = 5
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

try:
    logger = logging.getLogger(LOGGER_NAME)
    loggerHandler = logging.handlers.TimedRotatingFileHandler(filename=LOG , when=ROTATE_TIME, interval=1, backupCount=LOG_COUNT)
    formatter = logging.Formatter(LOG_FORMAT)
    loggerHandler.setFormatter(formatter)
    logger.addHandler(loggerHandler)
    logger.setLevel(LOG_LEVEL)
except Exception as error:
    print "Error with logs: %s" % (str(error))
    sys.exit()
Para escribir logs, basta con invocar el logger, indicando el levelname y el message a trazar, según hemos definido en LOG_FORMAT. Algunos ejemplos:

logger.info("writing log with info level")
logger.error("writing log with error info. string %s | integer : %d", 'parameter string', 78 )
logger.debug("writting debug record")
Lo cual se traduciría en las siguientes trazas en el fichero /var/log/my-app/my-app.log:

2017-12-15 22:24:32,437 INFO writing log with info level
2017-12-15 22:24:32,438 ERROR writing log with error info. string parameter string | integer : 78
2017-12-15 22:24:32,438 DEBUG writting debug record
Como podéis observas las trazas se ajustan al formato que hemos definido y además podemos pasar parámetros a la traza.

Si nuestra aplicación python se compone de varios ficheros o si queremos trazar logs desde varios ficheros, resulta tedioso y poco práctico tener que incluir este código del logger en cada fichero. Para evitarlo vamos a ver como definir nuestro logger como un módulo.

Supongamos que nuestra aplicación está formada por n ficheros en el directorio my-app-python:

  my-app-python folder
      |_ _ _ _ _ _ my-app-file-1.py
      |_ _ _ _ _ _ my-app-file-2.py
      ...............
      |_ _ _ _ _ _ my-app-file-n.py

Queremos que todos los ficheros my-app-file-1.py, my-app-file-2.py ... my-app-file-n.py usen la lógica del logger que hemos definido antes.

Para ello vamos a construir un fichero mylogger.py con el siguiente contenido:

import logging.handlers
import sys

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG = LOG_FOLDER + LOG_FILE
LOG_FILE = 'my-app.log'
ROTATE_TIME = 'midnight'
LOG_LEVEL = logging.DEBUG
LOG_COUNT = 5
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

try:
    logger = logging.getLogger(LOGGER_NAME)
    loggerHandler = logging.handlers.TimedRotatingFileHandler(logfile=LOG , when=ROTATE_TIME, interval=1, backupCount=LOG_COUNT)
    formatter = logging.Formatter(LOG_FORMAT)
    loggerHandler.setFormatter(formatter)
    logger.addHandler(loggerHandler)
    logger.setLevel(LOG_LEVEL)
except Exception as error:
    print "Error with logs: %s" % (str(error))
    sys.exit()

def getLogger():
    return logger
A continuación creamos una carpeta, que llamaremos por ejemplo utils dentro de la cual meteremos el fichero my-logger.py junto con un fichero __init__.py vacío:

  utils folder
      |_ _ _ _ _ _  __init__.py  (empty file)
      |_ _ _ _ _ _  mylogger.py

Y por último guardamos la carpeta utils dentro de la carpeta my-app-python:

  my-app-python folder
      |_ _ _ _ _ _ my-app-file-1.py
      |_ _ _ _ _ _ my-app-file-2.py
      ...............
      |_ _ _ _ _ _ my-app-file-n.py
      |_ _ _ _ _ _ utils folder
                       |_ _ _ _ _ _  __init__.py  (empty file)
                       |_ _ _ _ _ _  mylogger.py

Para poder trazar desde cualquiera de los ficheros my-app-file-1.py, my-app-file-2.py ... my-app-file-n.py basta con importar el módulo, e invocar una instancia del logger a través del método getLogger():

from utils import mylogger

logger = mylogger.getLogger()

logger.info("info record")
logger.error("error record")

try:
   blalbla
except Exception as error:
   logger.error("Error at code: %s" , str(error)
De este modo tendremos un log unificado para todos los ficheros de my-app-python

Podéis comprobar como los ficheros efectivamente rotan y además se eliminan los fichero más antiguos según el número de ficheros que configuréis en LOG_COUNT

14 de diciembre de 2017

python MySQL operaciones de lectura

En esta entrada veremos una primera forma de conectar a una base de datos MySQL para llevar a cabo las operaciones CRUD básicas (Create, Read, Update and Delete).
Existen numerosa librerías para llevar a cabo este cometido. En este caso vamos a trabajar con el MySQLdb: info completa.

El primer paso sería la instalación del módulo, puesto que en las instalaciones estándar de python no está incluido. En debian y sistemas basados en debian podemos instalar el módulo haciendo uso de la herramienta apt-get y en general en cualquier sistema unix haciendo uso de la utilidad pip:

root@debian# apt-get install python-mysqldb
or
root@debian# pip install MySQL-python
Si ninguna de las opciones anteriores os convence podéis descargar el fichero .tar.gz desde la web oficial http://sourceforge.net/projects/mysql-python y hacer la instalación a mano:

root@debian# tar -xzvf MySQL-python-1.2.4b4.tar.gz
root@debian# cd MySQL-python-1.2.4b4
root@debian# python setup.py build
root@debian# python setup.py install
En el caso de sistemas windows podemos descargarlo desde aquí.
Una vez instalado podemos importar el módulo en nuestro programa python y hacer uso del mismo:

#!/usr/bin/env python

import MySQLdb

El módulo MySQLdb cumple con la Python Database API Specification v2.0: https://www.python.org/dev/peps/pep-0249/ y por ello maneja dos conceptos que debemos diferenciar: conexión a base de datos y cursor.

Inicialmente estableceremos la conexión a base de datos y una vez conectados los cursores nos permitirán ejecutar las operaciones y manejar los resultados de estas operaciones. Asociada a la misma conexión podemos crear diferentes cursores o bien crear un cursor para cada conexión.

Dicho de otro modo, el cursor solo existe en el contexto de una conexión previa y del mismo modo no existe el cursor si cerramos la conexión a base de datos. Esto dicho así puede resultar un poco confuso, así que vamos con algunos ejemplos concretos.

En los ejemplos que veremos a continuación vamos a trabajar con la base de datos employees que podéis descargar de la web de mysql: https://dev.mysql.com/doc/employee/en/.

En primer lugar vamos a ver como establecer la conexión a la base de datos:

#!/usr/bin/env python

import MySQLdb

DB_IP = "192.168.0.160"
DB_PORT = 3307
DB_NAME = "employees"
DB_USER = "root"
DB_PASSWORD = "1234"

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
except MySQLdb.Error as mysql_error:
    print "Error connecting to database: %s" % (str(mysql_error))
Como vemos, la librería tiene sus propias excepciones, en las que podemos acceder a los diferentes errores de mysql, como por ejemplo:

Error connecting to database: (1130, "Host '192.168.0.159' is not allowed to connect to this MySQL server")
Error connecting to database: (2003, 'Can\'t connect to MySQL server on \'192.168.0.160\' (113 "No route to host")')
Suponiendo que la base de datos está configurada con los permisos adecuados, el código anterior nos generará una conexión a MySQL que podemos usar para ejecutar operaciones.

Vamos a realizar algunas consultas sobre la tabla employees (el nombre coincide con el de la base de datos), que tiene la siguiente estructura:

mysql> desc employees;
+------------+---------------+------+-----+---------+-------+
| Field      | Type          | Null | Key | Default | Extra |
+------------+---------------+------+-----+---------+-------+
| emp_no     | int(11)       | NO   | PRI | NULL    |       |
| birth_date | date          | NO   |     | NULL    |       |
| first_name | varchar(14)   | NO   |     | NULL    |       |
| last_name  | varchar(16)   | NO   |     | NULL    |       |
| gender     | enum('M','F') | NO   |     | NULL    |       |
| hire_date  | date          | NO   |     | NULL    |       |
+------------+---------------+------+-----+---------+-------+
Vamos a buscar todos los usuarios (registros de employees) cuyo first_name sea Patricia. Al tratarse de un varchar la consulta sería:

mysql> select emp_no from employees where first_name='Patricia';
A nivel de nuestro código, se traduciría en:

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))
Como vemos, el orden a la hora de ejecutar la consulta es:

1. crear la conexión db_connection
2. definir un cursor my_cursor asociado a la conexión
3. ejecutar sobre el cursor mediante my_cursor.execute la query que hemos construido
4. recoger el resultado de la consulta del cursor mediante my_cursor.fetchall() el resultado de la operación
5. cerrar el cursor
6. cerrar la conexión

El resultado de una consulta queda almacenado en al variable result. Pero ¿Qué tipo de objeto es result? En este caso, al usar fetchall result va a ser siempre una tupla.

Esta tupla va a estar vacía si no se encuentran registros y en caso de que existan registros, va a contener a su vez tuplas con estos registros que podemos recorrer.

Volvamos sobre el ejemplo anterior para verlo con más detalle, buscando primero los registros cuyo firs_name es 'Patricia2':

username = 'Patricia2'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        print record
else:
    print result
Si ejecutamos ese código obtendremos:

Found 0 records
()
En este caso existen 0 registros cuyo first_name es 'Patricia2' y el resultado es una tupla vacía result=()
Si buscamos registros cuyo first_name es 'Patricia' el resultado cambia:

Found 215 records
(10786L, datetime.date(1964, 5, 19), 'Patricia', 'dAstous', 'M', datetime.date(1989, 3, 14))
(11884L, datetime.date(1963, 4, 10), 'Patricia', 'Moehrke', 'M', datetime.date(1998, 6, 18))
(12693L, datetime.date(1956, 11, 25), 'Patricia', 'Demke', 'M', datetime.date(1986, 3, 3))
(14353L, datetime.date(1959, 1, 7), 'Patricia', 'Ghandeharizadeh', 'F', datetime.date(1990, 3, 9))
(14518L, datetime.date(1955, 6, 19), 'Patricia', 'Peir', 'M', datetime.date(1986, 7, 6))
..........................
En este caso result es una tupla con contenido: cada elemento de la tupla es a su vez una tupla con los valores de las diferentes columnas de la tabla respetando el orden de las columnas de la tabla (emp_no, birth_date, first_name, last_name , gender, hire_date).

Siguiendo con el ejemplo anterior, si nos interesa sacar solamente la lista de apellidos (last_name) de todos los usuarios de nombre Patricia, podemos recorrer las tuplas del result y quedarme con el cuarto campo de cada tupla:

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        # we get the fourth element of every tuple
        print record[3]
else:
    print result
De este modo, al ejecutar el código obtendremos:

Found 215 records
dAstous
Moehrke
Demke
Ghandeharizadeh
Peir
Dayana
Masada
Gulla
Lundstrom
.........
Otra forma más optima de hacer lo mismo sería modificar la consulta y sacar de la base de datos solamente aquellos datos que queramos, en este caso los apellidos:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)
¿Cuál creéis que será el resultado? ¿La lista de apellidos? casi...

Found 215 records
('dAstous',)
('Moehrke',)
('Demke',)
('Ghandeharizadeh',)
('Peir',)
('Dayana',)
('Masada',)
('Gulla',)
('Lundstrom',)
.......
Ya dijimos antes, que en caso de que existan registros el resultado va a ser una tupla de tuplas, por tanto para sacar los apellidos debemos quedarnos con el primer elemento de cada tupla (:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        # we get the first element of every tuple
        print record[0]
else:
    print result
Ahora sí:

Found 215 records
('dAstous',)
('Moehrke',)
('Demke',)
('Ghandeharizadeh',)
('Peir',)
('Dayana',)
('Masada',)
('Gulla',)
('Lundstrom',)
.......
Con esto hemos cubierto todos los casos si empleamos fetchall.

Otra opción posible es usar fetchone. A diferencia de fetchall, fetchone va a devolver una tupla con los valores de un registro en caso de que exista algún registro o None en caso de que no exista ningún registro.

El matiz es importante porque si ejecutamos un len(result) sobre una consulta con fetchone podemos obtener una excepción porque puede ser None:

TypeError: object of type 'NoneType' has no len()
Volviendo al ejemplo anterior:

username = 'Patricia2'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    print result
El resultado será:

No record found 
Si cambiamos 'Patricia2' por 'Patricia' obtendremos la tupla con los valores:

(10786L, datetime.date(1964, 5, 19), 'Patricia', 'dAstous', 'M', datetime.date(1989, 3, 14))
De nuevo si solo queremos quedarnos con el apellido podemos sacar todas las columnas y quedarnos con el campo cuarto:

username = 'Patricia'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    # we get the fourth value
    print result[3]
O bien modificar la consulta y quedarnos con el primer campo de la tupla:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    # we get the firts value
    print result[0]
En ambos casos el resultado es:

dAstous
Normalmente se emplea fetchone cuando buscamos sabiendo que en caso de existir el registro éste va a ser único o para operaciones en las que sepamos de antemano que el resultado es un único registro, como por ejemplo select count que podemos emplear para saber cuantos registros de la tabla tiene nombre 'Patricia' sin necesidad de traérnoslos y contarlos como hicimos al comienzo de la entrada:

username = 'Patricia'
query = " select count(*) from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No records found "
else:
    # we get the firts value
    print "Records found: %d" % result[0]
El resultado sería:

Records found: 215
Como comentábamos al principio, una misma conexión puede emplearse para encadenar varias sentencias, de modo que nos ahorramos el establecimiento. Podemos incluso reutilizar el cursor:

username_1 = 'Patricia'
username_2 = 'Oscar'
query_1 = " select count(*) from employees where first_name=\'%s\' " % username_1
query_2 = " select * from employees where first_name=\'%s\' " % username_2
try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query_1)
    result_1 = my_cursor.fetchone()
    my_cursor.execute(query_2)
    result_2 = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result_1 is None:
    print "No records found with first_name %s " % username_1
else:
    print "Records found with first_name %s: %d" % (username_1,result_1[0])

print "Records with first_name %s" % (username_2)
for record in result_2:
    print record
El resultado sería:

Records found with first_name Patricia: 215
Records with first_name Oscar
(11398L, datetime.date(1953, 5, 30), 'Oscar', 'Peir', 'M', datetime.date(1986, 8, 23))
(11530L, datetime.date(1956, 11, 14), 'Oscar', 'Jarecki', 'M', datetime.date(1985, 6, 10))
(15399L, datetime.date(1961, 8, 13), 'Oscar', 'Mukaidono', 'F', datetime.date(1985, 5, 1))
(17994L, datetime.date(1962, 1, 31), 'Oscar', 'Msuda', 'M', datetime.date(1990, 1, 21))
(20096L, datetime.date(1959, 5, 16), 'Oscar', 'Gladwell', 'M', datetime.date(1995, 9, 4))
(20737L, datetime.date(1957, 4, 8), 'Oscar', 'Acton', 'F', datetime.date(1987, 1, 29))
.......
Realmente, la forma óptima de operar sería disponer de un pool de conexiones a base de datos, de modo que para ejecutar una consulta obtengamos una conexión del pool. Veremos como hacer esto en futuras entradas.

11 de diciembre de 2017

Colas (RabbitMQ) - número variable de consumidores

En una entrada previa Colas (RabbitMQ) - consumidores mostrábamos como crear un pool de threads que iban a consumir mensajes de una cola y ejecutar un workflow para cada mensaje.

El número de consumidores (tamaño del pool de threads) era fijo. Pero ¿Qué sucede si el número de mensajes encolados se dispara? Sería bueno poder crear nuevos hilos que consumieran mensajes mientras el tamaño de la cola excediera un determinado tamaño y una vez que la cola volviera a tener un número de mensajes aceptable estos hilos terminasen.

Con este planteamiento vamos a tener un número de hilos fijos que vamos a denominar Main y por otro lados vamos a crear hilos Backing que solamente existirán cuando el tamaño de la cola exceda un determinado tamaño. Estos hilos Backing juegan el papel de refuerzo temporal a los hilos Main cuando éstos no son suficientes para consumir mensajes a un ritmo que mantenga los niveles de la cola en un determinado número de mensajes.

Para implementarlo vamos a añadir en el código una tarea encargada de comprobar de forma periódica el tamaño de la cola y en función del resultado creará nuevos hilos Backing. Podríamos realizar el chequeo usando la propia librería puka, pero en este caso vamos a aprovechar la api web que nos aporta el plugin de management de RabbitMQ del que ya hablamos en entradas previas.

El plugin nos proporciona un interfaz web que acepta diferentes métodos y operaciones: api-web-management
Basándonos en esa api y usando la librería requests de python (http://docs.python-requests.org) podemos escribir el siguiente código que va a devolvernos el número de mensajes que contiene una cola:

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

Usando el código anterior podemos implementar una lógica como la que representa el siguiente diagrama:

:

El diagrama anterior se traduce en el siguiente código:

#!/usr/bin/env python

import puka
import time
import threading
import requests

BROKER_HOST = 'broker'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
MAIN_WORKER_NUMBER = 4
MAX_BACKING_WORKER_NUMBER = 10
WATCHDOG_CHECK_INTERVAL = 30
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
QUEUE_SIZE_THRESHOLD = 10000

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

def workflow(thread_name, thread_type, msg):
    """ Workflow for every message read from queue """
    try:
        if thread_type == 'main':
            print "[%s] Main thread read from queue: %s" % (thread_name, msg)
        elif thread_type == 'backing':
            print "(%s) Backing thread read from queue: %s" % (thread_name, msg)
    except Exception as error:
        print "Error: %s" % str(error)

class Main_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[%s] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='main', msg=self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

class Backing_worker(threading.Thread):
    BROKER_CONNECTION = 0

    global queue_size
    global backing_threads_count

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while queue_size > QUEUE_SIZE_THRESHOLD:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[--%s--] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[--%s--] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise, timeout=2)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='backing', msg=self.result['body'])
                except puka.ResourceError as error:
                    print '[--%s--] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
        backing_threads_count -=1


print 'Starting %d main consumer threads...' % (MAIN_WORKER_NUMBER)
for i in range (MAIN_WORKER_NUMBER):
    td = Main_worker()
    td.start()

while True:
    backing_threads_count = 0
    queue_size = get_messages_count_in_queue(QUEUE)
    if queue_size > QUEUE_SIZE_THRESHOLD and backing_threads_count < MAX_BACKING_WORKER_NUMBER:
        print "Queue size exceeds the threshold value. Starting new backing thread"
        td = Backing_worker()
        td.start()
        backing_threads_count+=1
    time.sleep(WATCHDOG_CHECK_INTERVAL)

Vamos a analizar el código, comenzando por algunos de los nuevos parámetros:

MAIN_WORKER_NUMBER: representa el numero de hilos fijos que van a componer el pool Main de consumidores. Se inicializa al comienzo y en todo momento van a estar leyendo mensajes de la cola.
MAX_BACKING_WORKER_NUMBER: representa el número máximo de hilos "de refuerzo" ejecutándose que vamos a permitir. Dicho de otro modo el máximo tamaño del pool Backing
WATCHDOG_CHECK_INTERVAL: representa el periodo de tiempo entre cada chequeo para comprobar el número de mensajes almacenados en la cola.
QUEUE_SIZE_THRESHOLD: representa el límite máximo de mensajes encolados. Si el tamaño de la cola excede este valor al realizar la comprobación se creará un nuevo thread "de refuerzo" siempre que el número de threads de refuerzo no exceda el valor MAX_BACKING_WORKER_NUMBER

Por otro lado hemos definido 2 clases, una para los Main threads y otra para los Backing threads. Las clases son similares, salvo el detalle del método run():

- En el caso de los Main threads el bucle while se ejecuta indefinidamente, es decir, este tipo de hilos siempre van a estar "vivos".
- En el caso de los Backing threads el bucle while se ejecuta mientras el tamaño de la cola exceda del valor establecido como umbral (QUEUE_SIZE_THRESHOLD). Una vez que el tamaño de la cola sea menor de ese valor el hilo termina.

while True:
vs

while queue_size > QUEUE_SIZE_THRESHOLD

Para que los Backing threads puedan tener acceso a las variables queue_size y backing_threads_count las definimos como globales dentro de la clase:

global queue_size
global backing_threads_count
En los referente al workflow que va a ejecutar cada hilo, hacemos que cada hilo especifique al método qué clase de hilo es (Main o Backing simplemente para trazar de diferente la ejecución de uno y otro con corchetes o paréntesis.

Por último un detalle referente al parámetro prefetch_count de la librería puka:

self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
El parámetro prefetch_count representa el número de mensajes "reservados" para consumir en un canal de RabbitMQ, para los cuales no hemos enviado ack. Vamos a desarrollarlo un poco más a fondo.

Si ejecutamos el código de RabbitMQ consumidores y publicamos mensajes en una cola de RabbitMQ tendremos algo como lo siguiente:



Si nos fijamos tenemos 3 contadores:

- Ready: mensajes que pueden ser consumidos.
- Unacked: mensajes reservados para consumir en un canal, para los cuales no se ha recibido confirmación de haberlos consumido.
- Total: total de mensajes.

Si en la situación anterior con digamos 100k mensajes ejecutáramos nuestro código, si la ejecución del workflow tardara demasiado podría suceder que cada uno de los 4 Main threads reserve 25k mensajes para ser consumidos, de forma que tendríamos 100k mensajes Unacked y 0 mensajes Ready, con lo cual, aunque se creen nuevos threads Backing de refuerzo, estos no podrían consumir ningún mensajes, salvo que se publicaran nuevos mensajes en la cola. Es por ello que hacemos uso del prefecth_count

Podéis encontrar información más detallada al respecto en la propia página de RabbitMQ: https://www.rabbitmq.com/consumer-prefetch.html

Probad a ejecutar el código incluyendo algún time sleep de retardo en el método workflow() y "acelerad" la publicación de mensajes usando el código de https://codigo-python.blogspot.com.es/2017/11/colas-rabbitmq-threads-ii.html y comprobad como al crecer la cola se crean los hilos de refuerzo.