23 de noviembre de 2016

Socket server TCP multi-thread

En las entradas anteriores creábamos un número fijo de hilos o threads encargados de ejecutar tareas simples.
También vimos cómo crear un número fijo de hilos o threads productores y consumidores de colas y un ejemplo en el que se creaban un número fijo de hilos o threads asignados a diferentes tareas.

El escenario que se plantea en este caso es el de un socket server TCP que va a recibir conexiones TCP y que debe crear "al vuelo" un thread o hilo para atender cada petición. En esta primera parte simplemente veremos como crear ese hilo y trazaremos el número de hilos vivos (al tratarse de un socket server pueden conectar varios clientes de forma simultánea), las IPs de los clientes conectados, y los datos recibidos.
El código sería el siguiente:
#!/usr/bin/env python

import threading
import SocketServer, socket
import sys

TIMEOUT = 10
HOST = '192.168.1.37'
PORT = 3456

class RequestHandler(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            threadName = threading.currentThread().getName()
            activeThreads = threading.activeCount() - 1
            clientIP = self.client_address[0]
            print '[%s] -- New connection from %s -- Active threads: %d' % (threadName, clientIP, activeThreads)
            data = self.request.recv(1024)
            print '[%s] -- %s -- Received: %s' % (threadName, clientIP, data)
            response = 'Thanks %s, message received!!' % clientIP
            self.request.send(response)
        except Exception, error:
            if str(error) == "timed out":
                print '[%s] -- %s -- Timeout on data transmission ocurred after %d seconds.' % (threadName, clientIP, TIMEOUT)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()

Veamos el detalle paso a paso. En primer lugar tenemos los parámetros de la conexión TCP:
TIMEOUT = 10  --> timeout para que el cliente transmita los datos una vez establecida la conexión.
HOST = '0.0.0.0'  --> IP en la que el socket server va a escuchar. En este caso 192.168.1.37.
PORT = 3456  --> puerto en el que el socket server va a escuchar. Escoged cualquier puerto libre en el equipo.

La lógica se encuentra en el método handle() de la clase RequestHandler : obtenemos el nombre del thread que creamos para atender la conexión, el número de threads en ejecución (en próximas entradas lo usaremos para limitar el número máximo de threads en ejecución) y la IP del cliente. Leemos los datos que nos envía el cliente, los pintamos, le contestamos indicando que hemos recibido el mensaje y cerramos la conexión.
Para probarlo ejecutamos el código y desde la propia máquina o bien desde otro equipo abrimos una conexión. En mi caso el socket server corre en la IP 192.168.1.37 y el cliente conectará desde la IP 192.168.1.254
Arrancamos el socket server ejecutando el código anterior:
Starting server TCP at IP 192.168.1.37 and port 3456...

A continuación desde el cliente hacemos un telnet al puerto 3456 de la 192.168.1.37 y mandamos un "hola":
pi@raspberrypi:~ $telnet 192.168.1.37 3456
Trying 192.168.1.37...
Connected to 192.168.1.37.
Escape character is '^]'.
hola
Thanks 192.168.1.254, message received!!Connection closed by foreign host.
En el lado servidor tendremos:
Starting server TCP at IP 192.168.1.37 and port 3456...
[Thread-1] -- New connection from 192.168.1.254 -- Active threads: 1
[Thread-1] -- 192.168.1.254 -- Received: hola

Si abrimos nuevas conexiones se crearán nuevos threads para atenderlas. Podéis comprobarlo.

Vamos a intentar explicar con detalle el código anterior.
A primera vista vemos que buena parte de la lógica está incluida en el módulo SocketServer que importamos inicialmente:

import SocketServer
El fichero SocketServer.py (es un módulo de python) forma parte de la instalación estándar de python y podéis acceder a su contenido completo. En el caso de un sistemas unix lo encontraremos en el directorio de instalación, habitualmente en /usr/lib/python2.7/SocketServer.py.

Todo se inicia invocando a la clase ThreadedTCPServer:

try:
    print "Starting server TCP at IP %s and port %d..." % (HOST,PORT)
    server = ThreadedTCPServer((HOST, PORT), RequestHandler)
    server.serve_forever()
except KeyboardInterrupt:
    server.socket.close()
En este caso ThreadedTCPServer es una clase que hemos definido, y que mediante herencia múltiple, hereda de las clases ThreadingMixIn y TCPServer del módulo SocketServer, lo que se traduce en:

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def server_bind(self):
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def finish_request(self, request, client_address):
        request.settimeout(TIMEOUT)
        SocketServer.TCPServer.finish_request(self, request, client_address)
        SocketServer.TCPServer.close_request(self, request)

Si accedemos al contenido del fichero SocketServer.py vemos que la clase TCPServer a su vez hereda de la clase BaseServer, que también pertenece al módulo SocketServer. Es decir, a nivel de módulos y clases tenemos los siguientes elementos:
Y atendiendo a la jerarquía de clases tendríamos:
Además estamos sobrescribiendo los métodos server_bind() y finish_request(). De nuevo, revisando el fichero SocketServer.py podemos ubicar estos métodos dentro de sus correspondientes clases:
Las razones para sobrescribir estos métodos son, en el caso de server_bind, permitir poner el puerto a la escucha aunque tengamos conexiones en modo WAIT_TIME, después de haber parado el servidor y en el caso de finish_request poder fijar un TIMEOUT para el socket. Si no quisiéramos modificar nuestro código para incluir no sería necesario sobrescribir los métodos.

El siguiente elemento en el que debemos reparar es el la clase RequestHandler que como vemos en su definición extiende la clase BaseRequestHandler del módulo SocketServer. Además, vemos que estamos sobrescribiendo el método handle() de la clase BaseRequestHandler que va a ser el que contenga la lógica a ejecutar con cada nueva conexión. Con lo cual tenemos un elemento más en nuestro diagrama:
Por tanto vemos que el módulo SocketServer realmente nos aporta prácticamente todos los ingredientes.
Nosotros únicamente vamos a definir nuestras propias clases ThreadedTCPServer y RequestHandler extendiendo clases existentes existentes en SocketServer para poder añadir la lógica deseada.
De este modo, con todos los "ingredientes" la receta quedaría del siguiente modo:
Siguiendo con esta "autopsia" que estamos haciendo a nuestro socket server multi-hilo, vamos a detenernos en la clase ThreadingMixIn del módulo SocketServer.
El código completo de la clase es el siguiente (copiado del fichero SocketServer.py):

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
Como vemos esta clase va a ser la clase encargada de crear un nuevo hilo o thread para conexión recibida.
La tarea de creación de un nuevo hilo no es demasiado costosa, pero ¿no sería mejor tener un pool de hilos creados, de forma que al recibir una conexión ésta sea atendida por uno de esos hilos? Con esto nos ahorraríamos la tarea de estar creando hilos......en futuras entradas veremos como abordar esta tarea modificando la clase y haciendo uso del módulo Queue. En la entrada socket-server-logger hemos añadido la funcionalidad de establecer un sistema de logs con rotado automático cada noche. También incluye la funcionalidad de establecer un número máximo de hilos activos.

16 de noviembre de 2016

Threads multitarea

Vamos a ver un ejemplo en el que tenemos 3 tipos de tareas a realizar (task1, task2, task3) y queremos arrancar un determinado número de threads o hilos para ejecutar cada una de ellas.
Para ello vamos a definir un diccionario THREADS = { 'task1':2 , 'task2':3, 'task3':2} donde indicamos el número de threads destinado a cada tarea.
El código sería algo del siguiente tipo:
#!/usr/bin/env python

import threading
import random
import time

# how many threads we want to start
THREADS = { 'task1':2 , 'task2':3, 'task3':2}

def task1(threadName):
    while True:
        print "I am %s and I execute task1" % threadName
        time.sleep(random.randint(1, 10))

def task2(threadName):
    while True:
        print "I am %s and I execute task2" % threadName
        time.sleep(random.randint(1, 10))

def task3(threadName):
    while True:
        print "I am %s and I execute task3" % threadName
        time.sleep(random.randint(1, 10))

def generic_workflow(threadName, task_type):
    if task_type == 'task1':
        task1(threadName)
    elif task_type == 'task2':
        task2(threadName)
    elif task_type == 'task3':
        task3(threadName)

class Thread_task(threading.Thread):
    def __init__(self, task_type):
        threading.Thread.__init__(self)
        self.task_type = task_type
    def run(self):
        threadName = threading.currentThread().getName()
        generic_workflow(threadName, task_type)

print 'Checking for threads for every task...'

total_threads = 0
for task in THREADS:
    total_threads += THREADS[task]

for task in THREADS:
    print " ** Starting %d threads for %s **" % (THREADS[task], task)
    for i in range(THREADS[task]):
        task_type = task
        td = Thread_task(task)
        td.start()
La ejecución del código generaría una salida del siguiente tipo:
Checking for threads for every task...
 ** Starting 2 threads for task1 **
I am Thread-1 and I execute task1
I am Thread-2 and I execute task1
 ** Starting 3 threads for task2 **
I am Thread-3 and I execute task2
I am Thread-4 and I execute task2
I am Thread-5 and I execute task2
 ** Starting 2 threads for task3 **
I am Thread-6 and I execute task3
I am Thread-7 and I execute task3
I am Thread-1 and I execute task1
I am Thread-7 and I execute task3
I am Thread-3 and I execute task2
I am Thread-1 and I execute task1
I am Thread-6 and I execute task3
........

Resultará fácil para el lector adaptar el código para hacer algo similar con procesos en lugar de threads.