19 de noviembre de 2017

PyMongo - conexión

En esta nueva entrada vamos a ver como interactuar con la base de datos MongoDB usando python.

MongoDB es una base de datos NoSQL y gracias a su buen rendimiento y escalado podemos considerarla como una alternativa posible dentro del mundo del Big Data. Tiene una amplia documentación disponible en su web

Para poder trabajar contra MongoDB vamos a emplear la librería pymongo: https://pypi.python.org/pypi/pymongo

Pymongo tiene varias virtudes, una de ellas es que es "thread-safe", es decir, que es apta para trabajar con aplicaciones multi-hilo.
En su lista de faq podéis encontrar esta junto a otras muchas features.

El primer paso para trabajar con pymongo es instalar la librería.
La forma más cómoda de instalarla es a través de pip ejecutando pip install pymongo, pero como siempre podemos optar por descargar los paquetes de la web y hacerlo de forma manual tanto en sistemas linux como windows: https://pypi.python.org/pypi/pymongo

Una vez instalada la librería ya podemos comenzar a escribir código.
Vamos a suponer que contamos con una instalación básica de mongo que no requiere autenticación para conectarse. El código para conectarse a MongoDB quedaría del siguiente modo:

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
    client.close()
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with MongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

Con el código anterior simplemente nos conectamos a la base de datos TEST (en caso de que no exista se crea de forma automática) y desconectamos.

Los usuarios de windows es posible que obtengan errores al incorporar el parámetro
serverSelectionTimeoutMS=MONGODB_TIMEOUT
dependiendo de la versión de la librería. En ese caso basta con eliminarlo y dejar simplemente
client = pymongo.MongoClient(URI_CONNECTION)

Al ejecutar el código, si todo va bien obtendremos un mensaje de confirmación:

OK -- Connected to MongoDB at server 192.168.0.169
En caso de error, obtendremos también un mensaje que nos dará la pista de qué está fallando.

Un dato importante a tener en cuenta es que cuando invocamos
pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
la librería en realidad está creando un pool de conexiones basado en threads con un tamaño máximo determinado por el parámetro maxPoolSize, que tiene un valor por defecto de 100.

Es decir, la línea:

client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
es equivalente a

client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=100)

Bien, ya tenemos casi todo listo para realizar operaciones sobre la base de datos. Antes de continuar vamos a hacer un inciso para apuntar algunas características de MongoDB, sobre todo de cara a la gente que viene del mundo SQL.

En primer lugar mongo almacena los datos en colecciones, que vienen a ser el equivalente a las tablas del mundo SQL. Dentro de esas colecciones MongoDB va a almacenar documentos, que serían el equivalente a los registros del mundo SQL. Estos documentos son en realidad estructuras JSON, que en términos de python, podemos traducir a diccionarios.

Otra de las caracteríticas de MongoDB es que no es necesario definir de forma previa la estructura que van a tener los documentos de una colección, de hecho, ni siquiera es necesario crear la colección, al guardar un documento JSON se creará automáticamente la colección, de modo análogo a lo que sucede con la propia base de datos, que también se creará de forma automática si no existiera.
Dicho de otro modo, al crear el primer documento de una colección dentro de una base de datos se crearían de forma automática tanto la colección como la base de datos en caso de que no existieran,

Teniendo en cuenta esto, para guardar algo en nuestro MongoDB lo único que tenemos que hacer es crear un diccionario, obtener una conexión del pool y ejecutar el insert correspondiente.

Vamos a crear un diccionario con una serie de claves y valores y lo guardaremos. En este caso, vamos a salvar en la colección 'USERS' de la base de datos 'TEST' un registro y vamos a comprobar cómo efectivamente se crean de forma automática tanto la base de datos como la colección.

Suponiendo que partimos de una instalación limpia de mongo si nos conectamos a la consola de mongo y le decimos que nos muestre las bases de datos tendremos:

> show databases;
admin  0.000GB
local  0.000GB
Vamos a ejecutar el código siguiente, el cual creará un documento en la colección USERS de la base de datos TEST:

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000
MONGODB_DATABASE = 'TEST'

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=10)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with mongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

database_entry = {}
database_entry['name'] = 'John'
database_entry['surname'] = 'Wick'
database_entry['city'] = 'New York'

# or equivalent database_entry={'name':'John', 'surname':'Wick', 'city':'New York'}

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    collection.insert(database_entry)
    print "Data saved at %s collection in %s database: %s" % (destination, MONGODB_DATABASE, database_entry)
except Exception as error:
    print "Error saving data: %s" % str(error)
Al ejecutar el código, si todo va bien obtendremos algo como lo siguiente:

OK -- Connected to MongoDB at server 192.168.0.169
Data saved at USERS collection in TEST database: {'city': 'New York', '_id': ObjectId('5a1174a5fbb7132f5c7d091c'), 'surname': 'Wick', 'name': 'John'}
Si vamos a la consola de mongo, comprobamos que efectivamente se han creado la base de datos y la colección y que además tenemos el registro guardado:

> show databases;
TEST   0.000GB
admin  0.000GB
local  0.000GB

> use TEST;
switched to db TEST

> show collections;
USERS

> db.USERS.find({}).pretty();
{
 "_id" : ObjectId("5a1174a5fbb7132f5c7d091c"),
 "city" : "New York",
 "surname" : "Wick",
 "name" : "John"
}
Lo primero que nos llamará la atención es el campo '_id'. Es un campo que crea de forma automática mongo y que juega el papel de índice actuando como clave única.

En futuras entradas veremos como explotar la librería pymongo ejecutando diferentes operaciones sobre MongoDB.

18 de noviembre de 2017

Colas (RabbitMQ) - Publicando mensajes

Siguiendo con la interacción entre RabbitMQ y python usando la librería puka, en esta entrada vamos a ver como publicar mensajes.

En la anterior entrada vimos como crear un pool de threads que se conectaba a un broker de RabbitMQ, consumía mensajes y ejecutaba un workflow para cada mensaje consumido. En esta entrada veremos como crear un pool de threads que van a publicar mensajes.

En este caso lo que vamos a publicar son strings aleatorios de cierta longitud que vamos a generar con la función genrandom que acepta como parámetro la longitud con la que queramos generar los mensajes.

Estableceremos un tiempo de espera entre la publicación de mensaje que vendrá determinado por el valor de PUBLISHER_SLEEP_TIME en segundos. Este parámetro junto con el número de threads WORKER_NUMBER van a determinar el rate de publicación.

Igual que en el ejemplo de los consumidores, en caso de problemas con la conexión al broker se establece una política de reconexión con reintentos de conexión cada BROKER_RETRY_CONNECTION segundos.

De este modo el código quedaría del siguiente modo:

#!/usr/bin/env python

import puka
import time
import threading
import random
import string

BROKER_HOST = '192.168.0.169'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
PUBLISHER_SLEEP_TIME = 0.2

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

class Threaded_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.client.wait(self.client.connect())
                    self.client.wait(self.client.queue_declare(QUEUE))
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    msgToPublish = genrandom(40)
                    self.client.wait(self.client.basic_publish(exchange ='', routing_key=QUEUE, body=msgToPublish))
                    time.sleep(PUBLISHER_SLEEP_TIME)
                    print '[%s] Message published at queue: %s' % (threadName, msgToPublish)
                except Exception as error:
                    print '[%s] Error at publisher: %s.....retrying connection after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)


print 'Starting %d publisher threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()


Al ejecutarlo obtendremos lo siguiente:

Starting 4 publisher threads...
[Thread-1] Succesfully Connected at broker 192.168.0.169
[Thread-3] Succesfully Connected at broker 192.168.0.169
[Thread-2] Succesfully Connected at broker 192.168.0.169
[Thread-4] Succesfully Connected at broker 192.168.0.169
[Thread-1] Message publish at queue: YKHYRBRJWQLOLSREOGONKZICBVOQBXHUALGXCWSIT
[Thread-3] Message publish at queue: AXCLEQMGONKZUPNNOIFFZTLZBMAPRRNCELSWVORAU
[Thread-2] Message publish at queue: HAXNHDXBVJVUVOLTSYRTGEZMXGSWLOTIAMDWIQCQZ
[Thread-4] Message publish at queue: MDRJVZBHFHHCIPNXHFATUHEQWBQSJQWVOVTVIAEMA
[Thread-1] Message publish at queue: SBIRZMHIOGQDTQVSZKGGZICXJIEEXVLCCGFHONQIE
[Thread-3] Message publish at queue: WPLUWHCJWEBYETTDWFIYGBEERADWFVGPQTHPRJIKR
[Thread-2] Message publish at queue: JXGJBYEIUQRZRLFKUSKRPAMKSTZDZGOJIBFGKVLEM
[Thread-4] Message publish at queue: OELBGLBPYFTEBZVKGCGUTBCLQDXRCQLPLPDJDRRRM
........
Con el valor PUBLISHER_SLEEP_TIME = 0.2 deberíamos tener un rate de publicación de 5 mensajes por cada hilo, de forma que si fijamos WORKER_NUMBER = 4, al ejecutarse el código deberían publicarse 20 mensajes por segundo.

Efectivamente, si lo lanzamos y vamos a la web de administración de RabbitMQ vemos que el rate de publicación es el esperado:


20 de mayo de 2017

Colas (RabbitMQ) - Consumidores

En entradas anteriores vimos como trabajar con la librería de colas de python. La librería nos proporciona una funcionalidad bastante completa, pero de cara a desplegar en un entorno de producción un sistema de colas existen soluciones más completas en lo que s refiera a rendidmiento y funcionalidad.

Este es el caso de RabbitMQ. Se trata de un sistema de colas basado en el estándar AMQP. Podemos instalar RabbitMQ como un nodo aislado o bien como un cluster distribuido. En su web podemos encontrar toda la información detallada para su instalación y administración, así como una lista de plugins que ofrecen una funcionalidad adicional.
Cuenta con paquetes para prácticamente todas las plataformas: https://www.rabbitmq.com/download.html

RabbitMQ está escrito en Erlang, de este modo es necesario instalarlo previamente: https://www.erlang.org/downloads

Una vez instalado, recomiendo activar el plugin de administración (management), puesto que proporciona una interfaz web que permite administrarlo de forma cómoda: podemos crear nuevas colas, administrarlas, ver el número de mensajes en cada cola y la tasa de publicación/consumo de mensajes, etc...



Una vez instalado RabbitMQ vamos crear un código python que nos va a permitir insertar mensajes en una cola y por otro lado un código que va a extraer mensajes de la cola y va a procesarlos.

¿Para qué puede servirnos esto? Por ejemplo, si retomamos el ejemplo del código del socket-server podemos hacer que el socket-server meta en la cola cada mensaje que recibamos y posteriormente definir un pool de threads que vaya sacando los mensajes de la cola y los procese.

En la misma web de RabbitMQ podemos encontrar diferentes ejemplos empleando diferentes lenguajes. En el caso de python, veréis que se emplea la librería pika. Es una opción perfectamente válida, no obstante en mi caso después de algunas pruebas me he decantado por usar puka.

Bien, vamos pues a la tarea. Objetivo: crear un grupo de threads que lean mensajes de la cola y ejecuten un workflow para cada uno de ellos. El código quedaría del siguiente modo:


#!/usr/bin/env python

import puka
import time
import threading

BROKER_HOST = '192.168.1.45'
QUEUE = 'COLA-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'

def workflow(threadName, msg):
    """ Workflow for every message read from queue """
    try:
        print "[%s] Read from queue: %s" % (threadName, msg)
    except Exception as error:
        print "Error: %s" % str(error)
 

class Threaded_worker(threading.Thread):

    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE)
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(threadName, self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (threadName, str(error),BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

print 'Starting %d consumer threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()




Vamos con los detalles: los parámetros BROKER_HOST y QUEUE definen la IP del servidor donde corre RabbitMQ y el nombre de la cola de la que queremos leer los mensajes.

En caso de problema con la conexión al broker (RabbitMQ), ya sea en el instante inicial como durante la operativa, se establece un tiempo de espera para reestablecerla, el cual viene dado por el parámetro BROKER_RETRY_CONNECTION. El parámetro BROKER_CONNECTION nos sirve para almacenar el estado de la conexión.

El parámetro WORKER_NUMBER establece el número de threads o hilos. Cada uno de ellos establecerá una conexión con el broker y cuando haya mensajes los consumirán y para cada uno de ellos ejecutarán el código definido en el método workflow, en este caso simplemente imprimirán el mensaje leído.

¿Qué sucede si el número de hilos no es suficiente para consumir los mensajes? Sería bueno poder aumentar el número de consumidores en función del tamaño de la cola. Esto queda resuelto en la entrada https://codigo-python.blogspot.com.es/2017/12/colas-rabbitmq-numero-variable-de_11.html

En futuras entradas ampliaremos funcionalidad del método workflow, haciendo por ejemplo, que almacene en una BD mongo cada mensaje leído.

15 de marzo de 2017

Socket server como servicio

En la entrada http://codigo-python.blogspot.com.es/2017/02/socket-server-tcp-multi-thread-ii.html construíamos un socket server basado en hilos con control de número hilos en ejecución y de forma que lo que que recibíamos desde los diferentes clientes era almacenado a un fichero de log, el cual además rotaba de forma periódica.

A continuación vamos a ver como ejecutar la aplicación para que se comporte como un servicio en sistemas unix, en concreto en debian y para que corra con un usuario del sistema con los permisos adecuados.

Vamos a suponer que el código de nuestro socket server lo tenemos en el fichero socket-server.py dentro del directorio /opt/socket-server/ y que queremos ejecutar el código como usuario socket-user, el cual pertenece al grupo socket-group.

Tendremos algo como lo siguiente:
root@debian:/opt# pwd
/opt

root@debian:/opt# ls -l
total 4
drwxr-xr-x  2 socket-user socket-group 4096 mar 15 19:15 socket-server

root@debian:/opt/socket-server# ls -l
total 4
-rwxr-xr-x 1 socket-user socket-group 3231 mar 15 19:38 socket-server.py

Los logs de la aplicación vamos a guardarlos en el directorio /var/log/socket-server/, con lo cual nos aseguraremos que el directorio tiene los permisos adecuados:
root@debian:/var/log# pwd
/var/log
root@debian:/var/log# ls -l | grep socket-server
drwxr-xr-x  2 socket-user socket-group       4096 mar 15 19:25 socket-server

Con estas premisas vamos a montar un script de bash que llamaremos socket-server que hará las veces de manejador del servicio y que guardaremos en el directorio /etc/init.d/. El contenido del fichero es el siguiente:
#!/bin/bash

# /etc/init.d/scripts
# Description: Script for manage socket-server
# ————————————————–
#
### BEGIN INIT INFO
# Provides: Scripts for socket-server
# Required-Start: $network $local_fs $syslog
# Required-Stop: $local_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Description: Start Python scripts to provide socket-server service
### END INIT INFO

PIDFILE=/tmp/socket-server
DAEMONLOG=/var/log/socket-server/daemon.log

case "$1" in
 start)
   if [ ! -f $PIDFILE ] ; then
                echo "Starting socket-server..."
                su socket-user -c "nohup /usr/bin/python -u /opt/socket-server/socket-server.py > $DAEMONLOG 2>&1 &"
   else
         for pid in $(cat $PIDFILE) ; do
                if ! ps --no-headers p "$pid" | grep socket-server > /dev/null ; then
                        echo "Starting socket-server..."
                        su socket-user -c "nohup /usr/bin/python -u /opt/socket-server/socket-server.py > $DAEMONLOG 2>&1 &"
                else
                        echo "The socket-server is already running!!"
        fi
   done
  fi
  ;;
  stop)
  if [ ! -f $PIDFILE ] ; then
                echo "The socket-server is not running"
  else
        for pid in $(cat $PIDFILE) ; do
                if ! ps --no-headers p "$pid" | grep socket-server > /dev/null ; then
                        echo "The socket-server is not running"
                else
                        echo "Stopping socket-server..."
                        kill -9 $pid
                fi
        done
  fi
  ;;

 restart)
   $0 stop
   sleep 1
   $0 start
   ;;
 *)
   echo "usage: $0 {start|stop|restart}"
esac

Un par de detalles a tener en cuenta: el script define un fichero de logs DAEMONLOG=/var/log/socket-server/daemon.log, y os preguntaréis para qué? si el propio socket server ya escribe su fichero de logs. En este caso en el fichero daemon.log va a almacenar los logs del propio script de bash y además las excepciones no capturadas del fichero socket-server.py

También podéis observar que definimos un fichero PIDFILE donde se va a almacenar el PID del proceso. Si echamos la vista atrás sobre el código socket-server.py vemos que no se maneja en ningún sitio el PID del proceso. Debemos por tanto incluirlo. Para ello simplemente añadimos un trocito de código a nuestro fichero socket-server.py, de forma que tendremos:

#!/usr/bin/env python

import threading
import SocketServer, socket
import sys
import os
import logging, logging.handlers

TIMEOUT = 10
HOST = '0.0.0.0'
PORT = 3456

MAX_THREADS = 50

LOG_FOLDER = '/var/log/socket-server/'
LOG_FILE = 'socket-server.log'
ROTATE_TIME = 'midnight'
LOG_COUNT = 10
PID = "/tmp/socket-server"

if os.access(os.path.expanduser(PID), os.F_OK):
    logger.info('Checking if socket-server is already running...')
    pidfile = open(os.path.expanduser(PID), "r")
    pidfile.seek(0)
    old_pd = pidfile.readline()
    # process PID
    if os.path.exists("/proc/%s" % old_pd) and old_pd!="":
        logger.info('You already have an instance of the socket-server running')
        logger.info('It is running as process %s' , old_pd)
        sys.exit(1)
    else:
        logger.info('socket-server is not running. Trying to start socket-server...')
        os.remove(os.path.expanduser(PID))
pidfile = open(os.path.expanduser(PID), 'a')
logger.info('socket-server started with PID: %s' , os.getpid())
pidfile.write(str(os.getpid()))
pidfile.close()

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
 try:
  os.makedirs(log_folder)
 except Exception as error:
  print 'Error creating the log folder: %s' % error
        exit()

try:
 logger = logging.getLogger('socket-server')
 loggerHandler = logging.handlers.TimedRotatingFileHandler(LOG_FOLDER + LOG_FILE , ROTATE_TIME, 1, backupCount=LOG_COUNT)
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 loggerHandler.setFormatter(formatter)
 logger.addHandler(loggerHandler)
 logger.setLevel(logging.DEBUG)
except Exception as error:
 print '------------------------------------------------------------------'
 print '[ERROR] Error writing log at %s: %s' % (LOG_FOLDER, error)
 print '[ERROR] Please verify path folder exits and write permissions'
 print '------------------------------------------------------------------'
 exit()


class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            # chequeamos el numero de threads activos. Si es mayor que el limite establecido cerramos la conexion y no atendemos al cliente. Lo trazamos
            if threading.activeCount() > MAX_THREADS:
                logger.warn('%s -- Execution threads number: %d', threading.currentThread().getName(),
                            threading.activeCount() - 1)
                logger.warn('Max threads number as been reached.')
                self.closed()
            # si no hemos alcanzado el limite lo atendemos
            else:
                threadName = threading.currentThread().getName()
                activeThreads = threading.activeCount() - 1
                clientIP = self.client_address[0]
                logger.info('[%s] -- New connection from %s -- Active threads: %d' , threadName, clientIP, activeThreads)
                data = self.request.recv(1024)
                logger.info('[%s] -- %s -- Received: %s' , threadName, clientIP, data)
                response = 'Thanks %s, message received!!' % clientIP
                self.request.send(response)
        except Exception, error:
            if str(error) == "timed out":
                logger.error ('[%s] -- %s -- Timeout on data transmission ocurred after %d seconds.' ,threadName, clientIP, TIMEOUT)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()


Con todo lo anterior ya estamos en disposición de manejar nuestro socket-server como un servicio:

root@debian:~# /etc/init.d/socket-server
usage: /etc/init.d/socket-server {start|stop|restart}
Probamos a arrancarlo:

root@debian:~# /etc/init.d/socket-server start
Starting socket-server...
Si todo ha ido bien el proceso debería estar corriendo con el usuario que hemos definido. Si hubiera algún error podemos comprobar el fichero /var/log/socket-server/daemon.log Para detenerlo:

root@debian:~# /etc/init.d/socket-server stop
Stoping socket-server...
Si queremos que el proceso se arranque automáticamente al arrancar el sistema, en el caso de debian ejecutamos los siguientes comandos:

root@debian:~# cd /etc/init.d/
root@debian:/etc/init.d# update-rc.d socket-server defaults
El script que maneja el servicio se puede adaptar fácilmente a vuestro propio código python si queréis convertirlo en servicio. Animaos y probadlo!!

2 de febrero de 2017

Socket server TCP multi-thread - logger

Ampliando el código de la entrada anterior vamos a extender la funcionalidad del socket server.
Como ya vimos en la entrada previa, construíamos la lógica del socket server extendiendo varias clases del módulo SocketServer y sobrescribiendo métodos según el diagrama siguiente:

En este caso, apoyándonos en la misma estructura de clases y métodos vamos a ampliar la funcionalidad guardando en un fichero de logs lo recibido a través del socket server, estableciendo además un límite máximo de clientes simultáneos a los que el socket server atenderá.

Vayamos paso a paso.

Logs:
Emplearemos el módulo logging el cual nos va a facilitar el trabajo.
Los parámetros principales parámetros que debemos indicar son el directorio donde almacenarlos y el nombre del fichero. Aprovechando la potencia del módulo vamos a hacer que los el fichero de logs rote de forma automática cada noche y manteniendo un máximo número de ficheros, de forma que el propio módulo borre aquellos más antiguos de x días.
De este modo tendremos los siguientes parámetros en nuestro código:
LOG_FOLDER = '/var/log/socket-server/'
LOG_FILE = 'socket-server.log'
ROTATE_TIME = 'midnight'
LOG_COUNT = 10

Con los parámetros anteriores ya podemos "armar" nuestro logs:
import os
import logging, logging.handlers

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
 try:
  os.makedirs(log_folder)
 except Exception as error:
  print 'Error creating the log folder: %s' % error

try:
 logger = logging.getLogger('socket-server')
 loggerHandler = logging.handlers.TimedRotatingFileHandler(LOG_FOLDER + LOG_FILE , ROTATE_TIME, 1, backupCount=LOG_COUNT)
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 loggerHandler.setFormatter(formatter)
 logger.addHandler(loggerHandler)
 logger.setLevel(logging.DEBUG)
except:
 print '------------------------------------------------------------------'
 print '[ERROR] Error writing log at %s' % INTERNAL_LOG
 print '[ERROR] Please verify path folder exits and write permissions'
 print '------------------------------------------------------------------'
 exit() 

Lo que hacemos con el código anterior es comprobar si el directorio existe y en caso de no existir lo intentamos crear, trazando cualquier error que pudiera producirse al crearlo.
Por ejemplo, en sistemas unix, si el usuario que ejecuta el código no tiene permiso de escritura sobre el directorio que contendrá al directorio de logs obtendremos un error como el siguiente:
Error creating the log folder: [Errno 13] Permission denied: '/var/log/socket-server'

Por ello debemos asegurarnos que corremos el código con usuario con permisos suficientes.
El resto del código aportan la lógica comentada antes de rotación, número de ficheros de logs, etc..

Veamos a continuación como incluirlo en el código del socket server incluyendo además el límite de hilos máximos en ejecución o dicho de otro modo el número de clientes simultáneos que queremos atender:

#!/usr/bin/env python

import threading
import SocketServer, socket
import sys
import os
import logging, logging.handlers

TIMEOUT = 10
HOST = '0.0.0.0'
PORT = 3456

MAX_THREADS = 50

LOG_FOLDER = '/var/log/socket-server/'
LOG_FILE = 'socket-server.log'
ROTATE_TIME = 'midnight'
LOG_COUNT = 10

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
 try:
  os.makedirs(log_folder)
 except Exception as error:
  print 'Error creating the log folder: %s' % error
        exit()

try:
 logger = logging.getLogger('socket-server')
 loggerHandler = logging.handlers.TimedRotatingFileHandler(LOG_FOLDER + LOG_FILE , ROTATE_TIME, 1, backupCount=LOG_COUNT)
 formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
 loggerHandler.setFormatter(formatter)
 logger.addHandler(loggerHandler)
 logger.setLevel(logging.DEBUG)
except Exception as error:
 print '------------------------------------------------------------------'
 print '[ERROR] Error writing log at %s: %s' % (LOG_FOLDER, error)
 print '[ERROR] Please verify path folder exits and write permissions'
 print '------------------------------------------------------------------'
 exit()


class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            # chequeamos el numero de threads activos. Si es mayor que el limite establecido cerramos la conexion y no atendemos al cliente. Lo trazamos
            if threading.activeCount() > MAX_THREADS:
                logger.warn('%s -- Execution threads number: %d', threading.currentThread().getName(),
                            threading.activeCount() - 1)
                logger.warn('Max threads number as been reached.')
                self.closed()
            # si no hemos alcanzado el limite lo atendemos
            else:
                threadName = threading.currentThread().getName()
                activeThreads = threading.activeCount() - 1
                clientIP = self.client_address[0]
                logger.info('[%s] -- New connection from %s -- Active threads: %d' , threadName, clientIP, activeThreads)
                data = self.request.recv(1024)
                logger.info('[%s] -- %s -- Received: %s' , threadName, clientIP, data)
                response = 'Thanks %s, message received!!' % clientIP
                self.request.send(response)
        except Exception, error:
            if str(error) == "timed out":
                logger.error ('[%s] -- %s -- Timeout on data transmission ocurred after %d seconds.' ,threadName, clientIP, TIMEOUT)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()

Las etiquetas empleadas son bastante descriptivas:
logger.info() ---> entradas con etiqueta INFO en el fichero de logs
logger.warn() ---> entradas con etiqueta WARN en el fichero de logs
logger.error() ---> entradas con etiqueta ERROR en el fichero de logs


En el fichero /var/log/socket-server/socket-server.log tendremos contenido del siguiente tipo a medida que vayan conectando diferentes clientes:
2017-02-02 21:24:29,701 INFO [Thread-1] -- New connection from 192.168.0.143 -- Active threads: 1
2017-02-02 21:24:35,704 INFO [Thread-1] -- 192.168.0.143 -- Received: hola, que tal?
2017-02-02 21:24:46,697 INFO [Thread-2] -- New connection from 192.168.0.145 -- Active threads: 1
2017-02-02 21:24:51,331 INFO [Thread-2] -- 192.168.0.145 -- Received: buenos dias
2017-02-02 21:31:30,555 INFO [Thread-3] -- New connection from 192.168.0.89 -- Active threads: 1
2017-02-02 21:31:40,561 ERROR [Thread-3] -- 192.168.0.89 -- Timeout on data transmission ocurred after 10 seconds.


Si dejáramos la aplicación corriendo varios días como servicio (más adelante veremos como hacerlo) veríamos como el fichero de logs rota cada noche añadiéndose al nombre del fichero al rotar la fecha para tenerlos de este modo clasificados. Al tener más de 10 ficheros de logs al rotar se borrarían automáticamente los más antiguos.

23 de noviembre de 2016

Socket server TCP multi-thread

En las entradas anteriores creábamos un número fijo de hilos o threads encargados de ejecutar tareas simples.
También vimos cómo crear un número fijo de hilos o threads productores y consumidores de colas y un ejemplo en el que se creaban un número fijo de hilos o threads asignados a diferentes tareas.

El escenario que se plantea en este caso es el de un socket server TCP que va a recibir conexiones TCP y que debe crear "al vuelo" un thread o hilo para atender cada petición. En esta primera parte simplemente veremos como crear ese hilo y trazaremos el número de hilos vivos (al tratarse de un socket server pueden conectar varios clientes de forma simultánea), las IPs de los clientes conectados, y los datos recibidos.
El código sería el siguiente:
#!/usr/bin/env python

import threading
import SocketServer, socket
import sys

TIMEOUT = 10
HOST = '192.168.1.37'
PORT = 3456

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            threadName = threading.currentThread().getName()
            activeThreads = threading.activeCount() - 1
            clientIP = self.client_address[0]
            print '[%s] -- New connection from %s -- Active threads: %d' % (threadName, clientIP, activeThreads)
            data = self.request.recv(1024)
            print '[%s] -- %s -- Received: %s' % (threadName, clientIP, data)
            response = 'Thanks %s, message received!!' % clientIP
            self.request.send(response)
        except Exception, error:
            if str(error) == "timed out":
                print '[%s] -- %s -- Timeout on data transmission ocurred after %d seconds.' % (threadName, clientIP, TIMEOUT)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()

Veamos el detalle paso a paso. En primer lugar tenemos los parámetros de la conexión TCP:
TIMEOUT = 10  --> timeout para que el cliente transmita los datos una vez establecida la conexión.
HOST = '0.0.0.0'  --> IP en la que el socket server va a escuchar. En este caso 192.168.1.37.
PORT = 3456  --> puerto en el que el socket server va a escuchar. Escoged cualquier puerto libre en el equipo.

La lógica se encuentra en el método handle() de la clase RequestHandler : obtenemos el nombre del thread que creamos para atender la conexión, el número de threads en ejecución (en próximas entradas lo usaremos para limitar el número máximo de threads en ejecución) y la IP del cliente. Leemos los datos que nos envía el cliente, los pintamos, le contestamos indicando que hemos recibido el mensaje y cerramos la conexión.
Para probarlo ejecutamos el código y desde la propia máquina o bien desde otro equipo abrimos una conexión. En mi caso el socket server corre en la IP 192.168.1.37 y el cliente conectará desde la IP 192.168.1.254
Arrancamos el socket server ejecutando el código anterior:
Starting server TCP at IP 192.168.1.37 and port 3456...

A continuación desde el cliente hacemos un telnet al puerto 3456 de la 192.168.1.37 y mandamos un "hola":
pi@raspberrypi:~ $telnet 192.168.1.37 3456
Trying 192.168.1.37...
Connected to 192.168.1.37.
Escape character is '^]'.
hola
Thanks 192.168.1.254, message received!!Connection closed by foreign host.
En el lado servidor tendremos:
Starting server TCP at IP 192.168.1.37 and port 3456...
[Thread-1] -- New connection from 192.168.1.254 -- Active threads: 1
[Thread-1] -- 192.168.1.254 -- Received: hola

Si abrimos nuevas conexiones se crearán nuevos threads para atenderlas. Podéis comprobarlo.

Vamos a intentar explicar con detalle el código anterior.
A primera vista vemos que buena parte de la lógica está incluida en el módulo SocketServer que importamos inicialmente:

import SocketServer
El fichero SocketServer.py (es un módulo de python) forma parte de la instalación estándar de python y podéis acceder a su contenido completo. En el caso de un sistemas unix lo encontraremos en el directorio de instalación, habitualmente en /usr/lib/python2.7/SocketServer.py.

Todo se inicia invocando a la clase ThreadedTCPServer:

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()
En este caso ThreadedTCPServer es una clase que hemos definido, y que mediante herencia múltiple, hereda de las clases ThreadingMixIn y TCPServer del módulo SocketServer, lo que se traduce en:

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

Si accedemos al contenido del fichero SocketServer.py vemos que la clase TCPServer a su vez hereda de la clase BaseServer, que también pertenece al módulo SocketServer. Es decir, a nivel de módulos y clases tenemos los siguientes elementos:
Y atendiendo a la jerarquía de clases tendríamos:
Además estamos sobrescribiendo los métodos server_bind() y finish_request(). De nuevo, revisando el fichero SocketServer.py podemos ubicar estos métodos dentro de sus correspondientes clases:
Las razones para sobrescribir estos métodos son, en el caso de server_bind, permitir poner el puerto a la escucha aunque tengamos conexiones en modo WAIT_TIME, después de haber parado el servidor y en el caso de finish_request poder fijar un TIMEOUT para el socket. Si no quisiéramos modificar nuestro código para incluir no sería necesario sobrescribir los métodos.

El siguiente elemento en el que debemos reparar es el la clase RequestHandler que como vemos en su definición extiende la clase BaseRequestHandler del módulo SocketServer. Además, vemos que estamos sobrescribiendo el método handle() de la clase BaseRequestHandler que va a ser el que contenga la lógica a ejecutar con cada nueva conexión. Con lo cual tenemos un elemento más en nuestro diagrama:
Por tanto vemos que el módulo SocketServer realmente nos aporta prácticamente todos los ingredientes.
Nosotros únicamente vamos a definir nuestras propias clases ThreadedTCPServer y RequestHandler extendiendo clases existentes existentes en SocketServer para poder añadir la lógica deseada.
De este modo, con todos los "ingredientes" la receta quedaría del siguiente modo:
Siguiendo con esta "autopsia" que estamos haciendo a nuestro socket server multi-hilo, vamos a detenernos en la clase ThreadingMixIn del módulo SocketServer.
El código completo de la clase es el siguiente (copiado del fichero SocketServer.py):

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
Como vemos esta clase va a ser la clase encargada de crear un nuevo hilo o thread para conexión recibida.
La tarea de creación de un nuevo hilo no es demasiado costosa, pero ¿no sería mejor tener un pool de hilos creados, de forma que al recibir una conexión ésta sea atendida por uno de esos hilos? Con esto nos ahorraríamos la tarea de estar creando hilos......en futuras entradas veremos como abordar esta tarea modificando la clase y haciendo uso del módulo Queue. En la entrada socket-server-logger hemos añadido la funcionalidad de establecer un sistema de logs con rotado automático cada noche. También incluye la funcionalidad de establecer un número máximo de hilos activos.

16 de noviembre de 2016

Threads multitarea

Vamos a ver un ejemplo en el que tenemos 3 tipos de tareas a realizar (task1, task2, task3) y queremos arrancar un determinado número de threads o hilos para ejecutar cada una de ellas.
Para ello vamos a definir un diccionario THREADS = { 'task1':2 , 'task2':3, 'task3':2} donde indicamos el número de threads destinado a cada tarea.
El código sería algo del siguiente tipo:
#!/usr/bin/env python

import threading
import random
import time

# how many threads we want to start
THREADS = { 'task1':2 , 'task2':3, 'task3':2}

def task1(threadName):
    while True:
        print "I am %s and I execute task1" % threadName
        time.sleep(random.randint(1, 10))

def task2(threadName):
    while True:
        print "I am %s and I execute task2" % threadName
        time.sleep(random.randint(1, 10))

def task3(threadName):
    while True:
        print "I am %s and I execute task3" % threadName
        time.sleep(random.randint(1, 10))

def generic_workflow(threadName, task_type):
    if task_type == 'task1':
        task1(threadName)
    elif task_type == 'task2':
        task2(threadName)
    elif task_type == 'task3':
        task3(threadName)

class Thread_task(threading.Thread):
    def __init__(self, task_type):
        threading.Thread.__init__(self)
        self.task_type = task_type
    def run(self):
        threadName = threading.currentThread().getName()
        generic_workflow(threadName, task_type)

print 'Checking for threads for every task...'

total_threads = 0
for task in THREADS:
    total_threads += THREADS[task]

for task in THREADS:
    print " ** Starting %d threads for %s **" % (THREADS[task], task)
    for i in range(THREADS[task]):
        task_type = task
        td = Thread_task(task)
        td.start()
La ejecución del código generaría una salida del siguiente tipo:
Checking for threads for every task...
 ** Starting 2 threads for task1 **
I am Thread-1 and I execute task1
I am Thread-2 and I execute task1
 ** Starting 3 threads for task2 **
I am Thread-3 and I execute task2
I am Thread-4 and I execute task2
I am Thread-5 and I execute task2
 ** Starting 2 threads for task3 **
I am Thread-6 and I execute task3
I am Thread-7 and I execute task3
I am Thread-1 and I execute task1
I am Thread-7 and I execute task3
I am Thread-3 and I execute task2
I am Thread-1 and I execute task1
I am Thread-6 and I execute task3
........

Resultará fácil para el lector adaptar el código para hacer algo similar con procesos en lugar de threads.