15 de diciembre de 2017

logs con rotado en python como modulo

En esta ocasión vamos a incorporar un sistema de logs a nuestra aplicación python. El sistema nos va a permitir definir diferentes niveles de logs (info, debug, error, etc...) y establecer una política de rotado para evitar que el fichero de logs crezca indefinidamente.

Para ello vamos a hacer uso de la librería logging: https://docs.python.org/2/library/logging.html

Los parámetros que vamos a usar en nuestro código son:
- LOG_NAME: nombre (string) que identifique a la instancia del logger.
- LOG_FOLDER: directorio que va a contener los ficheros de logs.
- LOG_FILE: nombre del fichero de logs.
- ROTATE_TIME: momento del día en el que queremos llevar a cabo el rotado de logs. Podemos optar por los siguientes valores:
- LOG_COUNT: número total de ficheros que queremos mantener. Una vez superado este valor, se borrarán los ficheros más antiguos.
- LOG_LEVEL: level de logs. Los valores posibles son los siguientes:

- LOG_FORMAT: formato de los logs. Podemos definir una plantilla incluyendo el orden de los elementos, gravedad y el timestamp.

Un ejemplo, en el caso de un sistema unix o linux, podría ser:

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG_FILE = 'my-app.log'
ROTATE_TIME = 'midnight'
LOG_LEVEL = 'DEBUG'
LOG_COUNT = 10
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

Lógicamente tenemos que asegurarnos que el el directorio LOG_FOLDER exista y que el usuario con el que va a correr nuestro código python tenga permisos de escritura en ese directorio.

También podemos añadir en el propio código del logger la opción de chequear si el directorio existe, y en caso contrario intentar crearlo, aunque de nuevo el usuario debe tener permisos para poder crear el directorio. El código sería algo como lo siguiente:

import os

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
    try:
        os.makedirs(log_folder)
    except Exception as error:
        print 'Error creating the log folder: %s' % (str(error))

En nuestro caso vamos a suponer que el directorio LOG_FOLDER existe y que el usuario con el que vamos a ejecutar el código tiene permisos de escritura sobre el directorio.

De este modo el código quedaría del siguiente modo:

import logging.handlers
import sys

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG_FILE = 'my-app.log'
LOG = LOG_FOLDER + LOG_FILE
ROTATE_TIME = 'midnight'
LOG_LEVEL = logging.DEBUG
LOG_COUNT = 5
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

try:
    logger = logging.getLogger(LOGGER_NAME)
    loggerHandler = logging.handlers.TimedRotatingFileHandler(filename=LOG , when=ROTATE_TIME, interval=1, backupCount=LOG_COUNT)
    formatter = logging.Formatter(LOG_FORMAT)
    loggerHandler.setFormatter(formatter)
    logger.addHandler(loggerHandler)
    logger.setLevel(LOG_LEVEL)
except Exception as error:
    print "Error with logs: %s" % (str(error))
    sys.exit()
Para escribir logs, basta con invocar el logger, indicando el levelname y el message a trazar, según hemos definido en LOG_FORMAT. Algunos ejemplos:

logger.info("writing log with info level")
logger.error("writing log with error info. string %s | integer : %d", 'parameter string', 78 )
logger.debug("writting debug record")
Lo cual se traduciría en las siguientes trazas en el fichero /var/log/my-app/my-app.log:

2017-12-15 22:24:32,437 INFO writing log with info level
2017-12-15 22:24:32,438 ERROR writing log with error info. string parameter string | integer : 78
2017-12-15 22:24:32,438 DEBUG writting debug record
Como podéis observas las trazas se ajustan al formato que hemos definido y además podemos pasar parámetros a la traza.

Si nuestra aplicación python se compone de varios ficheros o si queremos trazar logs desde varios ficheros, resulta tedioso y poco práctico tener que incluir este código del logger en cada fichero. Para evitarlo vamos a ver como definir nuestro logger como un módulo.

Supongamos que nuestra aplicación está formada por n ficheros en el directorio my-app-python:

  my-app-python folder
      |_ _ _ _ _ _ my-app-file-1.py
      |_ _ _ _ _ _ my-app-file-2.py
      ...............
      |_ _ _ _ _ _ my-app-file-n.py

Queremos que todos los ficheros my-app-file-1.py, my-app-file-2.py ... my-app-file-n.py usen la lógica del logger que hemos definido antes.

Para ello vamos a construir un fichero mylogger.py con el siguiente contenido:

import logging.handlers
import sys

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG = LOG_FOLDER + LOG_FILE
LOG_FILE = 'my-app.log'
ROTATE_TIME = 'midnight'
LOG_LEVEL = logging.DEBUG
LOG_COUNT = 5
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

try:
    logger = logging.getLogger(LOGGER_NAME)
    loggerHandler = logging.handlers.TimedRotatingFileHandler(logfile=LOG , when=ROTATE_TIME, interval=1, backupCount=LOG_COUNT)
    formatter = logging.Formatter(LOG_FORMAT)
    loggerHandler.setFormatter(formatter)
    logger.addHandler(loggerHandler)
    logger.setLevel(LOG_LEVEL)
except Exception as error:
    print "Error with logs: %s" % (str(error))
    sys.exit()

def getLogger():
    return logger
A continuación creamos una carpeta, que llamaremos por ejemplo utils dentro de la cual meteremos el fichero my-logger.py junto con un fichero __init__.py vacío:

  utils folder
      |_ _ _ _ _ _  __init__.py  (empty file)
      |_ _ _ _ _ _  mylogger.py

Y por último guardamos la carpeta utils dentro de la carpeta my-app-python:

  my-app-python folder
      |_ _ _ _ _ _ my-app-file-1.py
      |_ _ _ _ _ _ my-app-file-2.py
      ...............
      |_ _ _ _ _ _ my-app-file-n.py
      |_ _ _ _ _ _ utils folder
                       |_ _ _ _ _ _  __init__.py  (empty file)
                       |_ _ _ _ _ _  mylogger.py

Para poder trazar desde cualquiera de los ficheros my-app-file-1.py, my-app-file-2.py ... my-app-file-n.py basta con importar el módulo, e invocar una instancia del logger a través del método getLogger():

from utils import mylogger

logger = mylogger.getLogger()

logger.info("info record")
logger.error("error record")

try:
   blalbla
except Exception as error:
   logger.error("Error at code: %s" , str(error)
De este modo tendremos un log unificado para todos los ficheros de my-app-python

Podéis comprobar como los ficheros efectivamente rotan y además se eliminan los fichero más antiguos según el número de ficheros que configuréis en LOG_COUNT

14 de diciembre de 2017

python MySQL operaciones de lectura

En esta entrada veremos una primera forma de conectar a una base de datos MySQL para llevar a cabo las operaciones CRUD básicas (Create, Read, Update and Delete).
Existen numerosa librerías para llevar a cabo este cometido. En este caso vamos a trabajar con el MySQLdb: info completa.

El primer paso sería la instalación del módulo, puesto que en las instalaciones estándar de python no está incluido. En debian y sistemas basados en debian podemos instalar el módulo haciendo uso de la herramienta apt-get y en general en cualquier sistema unix haciendo uso de la utilidad pip:

root@debian# apt-get install python-mysqldb
or
root@debian# pip install MySQL-python
Si ninguna de las opciones anteriores os convence podéis descargar el fichero .tar.gz desde la web oficial http://sourceforge.net/projects/mysql-python y hacer la instalación a mano:

root@debian# tar -xzvf MySQL-python-1.2.4b4.tar.gz
root@debian# cd MySQL-python-1.2.4b4
root@debian# python setup.py build
root@debian# python setup.py install
En el caso de sistemas windows podemos descargarlo desde aquí.
Una vez instalado podemos importar el módulo en nuestro programa python y hacer uso del mismo:

#!/usr/bin/env python

import MySQLdb

El módulo MySQLdb cumple con la Python Database API Specification v2.0: https://www.python.org/dev/peps/pep-0249/ y por ello maneja dos conceptos que debemos diferenciar: conexión a base de datos y cursor.

Inicialmente estableceremos la conexión a base de datos y una vez conectados los cursores nos permitirán ejecutar las operaciones y manejar los resultados de estas operaciones. Asociada a la misma conexión podemos crear diferentes cursores o bien crear un cursor para cada conexión.

Dicho de otro modo, el cursor solo existe en el contexto de una conexión previa y del mismo modo no existe el cursor si cerramos la conexión a base de datos. Esto dicho así puede resultar un poco confuso, así que vamos con algunos ejemplos concretos.

En los ejemplos que veremos a continuación vamos a trabajar con la base de datos employees que podéis descargar de la web de mysql: https://dev.mysql.com/doc/employee/en/.

En primer lugar vamos a ver como establecer la conexión a la base de datos:

#!/usr/bin/env python

import MySQLdb

DB_IP = "192.168.0.160"
DB_PORT = 3307
DB_NAME = "employees"
DB_USER = "root"
DB_PASSWORD = "1234"

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
except MySQLdb.Error as mysql_error:
    print "Error connecting to database: %s" % (str(mysql_error))
Como vemos, la librería tiene sus propias excepciones, en las que podemos acceder a los diferentes errores de mysql, como por ejemplo:

Error connecting to database: (1130, "Host '192.168.0.159' is not allowed to connect to this MySQL server")
Error connecting to database: (2003, 'Can\'t connect to MySQL server on \'192.168.0.160\' (113 "No route to host")')
Suponiendo que la base de datos está configurada con los permisos adecuados, el código anterior nos generará una conexión a MySQL que podemos usar para ejecutar operaciones.

Vamos a realizar algunas consultas sobre la tabla employees (el nombre coincide con el de la base de datos), que tiene la siguiente estructura:

mysql> desc employees;
+------------+---------------+------+-----+---------+-------+
| Field      | Type          | Null | Key | Default | Extra |
+------------+---------------+------+-----+---------+-------+
| emp_no     | int(11)       | NO   | PRI | NULL    |       |
| birth_date | date          | NO   |     | NULL    |       |
| first_name | varchar(14)   | NO   |     | NULL    |       |
| last_name  | varchar(16)   | NO   |     | NULL    |       |
| gender     | enum('M','F') | NO   |     | NULL    |       |
| hire_date  | date          | NO   |     | NULL    |       |
+------------+---------------+------+-----+---------+-------+
Vamos a buscar todos los usuarios (registros de employees) cuyo first_name sea Patricia. Al tratarse de un varchar la consulta sería:

mysql> select emp_no from employees where first_name='Patricia';
A nivel de nuestro código, se traduciría en:

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))
Como vemos, el orden a la hora de ejecutar la consulta es:

1. crear la conexión db_connection
2. definir un cursor my_cursor asociado a la conexión
3. ejecutar sobre el cursor mediante my_cursor.execute la query que hemos construido
4. recoger el resultado de la consulta del cursor mediante my_cursor.fetchall() el resultado de la operación
5. cerrar el cursor
6. cerrar la conexión

El resultado de una consulta queda almacenado en al variable result. Pero ¿Qué tipo de objeto es result? En este caso, al usar fetchall result va a ser siempre una tupla.

Esta tupla va a estar vacía si no se encuentran registros y en caso de que existan registros, va a contener a su vez tuplas con estos registros que podemos recorrer.

Volvamos sobre el ejemplo anterior para verlo con más detalle, buscando primero los registros cuyo firs_name es 'Patricia2':

username = 'Patricia2'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        print record
else:
    print result
Si ejecutamos ese código obtendremos:

Found 0 records
()
En este caso existen 0 registros cuyo first_name es 'Patricia2' y el resultado es una tupla vacía result=()
Si buscamos registros cuyo first_name es 'Patricia' el resultado cambia:

Found 215 records
(10786L, datetime.date(1964, 5, 19), 'Patricia', 'dAstous', 'M', datetime.date(1989, 3, 14))
(11884L, datetime.date(1963, 4, 10), 'Patricia', 'Moehrke', 'M', datetime.date(1998, 6, 18))
(12693L, datetime.date(1956, 11, 25), 'Patricia', 'Demke', 'M', datetime.date(1986, 3, 3))
(14353L, datetime.date(1959, 1, 7), 'Patricia', 'Ghandeharizadeh', 'F', datetime.date(1990, 3, 9))
(14518L, datetime.date(1955, 6, 19), 'Patricia', 'Peir', 'M', datetime.date(1986, 7, 6))
..........................
En este caso result es una tupla con contenido: cada elemento de la tupla es a su vez una tupla con los valores de las diferentes columnas de la tabla respetando el orden de las columnas de la tabla (emp_no, birth_date, first_name, last_name , gender, hire_date).

Siguiendo con el ejemplo anterior, si nos interesa sacar solamente la lista de apellidos (last_name) de todos los usuarios de nombre Patricia, podemos recorrer las tuplas del result y quedarme con el cuarto campo de cada tupla:

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        # we get the fourth element of every tuple
        print record[3]
else:
    print result
De este modo, al ejecutar el código obtendremos:

Found 215 records
dAstous
Moehrke
Demke
Ghandeharizadeh
Peir
Dayana
Masada
Gulla
Lundstrom
.........
Otra forma más optima de hacer lo mismo sería modificar la consulta y sacar de la base de datos solamente aquellos datos que queramos, en este caso los apellidos:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)
¿Cuál creéis que será el resultado? ¿La lista de apellidos? casi...

Found 215 records
('dAstous',)
('Moehrke',)
('Demke',)
('Ghandeharizadeh',)
('Peir',)
('Dayana',)
('Masada',)
('Gulla',)
('Lundstrom',)
.......
Ya dijimos antes, que en caso de que existan registros el resultado va a ser una tupla de tuplas, por tanto para sacar los apellidos debemos quedarnos con el primer elemento de cada tupla (:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        # we get the first element of every tuple
        print record[0]
else:
    print result
Ahora sí:

Found 215 records
('dAstous',)
('Moehrke',)
('Demke',)
('Ghandeharizadeh',)
('Peir',)
('Dayana',)
('Masada',)
('Gulla',)
('Lundstrom',)
.......
Con esto hemos cubierto todos los casos si empleamos fetchall.

Otra opción posible es usar fetchone. A diferencia de fetchall, fetchone va a devolver una tupla con los valores de un registro en caso de que exista algún registro o None en caso de que no exista ningún registro.

El matiz es importante porque si ejecutamos un len(result) sobre una consulta con fetchone podemos obtener una excepción porque puede ser None:

TypeError: object of type 'NoneType' has no len()
Volviendo al ejemplo anterior:

username = 'Patricia2'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    print result
El resultado será:

No record found 
Si cambiamos 'Patricia2' por 'Patricia' obtendremos la tupla con los valores:

(10786L, datetime.date(1964, 5, 19), 'Patricia', 'dAstous', 'M', datetime.date(1989, 3, 14))
De nuevo si solo queremos quedarnos con el apellido podemos sacar todas las columnas y quedarnos con el campo cuarto:

username = 'Patricia'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    # we get the fourth value
    print result[3]
O bien modificar la consulta y quedarnos con el primer campo de la tupla:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    # we get the firts value
    print result[0]
En ambos casos el resultado es:

dAstous
Normalmente se emplea fetchone cuando buscamos sabiendo que en caso de existir el registro éste va a ser único o para operaciones en las que sepamos de antemano que el resultado es un único registro, como por ejemplo select count que podemos emplear para saber cuantos registros de la tabla tiene nombre 'Patricia' sin necesidad de traérnoslos y contarlos como hicimos al comienzo de la entrada:

username = 'Patricia'
query = " select count(*) from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No records found "
else:
    # we get the firts value
    print "Records found: %d" % result[0]
El resultado sería:

Records found: 215
Como comentábamos al principio, una misma conexión puede emplearse para encadenar varias sentencias, de modo que nos ahorramos el establecimiento. Podemos incluso reutilizar el cursor:

username_1 = 'Patricia'
username_2 = 'Oscar'
query_1 = " select count(*) from employees where first_name=\'%s\' " % username_1
query_2 = " select * from employees where first_name=\'%s\' " % username_2
try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query_1)
    result_1 = my_cursor.fetchone()
    my_cursor.execute(query_2)
    result_2 = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result_1 is None:
    print "No records found with first_name %s " % username_1
else:
    print "Records found with first_name %s: %d" % (username_1,result_1[0])

print "Records with first_name %s" % (username_2)
for record in result_2:
    print record
El resultado sería:

Records found with first_name Patricia: 215
Records with first_name Oscar
(11398L, datetime.date(1953, 5, 30), 'Oscar', 'Peir', 'M', datetime.date(1986, 8, 23))
(11530L, datetime.date(1956, 11, 14), 'Oscar', 'Jarecki', 'M', datetime.date(1985, 6, 10))
(15399L, datetime.date(1961, 8, 13), 'Oscar', 'Mukaidono', 'F', datetime.date(1985, 5, 1))
(17994L, datetime.date(1962, 1, 31), 'Oscar', 'Msuda', 'M', datetime.date(1990, 1, 21))
(20096L, datetime.date(1959, 5, 16), 'Oscar', 'Gladwell', 'M', datetime.date(1995, 9, 4))
(20737L, datetime.date(1957, 4, 8), 'Oscar', 'Acton', 'F', datetime.date(1987, 1, 29))
.......
Realmente, la forma óptima de operar sería disponer de un pool de conexiones a base de datos, de modo que para ejecutar una consulta obtengamos una conexión del pool. Veremos como hacer esto en futuras entradas.

11 de diciembre de 2017

Colas (RabbitMQ) - número variable de consumidores

En una entrada previa Colas (RabbitMQ) - consumidores mostrábamos como crear un pool de threads que iban a consumir mensajes de una cola y ejecutar un workflow para cada mensaje.

El número de consumidores (tamaño del pool de threads) era fijo. Pero ¿Qué sucede si el número de mensajes encolados se dispara? Sería bueno poder crear nuevos hilos que consumieran mensajes mientras el tamaño de la cola excediera un determinado tamaño y una vez que la cola volviera a tener un número de mensajes aceptable estos hilos terminasen.

Con este planteamiento vamos a tener un número de hilos fijos que vamos a denominar Main y por otro lados vamos a crear hilos Backing que solamente existirán cuando el tamaño de la cola exceda un determinado tamaño. Estos hilos Backing juegan el papel de refuerzo temporal a los hilos Main cuando éstos no son suficientes para consumir mensajes a un ritmo que mantenga los niveles de la cola en un determinado número de mensajes.

Para implementarlo vamos a añadir en el código una tarea encargada de comprobar de forma periódica el tamaño de la cola y en función del resultado creará nuevos hilos Backing. Podríamos realizar el chequeo usando la propia librería puka, pero en este caso vamos a aprovechar la api web que nos aporta el plugin de management de RabbitMQ del que ya hablamos en entradas previas.

El plugin nos proporciona un interfaz web que acepta diferentes métodos y operaciones: api-web-management
Basándonos en esa api y usando la librería requests de python (http://docs.python-requests.org) podemos escribir el siguiente código que va a devolvernos el número de mensajes que contiene una cola:

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

Usando el código anterior podemos implementar una lógica como la que representa el siguiente diagrama:

:

El diagrama anterior se traduce en el siguiente código:

#!/usr/bin/env python

import puka
import time
import threading
import requests

BROKER_HOST = 'broker'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
MAIN_WORKER_NUMBER = 4
MAX_BACKING_WORKER_NUMBER = 10
WATCHDOG_CHECK_INTERVAL = 30
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
QUEUE_SIZE_THRESHOLD = 10000

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

def workflow(thread_name, thread_type, msg):
    """ Workflow for every message read from queue """
    try:
        if thread_type == 'main':
            print "[%s] Main thread read from queue: %s" % (thread_name, msg)
        elif thread_type == 'backing':
            print "(%s) Backing thread read from queue: %s" % (thread_name, msg)
    except Exception as error:
        print "Error: %s" % str(error)

class Main_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[%s] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='main', msg=self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

class Backing_worker(threading.Thread):
    BROKER_CONNECTION = 0

    global queue_size
    global backing_threads_count

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while queue_size > QUEUE_SIZE_THRESHOLD:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[--%s--] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[--%s--] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise, timeout=2)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='backing', msg=self.result['body'])
                except puka.ResourceError as error:
                    print '[--%s--] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
        backing_threads_count -=1


print 'Starting %d main consumer threads...' % (MAIN_WORKER_NUMBER)
for i in range (MAIN_WORKER_NUMBER):
    td = Main_worker()
    td.start()

while True:
    backing_threads_count = 0
    queue_size = get_messages_count_in_queue(QUEUE)
    if queue_size > QUEUE_SIZE_THRESHOLD and backing_threads_count < MAX_BACKING_WORKER_NUMBER:
        print "Queue size exceeds the threshold value. Starting new backing thread"
        td = Backing_worker()
        td.start()
        backing_threads_count+=1
    time.sleep(WATCHDOG_CHECK_INTERVAL)

Vamos a analizar el código, comenzando por algunos de los nuevos parámetros:

MAIN_WORKER_NUMBER: representa el numero de hilos fijos que van a componer el pool Main de consumidores. Se inicializa al comienzo y en todo momento van a estar leyendo mensajes de la cola.
MAX_BACKING_WORKER_NUMBER: representa el número máximo de hilos "de refuerzo" ejecutándose que vamos a permitir. Dicho de otro modo el máximo tamaño del pool Backing
WATCHDOG_CHECK_INTERVAL: representa el periodo de tiempo entre cada chequeo para comprobar el número de mensajes almacenados en la cola.
QUEUE_SIZE_THRESHOLD: representa el límite máximo de mensajes encolados. Si el tamaño de la cola excede este valor al realizar la comprobación se creará un nuevo thread "de refuerzo" siempre que el número de threads de refuerzo no exceda el valor MAX_BACKING_WORKER_NUMBER

Por otro lado hemos definido 2 clases, una para los Main threads y otra para los Backing threads. Las clases son similares, salvo el detalle del método run():

- En el caso de los Main threads el bucle while se ejecuta indefinidamente, es decir, este tipo de hilos siempre van a estar "vivos".
- En el caso de los Backing threads el bucle while se ejecuta mientras el tamaño de la cola exceda del valor establecido como umbral (QUEUE_SIZE_THRESHOLD). Una vez que el tamaño de la cola sea menor de ese valor el hilo termina.

while True:
vs

while queue_size > QUEUE_SIZE_THRESHOLD

Para que los Backing threads puedan tener acceso a las variables queue_size y backing_threads_count las definimos como globales dentro de la clase:

global queue_size
global backing_threads_count
En los referente al workflow que va a ejecutar cada hilo, hacemos que cada hilo especifique al método qué clase de hilo es (Main o Backing simplemente para trazar de diferente la ejecución de uno y otro con corchetes o paréntesis.

Por último un detalle referente al parámetro prefetch_count de la librería puka:

self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
El parámetro prefetch_count representa el número de mensajes "reservados" para consumir en un canal de RabbitMQ, para los cuales no hemos enviado ack. Vamos a desarrollarlo un poco más a fondo.

Si ejecutamos el código de RabbitMQ consumidores y publicamos mensajes en una cola de RabbitMQ tendremos algo como lo siguiente:



Si nos fijamos tenemos 3 contadores:

- Ready: mensajes que pueden ser consumidos.
- Unacked: mensajes reservados para consumir en un canal, para los cuales no se ha recibido confirmación de haberlos consumido.
- Total: total de mensajes.

Si en la situación anterior con digamos 100k mensajes ejecutáramos nuestro código, si la ejecución del workflow tardara demasiado podría suceder que cada uno de los 4 Main threads reserve 25k mensajes para ser consumidos, de forma que tendríamos 100k mensajes Unacked y 0 mensajes Ready, con lo cual, aunque se creen nuevos threads Backing de refuerzo, estos no podrían consumir ningún mensajes, salvo que se publicaran nuevos mensajes en la cola. Es por ello que hacemos uso del prefecth_count

Podéis encontrar información más detallada al respecto en la propia página de RabbitMQ: https://www.rabbitmq.com/consumer-prefetch.html

Probad a ejecutar el código incluyendo algún time sleep de retardo en el método workflow() y "acelerad" la publicación de mensajes usando el código de https://codigo-python.blogspot.com.es/2017/11/colas-rabbitmq-threads-ii.html y comprobad como al crecer la cola se crean los hilos de refuerzo.

21 de noviembre de 2017

PyMongo - operaciones de lectura

Continuando con la entrada anterior vamos a ver algunas operaciones más que podemos llevar a cabo con la librería de pymongo.
Vamos a comenzar con las operaciones de lectura, lo que en términos de SQL serían las operaciones SELECT.

En este primer ejemplo vamos a ver como obtener todos los registros de una colección sin especificar ningún parámetro.

En este caso en la colección USERS de la base de datos TEST solo tenemos 3 registros que hemos guardado previamente utilizando el código que vimos en PyMongo - parte 1.

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000
MONGODB_DATABASE = 'TEST'

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=10)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with mongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    for entry_json in result:
        print "Document got from collection %s: %s" % (destination, entry_json)
except Exception as error:
    print "Error getting data: %s" % str(error)

Si ejecutamos el código obtendremos los 3 documentos de la colección, en este caso:

OK -- Connected to MongoDB at server 192.168.0.169
Document got from collection USERS: {u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Document got from collection USERS: {u'city': u'Paris', u'_id': ObjectId('5a11debd9ea4cbaa30bf01ff'), u'surname': u'Mouse', u'name': u'Mickey'}
Document got from collection USERS: {u'city': u'New York', u'_id': ObjectId('5a11df209ea4cbaa30bf0214'), u'surname': u'Kent', u'name': u'Clark'}
Lo primero que debe llamarnos la atención es el hecho de que nosotros hemos guardado en mongo un documento JSON con strings como valores y al recuperar estos datos con pymongo estamos obteniendo unicode.

La razón de esto es que mongo almacena los datos internamente en formato BSON (Binary JSON) y los string los codifica con UTF-8.

Python codifica unicode usando también UTF-8, de este modo para asegurar la compatibilidad pymongo devuelve unicode. Para obtener más información sobre unicode en python puede consultarse el siguiente link: https://docs.python.org/2/howto/unicode.html

La conversión de unicode a string y viceversa va a depender de la codificación empleada. En este caso es tan sencillo como convertir directamente (omitimos la primera parte del código en los posteriores ejemplos):

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    print ''
    for entry_json in result:
        print "Found document at %s collection with next values:" % destination
        for key in entry_json:
            print ' '  + key + ' : ' + str(entry_json[key])
        print ''
except Exception as error:
    print "Error getting data: %s" % str(error)
Al ejecutarlo ya podremos ver los datos guardados como strings:

OK -- Connected to MongoDB at server 192.168.0.169

Found document at USERS collection with next values:
 city : New York
 _id : 5a1174a5fbb7132f5c7d091c
 surname : Wick
 name : John

Found document at USERS collection with next values:
 city : Paris
 _id : 5a11debd9ea4cbaa30bf01ff
 surname : Mouse
 name : Mickey

Found document at USERS collection with next values:
 city : New York
 _id : 5a11df209ea4cbaa30bf0214
 surname : Kent
 name : Clark
El método find() de la librería pymongo que acabamos de emplear devuelve un objecto de tipo cursor que podemos recorrer con un bucle for según hemos visto en el código anterior.

Para saber cuantos documentos vamos a encontrarnos al recorrer un cursor podemos invocar result.count() como vemos en el siguiente código. En caso de que no exista ningún documento el resultado de result.count() será 0.

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    print "Documents found: %d" % result.count()
    print ''
    for entry_json in result:
        print "Found document at %s collection with next values:" % destination
        for key in entry_json:
            print ' '  + key + ' : ' + str(entry_json[key])
        print ''
except Exception as error:
    print "Error getting data: %s" % str(error)
Al ejecutarlo obtendremos el número de elementos encontrados (remarcado en negrita) antes de recorrer el cursor para mostrarlos:

OK -- Connected to MongoDB at server 192.168.0.169
Documents found: 3

Found document at USERS collection with next values:
 city : New York
 _id : 5a1174a5fbb7132f5c7d091c
 surname : Wick
 name : John

Found document at USERS collection with next values:
 city : Paris
 _id : 5a11debd9ea4cbaa30bf01ff
 surname : Mouse
 name : Mickey

Found document at USERS collection with next values:
 city : New York
 _id : 5a11df209ea4cbaa30bf0214
 surname : Kent
 name : Clark
El método find() acepta uno o varios parámetros de cara a llevar a cabo una búsqueda. En los ejemplos anterior teníamos condition={}, que viene a traducirse como "dame todos las entradas".

Para especificar una condición en la búsqueda lo haremos definiendo la condición como un diccionario indicando clave:valor. Así por ejemplo, si queremos sacar la lista de usuarios que viven en "New York" definiremos condition={'city':'New Yor'}:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {'city':'New York'}
    result = collection.find(condition)
    for entry_json in result:
        print entry_json
except Exception as error:
    print "Error getting data: %s" % str(error)
La salida sería:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
{u'city': u'New York', u'_id': ObjectId('5a11df209ea4cbaa30bf0214'), u'surname': u'Kent', u'name': u'Clark'}
Si queremos especificar más de un parámetro al ejecutar la búsqueda, basta incluir varios entradas clave:valor al diccionario. De este modo si definimos:

condition = {'city':'New York', 'name':'John'}
obtendremos un único documento JSON al recorrer el cursor result:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
En ocasiones puede que nos interese obtener un solo documento, bien porque sepamos que solamente hay uno que cumpla una cierta condición (clave única) o bien porque nos baste con obtener un solo documento.

En ese caso pymongo dispone del método find_one().

A diferencia de find() que siempre devolvía un cursor, la función find_one() va a devolver un solo documento o bien None si no existe ningún documento que cumpla las condiciones que le pasemos.

En caso de que lo invoquemos usando condition={} nos devolverá el primer documento de la colección:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find_one(condition)
    print result
except Exception as error:
    print "Error getting data: %s" % str(error)
Si ejecutamos el código obtendremos uno de los 3 elementos de la lista:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Si definimos condition = {'city':'New York'} igual que hicimos antes con el método find() donde obtuvimos 2 documentos, vemos que en este caso obtendremos solamente uno:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = condition = {'city':'New York', 'name':'John'}
    result = collection.find_one(condition)
    print result
except Exception as error:
    print "Error getting data: %s" % str(error)
Efectivamente obtenemos un solo documento:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Al emplear find_one() hay que contemplar la posibilidad de que nos devuelva un None:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = condition = {'city':'Tokio'}
    result = collection.find_one(condition)
    if result is None:
        print "None document found!!"
    else:
        print result
except Exception as error:
    print "Error getting data: %s" % str(error)
En este caso obtendremos el mensaje informando que no se ha encontrado ningún documento:

None document found!!

No vamos a entrar en más detalles, solo mencionar que también podemos usar las funciones find_one_and_delete(), find_one_and_replace() y find_one_and_update() cuya documentación podemos encontrar en http://api.mongodb.com/python/current/api/pymongo/collection.html

19 de noviembre de 2017

PyMongo - conexión

En esta nueva entrada vamos a ver como interactuar con la base de datos MongoDB usando python.

MongoDB es una base de datos NoSQL y gracias a su buen rendimiento y escalado podemos considerarla como una alternativa posible dentro del mundo del Big Data. Tiene una amplia documentación disponible en su web

Para poder trabajar contra MongoDB vamos a emplear la librería pymongo: https://pypi.python.org/pypi/pymongo

Pymongo tiene varias virtudes, una de ellas es que es "thread-safe", es decir, que es apta para trabajar con aplicaciones multi-hilo.
En su lista de faq podéis encontrar esta junto a otras muchas features.

El primer paso para trabajar con pymongo es instalar la librería.
La forma más cómoda de instalarla es a través de pip ejecutando pip install pymongo, pero como siempre podemos optar por descargar los paquetes de la web y hacerlo de forma manual tanto en sistemas linux como windows: https://pypi.python.org/pypi/pymongo

Una vez instalada la librería ya podemos comenzar a escribir código.
Vamos a suponer que contamos con una instalación básica de mongo que no requiere autenticación para conectarse. El código para conectarse a MongoDB quedaría del siguiente modo:

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
    client.close()
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with MongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

Con el código anterior simplemente nos conectamos a la base de datos TEST (en caso de que no exista se crea de forma automática) y desconectamos.

Los usuarios de windows es posible que obtengan errores al incorporar el parámetro
serverSelectionTimeoutMS=MONGODB_TIMEOUT
dependiendo de la versión de la librería. En ese caso basta con eliminarlo y dejar simplemente
client = pymongo.MongoClient(URI_CONNECTION)

Al ejecutar el código, si todo va bien obtendremos un mensaje de confirmación:

OK -- Connected to MongoDB at server 192.168.0.169
En caso de error, obtendremos también un mensaje que nos dará la pista de qué está fallando.

Un dato importante a tener en cuenta es que cuando invocamos
pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
la librería en realidad está creando un pool de conexiones basado en threads con un tamaño máximo determinado por el parámetro maxPoolSize, que tiene un valor por defecto de 100.

Es decir, la línea:

client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
es equivalente a

client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=100)

Bien, ya tenemos casi todo listo para realizar operaciones sobre la base de datos. Antes de continuar vamos a hacer un inciso para apuntar algunas características de MongoDB, sobre todo de cara a la gente que viene del mundo SQL.

En primer lugar mongo almacena los datos en colecciones, que vienen a ser el equivalente a las tablas del mundo SQL. Dentro de esas colecciones MongoDB va a almacenar documentos, que serían el equivalente a los registros del mundo SQL. Estos documentos son en realidad estructuras JSON, que en términos de python, podemos traducir a diccionarios.

Otra de las caracteríticas de MongoDB es que no es necesario definir de forma previa la estructura que van a tener los documentos de una colección, de hecho, ni siquiera es necesario crear la colección, al guardar un documento JSON se creará automáticamente la colección, de modo análogo a lo que sucede con la propia base de datos, que también se creará de forma automática si no existiera.
Dicho de otro modo, al crear el primer documento de una colección dentro de una base de datos se crearían de forma automática tanto la colección como la base de datos en caso de que no existieran,

Teniendo en cuenta esto, para guardar algo en nuestro MongoDB lo único que tenemos que hacer es crear un diccionario, obtener una conexión del pool y ejecutar el insert correspondiente.

Vamos a crear un diccionario con una serie de claves y valores y lo guardaremos. En este caso, vamos a salvar en la colección 'USERS' de la base de datos 'TEST' un registro y vamos a comprobar cómo efectivamente se crean de forma automática tanto la base de datos como la colección.

Suponiendo que partimos de una instalación limpia de mongo si nos conectamos a la consola de mongo y le decimos que nos muestre las bases de datos tendremos:

> show databases;
admin  0.000GB
local  0.000GB
Vamos a ejecutar el código siguiente, el cual creará un documento en la colección USERS de la base de datos TEST:

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000
MONGODB_DATABASE = 'TEST'

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=10)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with mongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

database_entry = {}
database_entry['name'] = 'John'
database_entry['surname'] = 'Wick'
database_entry['city'] = 'New York'

# or equivalent database_entry={'name':'John', 'surname':'Wick', 'city':'New York'}

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    collection.insert(database_entry)
    print "Data saved at %s collection in %s database: %s" % (destination, MONGODB_DATABASE, database_entry)
except Exception as error:
    print "Error saving data: %s" % str(error)
Al ejecutar el código, si todo va bien obtendremos algo como lo siguiente:

OK -- Connected to MongoDB at server 192.168.0.169
Data saved at USERS collection in TEST database: {'city': 'New York', '_id': ObjectId('5a1174a5fbb7132f5c7d091c'), 'surname': 'Wick', 'name': 'John'}
Si vamos a la consola de mongo, comprobamos que efectivamente se han creado la base de datos y la colección y que además tenemos el registro guardado:

> show databases;
TEST   0.000GB
admin  0.000GB
local  0.000GB

> use TEST;
switched to db TEST

> show collections;
USERS

> db.USERS.find({}).pretty();
{
 "_id" : ObjectId("5a1174a5fbb7132f5c7d091c"),
 "city" : "New York",
 "surname" : "Wick",
 "name" : "John"
}
Lo primero que nos llamará la atención es el campo '_id'. Es un campo que crea de forma automática mongo y que juega el papel de índice actuando como clave única.

En futuras entradas veremos como explotar la librería pymongo ejecutando diferentes operaciones sobre MongoDB.

18 de noviembre de 2017

Colas (RabbitMQ) - Publicando mensajes

Siguiendo con la interacción entre RabbitMQ y python usando la librería puka, en esta entrada vamos a ver como publicar mensajes.

En la anterior entrada vimos como crear un pool de threads que se conectaba a un broker de RabbitMQ, consumía mensajes y ejecutaba un workflow para cada mensaje consumido. En esta entrada veremos como crear un pool de threads que van a publicar mensajes.

En este caso lo que vamos a publicar son strings aleatorios de cierta longitud que vamos a generar con la función genrandom que acepta como parámetro la longitud con la que queramos generar los mensajes.

Estableceremos un tiempo de espera entre la publicación de mensaje que vendrá determinado por el valor de PUBLISHER_SLEEP_TIME en segundos. Este parámetro junto con el número de threads WORKER_NUMBER van a determinar el rate de publicación.

Igual que en el ejemplo de los consumidores, en caso de problemas con la conexión al broker se establece una política de reconexión con reintentos de conexión cada BROKER_RETRY_CONNECTION segundos.

De este modo el código quedaría del siguiente modo:

#!/usr/bin/env python

import puka
import time
import threading
import random
import string

BROKER_HOST = '192.168.0.169'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
PUBLISHER_SLEEP_TIME = 0.2

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

class Threaded_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.client.wait(self.client.connect())
                    self.client.wait(self.client.queue_declare(QUEUE))
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    msgToPublish = genrandom(40)
                    self.client.wait(self.client.basic_publish(exchange ='', routing_key=QUEUE, body=msgToPublish))
                    time.sleep(PUBLISHER_SLEEP_TIME)
                    print '[%s] Message published at queue: %s' % (threadName, msgToPublish)
                except Exception as error:
                    print '[%s] Error at publisher: %s.....retrying connection after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)


print 'Starting %d publisher threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()


Al ejecutarlo obtendremos lo siguiente:

Starting 4 publisher threads...
[Thread-1] Succesfully Connected at broker 192.168.0.169
[Thread-3] Succesfully Connected at broker 192.168.0.169
[Thread-2] Succesfully Connected at broker 192.168.0.169
[Thread-4] Succesfully Connected at broker 192.168.0.169
[Thread-1] Message publish at queue: YKHYRBRJWQLOLSREOGONKZICBVOQBXHUALGXCWSIT
[Thread-3] Message publish at queue: AXCLEQMGONKZUPNNOIFFZTLZBMAPRRNCELSWVORAU
[Thread-2] Message publish at queue: HAXNHDXBVJVUVOLTSYRTGEZMXGSWLOTIAMDWIQCQZ
[Thread-4] Message publish at queue: MDRJVZBHFHHCIPNXHFATUHEQWBQSJQWVOVTVIAEMA
[Thread-1] Message publish at queue: SBIRZMHIOGQDTQVSZKGGZICXJIEEXVLCCGFHONQIE
[Thread-3] Message publish at queue: WPLUWHCJWEBYETTDWFIYGBEERADWFVGPQTHPRJIKR
[Thread-2] Message publish at queue: JXGJBYEIUQRZRLFKUSKRPAMKSTZDZGOJIBFGKVLEM
[Thread-4] Message publish at queue: OELBGLBPYFTEBZVKGCGUTBCLQDXRCQLPLPDJDRRRM
........
Con el valor PUBLISHER_SLEEP_TIME = 0.2 deberíamos tener un rate de publicación de 5 mensajes por cada hilo, de forma que si fijamos WORKER_NUMBER = 4, al ejecutarse el código deberían publicarse 20 mensajes por segundo.

Efectivamente, si lo lanzamos y vamos a la web de administración de RabbitMQ vemos que el rate de publicación es el esperado:


20 de mayo de 2017

Colas (RabbitMQ) - Consumidores

En entradas anteriores vimos como trabajar con la librería de colas de python. La librería nos proporciona una funcionalidad bastante completa, pero de cara a desplegar en un entorno de producción un sistema de colas existen soluciones más completas en lo que s refiera a rendidmiento y funcionalidad.

Este es el caso de RabbitMQ. Se trata de un sistema de colas basado en el estándar AMQP. Podemos instalar RabbitMQ como un nodo aislado o bien como un cluster distribuido. En su web podemos encontrar toda la información detallada para su instalación y administración, así como una lista de plugins que ofrecen una funcionalidad adicional.
Cuenta con paquetes para prácticamente todas las plataformas: https://www.rabbitmq.com/download.html

RabbitMQ está escrito en Erlang, de este modo es necesario instalarlo previamente: https://www.erlang.org/downloads

Una vez instalado, recomiendo activar el plugin de administración (management), puesto que proporciona una interfaz web que permite administrarlo de forma cómoda: podemos crear nuevas colas, administrarlas, ver el número de mensajes en cada cola y la tasa de publicación/consumo de mensajes, etc...



Una vez instalado RabbitMQ vamos crear un código python que nos va a permitir insertar mensajes en una cola y por otro lado un código que va a extraer mensajes de la cola y va a procesarlos.

¿Para qué puede servirnos esto? Por ejemplo, si retomamos el ejemplo del código del socket-server podemos hacer que el socket-server meta en la cola cada mensaje que recibamos y posteriormente definir un pool de threads que vaya sacando los mensajes de la cola y los procese.

En la misma web de RabbitMQ podemos encontrar diferentes ejemplos empleando diferentes lenguajes. En el caso de python, veréis que se emplea la librería pika. Es una opción perfectamente válida, no obstante en mi caso después de algunas pruebas me he decantado por usar puka.

Bien, vamos pues a la tarea. Objetivo: crear un grupo de threads que lean mensajes de la cola y ejecuten un workflow para cada uno de ellos. El código quedaría del siguiente modo:


#!/usr/bin/env python

import puka
import time
import threading

BROKER_HOST = '192.168.1.45'
QUEUE = 'COLA-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'

def workflow(threadName, msg):
    """ Workflow for every message read from queue """
    try:
        print "[%s] Read from queue: %s" % (threadName, msg)
    except Exception as error:
        print "Error: %s" % str(error)
 

class Threaded_worker(threading.Thread):

    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE)
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(threadName, self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (threadName, str(error),BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

print 'Starting %d consumer threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()




Vamos con los detalles: los parámetros BROKER_HOST y QUEUE definen la IP del servidor donde corre RabbitMQ y el nombre de la cola de la que queremos leer los mensajes.

En caso de problema con la conexión al broker (RabbitMQ), ya sea en el instante inicial como durante la operativa, se establece un tiempo de espera para reestablecerla, el cual viene dado por el parámetro BROKER_RETRY_CONNECTION. El parámetro BROKER_CONNECTION nos sirve para almacenar el estado de la conexión.

El parámetro WORKER_NUMBER establece el número de threads o hilos. Cada uno de ellos establecerá una conexión con el broker y cuando haya mensajes los consumirán y para cada uno de ellos ejecutarán el código definido en el método workflow, en este caso simplemente imprimirán el mensaje leído.

¿Qué sucede si el número de hilos no es suficiente para consumir los mensajes? Sería bueno poder aumentar el número de consumidores en función del tamaño de la cola. Esto queda resuelto en la entrada https://codigo-python.blogspot.com.es/2017/12/colas-rabbitmq-numero-variable-de_11.html

En futuras entradas ampliaremos funcionalidad del método workflow, haciendo por ejemplo, que almacene en una BD mongo cada mensaje leído.

15 de marzo de 2017

Socket server como servicio

En la entrada http://codigo-python.blogspot.com.es/2017/02/socket-server-tcp-multi-thread-ii.html construíamos un socket server basado en hilos con control de número hilos en ejecución y de forma que lo que que recibíamos desde los diferentes clientes era almacenado a un fichero de log, el cual además rotaba de forma periódica.

A continuación vamos a ver como ejecutar la aplicación para que se comporte como un servicio en sistemas unix, en concreto en debian y para que corra con un usuario del sistema con los permisos adecuados.

Vamos a suponer que el código de nuestro socket server lo tenemos en el fichero socket-server.py dentro del directorio /opt/socket-server/ y que queremos ejecutar el código como usuario socket-user, el cual pertenece al grupo socket-group.

Tendremos algo como lo siguiente:
root@debian:/opt# pwd
/opt

root@debian:/opt# ls -l
total 4
drwxr-xr-x  2 socket-user socket-group 4096 mar 15 19:15 socket-server

root@debian:/opt/socket-server# ls -l
total 4
-rwxr-xr-x 1 socket-user socket-group 3231 mar 15 19:38 socket-server.py

Los logs de la aplicación vamos a guardarlos en el directorio /var/log/socket-server/, con lo cual nos aseguraremos que el directorio tiene los permisos adecuados:
root@debian:/var/log# pwd
/var/log
root@debian:/var/log# ls -l | grep socket-server
drwxr-xr-x  2 socket-user socket-group       4096 mar 15 19:25 socket-server

Con estas premisas vamos a montar un script de bash que llamaremos socket-server que hará las veces de manejador del servicio y que guardaremos en el directorio /etc/init.d/. El contenido del fichero es el siguiente:
#!/bin/bash

# /etc/init.d/scripts
# Description: Script for manage socket-server
# ————————————————–
#
### BEGIN INIT INFO
# Provides: Scripts for socket-server
# Required-Start: $network $local_fs $syslog
# Required-Stop: $local_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Description: Start Python scripts to provide socket-server service
### END INIT INFO

PIDFILE=/tmp/socket-server
DAEMONLOG=/var/log/socket-server/daemon.log

case "$1" in
 start)
   if [ ! -f $PIDFILE ] ; then
                echo "Starting socket-server..."
                su socket-user -c "nohup /usr/bin/python -u /opt/socket-server/socket-server.py > $DAEMONLOG 2>&1 &"
   else
         for pid in $(cat $PIDFILE) ; do
                if ! ps --no-headers p "$pid" | grep socket-server > /dev/null ; then
                        echo "Starting socket-server..."
                        su socket-user -c "nohup /usr/bin/python -u /opt/socket-server/socket-server.py > $DAEMONLOG 2>&1 &"
                else
                        echo "The socket-server is already running!!"
        fi
   done
  fi
  ;;
  stop)
  if [ ! -f $PIDFILE ] ; then
                echo "The socket-server is not running"
  else
        for pid in $(cat $PIDFILE) ; do
                if ! ps --no-headers p "$pid" | grep socket-server > /dev/null ; then
                        echo "The socket-server is not running"
                else
                        echo "Stopping socket-server..."
                        kill -9 $pid
                fi
        done
  fi
  ;;

 restart)
   $0 stop
   sleep 1
   $0 start
   ;;
 *)
   echo "usage: $0 {start|stop|restart}"
esac

Un par de detalles a tener en cuenta: el script define un fichero de logs DAEMONLOG=/var/log/socket-server/daemon.log, y os preguntaréis para qué? si el propio socket server ya escribe su fichero de logs. En este caso en el fichero daemon.log va a almacenar los logs del propio script de bash y además las excepciones no capturadas del fichero socket-server.py

También podéis observar que definimos un fichero PIDFILE donde se va a almacenar el PID del proceso. Si echamos la vista atrás sobre el código socket-server.py vemos que no se maneja en ningún sitio el PID del proceso. Debemos por tanto incluirlo. Para ello simplemente añadimos un trocito de código a nuestro fichero socket-server.py, de forma que tendremos:

#!/usr/bin/env python

import threading
import SocketServer, socket
import sys
import os
import logging, logging.handlers

TIMEOUT = 10
HOST = '0.0.0.0'
PORT = 3456

MAX_THREADS = 50

LOG_FOLDER = '/var/log/socket-server/'
LOG_FILE = 'socket-server.log'
ROTATE_TIME = 'midnight'
LOG_COUNT = 10
PID = "/tmp/socket-server"

if os.access(os.path.expanduser(PID), os.F_OK):
    logger.info('Checking if socket-server is already running...')
    pidfile = open(os.path.expanduser(PID), "r")
    pidfile.seek(0)
    old_pd = pidfile.readline()
    # process PID
    if os.path.exists("/proc/%s" % old_pd) and old_pd!="":
        logger.info('You already have an instance of the socket-server running')
        logger.info('It is running as process %s' , old_pd)
        sys.exit(1)
    else:
        logger.info('socket-server is not running. Trying to start socket-server...')
        os.remove(os.path.expanduser(PID))
pidfile = open(os.path.expanduser(PID), 'a')
logger.info('socket-server started with PID: %s' , os.getpid())
pidfile.write(str(os.getpid()))
pidfile.close()

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
 try:
  os.makedirs(log_folder)
 except Exception as error:
  print 'Error creating the log folder: %s' % error
        exit()

try:
 logger = logging.getLogger('socket-server')
 loggerHandler = logging.handlers.TimedRotatingFileHandler(LOG_FOLDER + LOG_FILE , ROTATE_TIME, 1, backupCount=LOG_COUNT)
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 loggerHandler.setFormatter(formatter)
 logger.addHandler(loggerHandler)
 logger.setLevel(logging.DEBUG)
except Exception as error:
 print '------------------------------------------------------------------'
 print '[ERROR] Error writing log at %s: %s' % (LOG_FOLDER, error)
 print '[ERROR] Please verify path folder exits and write permissions'
 print '------------------------------------------------------------------'
 exit()


class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            # chequeamos el numero de threads activos. Si es mayor que el limite establecido cerramos la conexion y no atendemos al cliente. Lo trazamos
            if threading.activeCount() > MAX_THREADS:
                logger.warn('%s -- Execution threads number: %d', threading.currentThread().getName(),
                            threading.activeCount() - 1)
                logger.warn('Max threads number as been reached.')
                self.closed()
            # si no hemos alcanzado el limite lo atendemos
            else:
                threadName = threading.currentThread().getName()
                activeThreads = threading.activeCount() - 1
                clientIP = self.client_address[0]
                logger.info('[%s] -- New connection from %s -- Active threads: %d' , threadName, clientIP, activeThreads)
                data = self.request.recv(1024)
                logger.info('[%s] -- %s -- Received: %s' , threadName, clientIP, data)
                response = 'Thanks %s, message received!!' % clientIP
                self.request.send(response)
        except Exception, error:
            if str(error) == "timed out":
                logger.error ('[%s] -- %s -- Timeout on data transmission ocurred after %d seconds.' ,threadName, clientIP, TIMEOUT)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()


Con todo lo anterior ya estamos en disposición de manejar nuestro socket-server como un servicio:

root@debian:~# /etc/init.d/socket-server
usage: /etc/init.d/socket-server {start|stop|restart}
Probamos a arrancarlo:

root@debian:~# /etc/init.d/socket-server start
Starting socket-server...
Si todo ha ido bien el proceso debería estar corriendo con el usuario que hemos definido. Si hubiera algún error podemos comprobar el fichero /var/log/socket-server/daemon.log Para detenerlo:

root@debian:~# /etc/init.d/socket-server stop
Stoping socket-server...
Si queremos que el proceso se arranque automáticamente al arrancar el sistema, en el caso de debian ejecutamos los siguientes comandos:

root@debian:~# cd /etc/init.d/
root@debian:/etc/init.d# update-rc.d socket-server defaults
El script que maneja el servicio se puede adaptar fácilmente a vuestro propio código python si queréis convertirlo en servicio. Animaos y probadlo!!

2 de febrero de 2017

Socket server TCP multi-thread - logger

Ampliando el código de la entrada anterior vamos a extender la funcionalidad del socket server.
Como ya vimos en la entrada previa, construíamos la lógica del socket server extendiendo varias clases del módulo SocketServer y sobrescribiendo métodos según el diagrama siguiente:

En este caso, apoyándonos en la misma estructura de clases y métodos vamos a ampliar la funcionalidad guardando en un fichero de logs lo recibido a través del socket server, estableciendo además un límite máximo de clientes simultáneos a los que el socket server atenderá.

Vayamos paso a paso.

Logs:
Emplearemos el módulo logging el cual nos va a facilitar el trabajo.
Los parámetros principales parámetros que debemos indicar son el directorio donde almacenarlos y el nombre del fichero. Aprovechando la potencia del módulo vamos a hacer que los el fichero de logs rote de forma automática cada noche y manteniendo un máximo número de ficheros, de forma que el propio módulo borre aquellos más antiguos de x días.
De este modo tendremos los siguientes parámetros en nuestro código:
LOG_FOLDER = '/var/log/socket-server/'
LOG_FILE = 'socket-server.log'
ROTATE_TIME = 'midnight'
LOG_COUNT = 10

Con los parámetros anteriores ya podemos "armar" nuestro logs:
import os
import logging, logging.handlers

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
 try:
  os.makedirs(log_folder)
 except Exception as error:
  print 'Error creating the log folder: %s' % error

try:
 logger = logging.getLogger('socket-server')
 loggerHandler = logging.handlers.TimedRotatingFileHandler(LOG_FOLDER + LOG_FILE , ROTATE_TIME, 1, backupCount=LOG_COUNT)
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 loggerHandler.setFormatter(formatter)
 logger.addHandler(loggerHandler)
 logger.setLevel(logging.DEBUG)
except:
 print '------------------------------------------------------------------'
 print '[ERROR] Error writing log at %s' % INTERNAL_LOG
 print '[ERROR] Please verify path folder exits and write permissions'
 print '------------------------------------------------------------------'
 exit() 

Lo que hacemos con el código anterior es comprobar si el directorio existe y en caso de no existir lo intentamos crear, trazando cualquier error que pudiera producirse al crearlo.
Por ejemplo, en sistemas unix, si el usuario que ejecuta el código no tiene permiso de escritura sobre el directorio que contendrá al directorio de logs obtendremos un error como el siguiente:
Error creating the log folder: [Errno 13] Permission denied: '/var/log/socket-server'

Por ello debemos asegurarnos que corremos el código con usuario con permisos suficientes.
El resto del código aportan la lógica comentada antes de rotación, número de ficheros de logs, etc..

Veamos a continuación como incluirlo en el código del socket server incluyendo además el límite de hilos máximos en ejecución o dicho de otro modo el número de clientes simultáneos que queremos atender:

#!/usr/bin/env python

import threading
import SocketServer, socket
import sys
import os
import logging, logging.handlers

TIMEOUT = 10
HOST = '0.0.0.0'
PORT = 3456

MAX_THREADS = 50

LOG_FOLDER = '/var/log/socket-server/'
LOG_FILE = 'socket-server.log'
ROTATE_TIME = 'midnight'
LOG_COUNT = 10

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
 try:
  os.makedirs(log_folder)
 except Exception as error:
  print 'Error creating the log folder: %s' % error
        exit()

try:
 logger = logging.getLogger('socket-server')
 loggerHandler = logging.handlers.TimedRotatingFileHandler(LOG_FOLDER + LOG_FILE , ROTATE_TIME, 1, backupCount=LOG_COUNT)
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 loggerHandler.setFormatter(formatter)
 logger.addHandler(loggerHandler)
 logger.setLevel(logging.DEBUG)
except Exception as error:
 print '------------------------------------------------------------------'
 print '[ERROR] Error writing log at %s: %s' % (LOG_FOLDER, error)
 print '[ERROR] Please verify path folder exits and write permissions'
 print '------------------------------------------------------------------'
 exit()


class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            # chequeamos el numero de threads activos. Si es mayor que el limite establecido cerramos la conexion y no atendemos al cliente. Lo trazamos
            if threading.activeCount() > MAX_THREADS:
                logger.warn('%s -- Execution threads number: %d', threading.currentThread().getName(),
                            threading.activeCount() - 1)
                logger.warn('Max threads number as been reached.')
                self.closed()
            # si no hemos alcanzado el limite lo atendemos
            else:
                threadName = threading.currentThread().getName()
                activeThreads = threading.activeCount() - 1
                clientIP = self.client_address[0]
                logger.info('[%s] -- New connection from %s -- Active threads: %d' , threadName, clientIP, activeThreads)
                data = self.request.recv(1024)
                logger.info('[%s] -- %s -- Received: %s' , threadName, clientIP, data)
                response = 'Thanks %s, message received!!' % clientIP
                self.request.send(response)
        except Exception, error:
            if str(error) == "timed out":
                logger.error ('[%s] -- %s -- Timeout on data transmission ocurred after %d seconds.' ,threadName, clientIP, TIMEOUT)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()

Las etiquetas empleadas son bastante descriptivas:
logger.info() ---> entradas con etiqueta INFO en el fichero de logs
logger.warn() ---> entradas con etiqueta WARN en el fichero de logs
logger.error() ---> entradas con etiqueta ERROR en el fichero de logs


En el fichero /var/log/socket-server/socket-server.log tendremos contenido del siguiente tipo a medida que vayan conectando diferentes clientes:
2017-02-02 21:24:29,701 INFO [Thread-1] -- New connection from 192.168.0.143 -- Active threads: 1
2017-02-02 21:24:35,704 INFO [Thread-1] -- 192.168.0.143 -- Received: hola, que tal?
2017-02-02 21:24:46,697 INFO [Thread-2] -- New connection from 192.168.0.145 -- Active threads: 1
2017-02-02 21:24:51,331 INFO [Thread-2] -- 192.168.0.145 -- Received: buenos dias
2017-02-02 21:31:30,555 INFO [Thread-3] -- New connection from 192.168.0.89 -- Active threads: 1
2017-02-02 21:31:40,561 ERROR [Thread-3] -- 192.168.0.89 -- Timeout on data transmission ocurred after 10 seconds.


Si dejáramos la aplicación corriendo varios días como servicio (más adelante veremos como hacerlo) veríamos como el fichero de logs rota cada noche añadiéndose al nombre del fichero al rotar la fecha para tenerlos de este modo clasificados. Al tener más de 10 ficheros de logs al rotar se borrarían automáticamente los más antiguos.