8 de diciembre de 2019

Colas

La clase Queue del módulo multiprocessing nos proporciona una implementación de colas FIFO ("First In, First Out") que podemos usar para intercambiar información entre threads o procesos de forma segura.

El siguiente ejemplo muestra una implementación de cola FIFO en la que almacenamos valores enteros comprendidos entre 1 y 100 y posteriormente los leemos de forma secuencial. Funciona en cualquier versión de Python igual o superior a 3.5

from multiprocessing import Queue
import random

queue_time = Queue()

print("Saving elements at queue...")
for i in range (5):
    random_time = random.randint(1, 100)
    queue_time.put(random_time)
    print("%d added at queue" % random_time)

print("Reading elements from queue...")
while not queue_time.empty():
    time_read = queue_time.get()
    print("%d read from queue" % time_read)

Al ejecutar el código comprobamos que los elementos de la cola se consumen en el mismo orden en el que son salvados:
Saving elements at queue...
48 added at queue
47 added at queue
64 added at queue
10 added at queue
100 added at queue
Reading elements from queue...
48 read from queue
47 read from queue
64 read from queue
10 read from queue
100 read from queue
Queue is empty!!


Combinando lo visto en entradas anteriores (1 y 2) vamos a construir un grupo de threads que van a meter mensajes en la cola (productores) y un grupo de threads que van a sacar mensajes de la cola (consumidores). Los threads impares serán los consumidores y los pares los productores.
Los productores meterán en la cola un mensaje aleatorio formado por un string de longitud fija y dormirán un tiempo aleatorio entre 1 y 30 segundos. Los consumidores comprobarán si la cola está vacía y en caso contraría leerán un mensaje. El código sería algo del siguiente tipo:

from multiprocessing import Queue
import threading
import random
import time
import string

# how many threads we want to start
THREAD_COUNT = 6

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

def workflow(threadName, client_type):
    if client_type == 'consumer':
        while True:
            time_sleep = random.randint(1, 30)
            if not queue_chains.empty():
                chain = queue_chains.get()
                print("<--[%s] msg %s read queue. Sleeping %d seconds..." % (threadName, chain, time_sleep))
            else:
                print("<--[%s] Queue is empty. Sleeping %d seconds..." % (threadName,time_sleep))
            time.sleep(time_sleep)
    elif client_type == 'producer':
        while True:
            chain = genrandom(20)
            queue_chains.put(chain)
            time_sleep = random.randint(1, 30)
            print("-->[%s] msg %s put at queue. Sleeping % d seconds..." % (threadName, chain, time_sleep))
            time.sleep(time_sleep)

class Queue_client(threading.Thread):
    def __init__(self, client_type):
        threading.Thread.__init__(self)
        self.client_type = client_type
    def run(self):
        threadName = threading.currentThread().getName()
        workflow(threadName, client_type)

queue_chains = Queue()

print('Starting %d threads...' % THREAD_COUNT)
for i in range(THREAD_COUNT):
    if i % 2 == 0:
        client_type = 'consumer'
        queue_client = Queue_client(client_type)
    else:
        client_type = 'producer'
        queue_client = Queue_client(client_type)
    queue_client.start()

Si ejecutamos el código éste se ejecutará indefinidamente: los threads productores meterán mensajes en la cola y los consumidores los sacarán.
Starting 6 threads...
<--[Thread-1] Queue is empty. Sleeping 17 seconds...
-->[Thread-2] msg IYXJWRCYKYITLEQRXWSBD put at queue. Sleeping  23 seconds...
<--[Thread-3] msg IYXJWRCYKYITLEQRXWSBD got from queue. Sleeping 17 seconds...
-->[Thread-4] msg AWWRXODUXVZGTHFQGLUOM put at queue. Sleeping  17 seconds...
<--[Thread-5] msg AWWRXODUXVZGTHFQGLUOM got from queue. Sleeping 19 seconds...
-->[Thread-6] msg LUCXYDAXOUWSXDCQCUJNL put at queue. Sleeping  9 seconds...
-->[Thread-6] msg LHDCSFMMFKWBUWZGMVGRD put at queue. Sleeping  30 seconds...
<--[Thread-1] msg LUCXYDAXOUWSXDCQCUJNL got from queue. Sleeping 17 seconds...
-->[Thread-4] msg LZPYFVRCKUHWLPZDGPJUJ put at queue. Sleeping  23 seconds...
<--[Thread-3] msg LHDCSFMMFKWBUWZGMVGRD got from queue. Sleeping 5 seconds...
<--[Thread-5] msg LZPYFVRCKUHWLPZDGPJUJ got from queue. Sleeping 11 seconds...
<--[Thread-3] Queue is empty. Sleeping 8 seconds...
-->[Thread-2] msg WGGBXXZDWSTQYPMRPCIYI put at queue. Sleeping  30 seconds...
<--[Thread-5] msg WGGBXXZDWSTQYPMRPCIYI got from queue. Sleeping 30 seconds...
<--[Thread-3] Queue is empty. Sleeping 8 seconds...
<--[Thread-1] Queue is empty. Sleeping 12 seconds...
<--[Thread-3] Queue is empty. Sleeping 22 seconds...
-->[Thread-6] msg PALJHDAYQRAAUHZEYZOMD put at queue. Sleeping  19 seconds...
-->[Thread-4] msg NHRLYNQMXLDWKQFRVKLIJ put at queue. Sleeping  16 seconds...
<--[Thread-1] msg PALJHDAYQRAAUHZEYZOMD got from queue. Sleeping 27 seconds...
-->[Thread-2] msg VTHSBMTDYPPRPIOPZGBOJ put at queue. Sleeping  6 seconds...
-->[Thread-4] msg WHXTSXVPNMWXNHZAROCNT put at queue. Sleeping  24 seconds...
-->[Thread-6] msg RDDGHFPHCROQMFBMCNAHI put at queue. Sleeping  3 seconds...
-->[Thread-2] msg JGGLRBEZCGWRIYTYUOOPL put at queue. Sleeping  26 seconds...
<--[Thread-5] msg NHRLYNQMXLDWKQFRVKLIJ got from queue. Sleeping 14 seconds...
<--[Thread-3] msg VTHSBMTDYPPRPIOPZGBOJ got from queue. Sleeping 24 seconds...
-->[Thread-6] msg SRMYDOHNMAKALGGOFPWIQ put at queue. Sleeping  30 seconds...
<--[Thread-1] msg WHXTSXVPNMWXNHZAROCNT got from queue. Sleeping 9 seconds...
................

Podemos decir que hemos construido (de forma burda) un pool de threads consumidores y un pool de threads productores.

Si quisiéramos cambiar hilos por procesos, en este caso empleando ProcessPoolExecutor el código quedaría del siguiente modo:

from multiprocessing import Queue
from concurrent.futures import ProcessPoolExecutor
import os
import random
import time
import string

# how many threads we want to start
PROCESS_COUNT = 6

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

def workflow(client_type):
    PID = os.getpid()
    if client_type == 'consumer':
        while True:
            time_sleep = random.randint(1, 30)
            if not queue_chains.empty():
                chain = queue_chains.get()
                print("<--[%d] msg %s read queue. Sleeping %d seconds..." % (PID, chain, time_sleep))
            else:
                print("<--[%d] Queue is empty. Sleeping %d seconds..." % (PID,time_sleep))
            time.sleep(time_sleep)
    elif client_type == 'producer':
        while True:
            chain = genrandom(20)
            queue_chains.put(chain)
            time_sleep = random.randint(1, 30)
            print("-->[%d] msg %s put at queue. Sleeping % d seconds..." % (PID, chain, time_sleep))
            time.sleep(time_sleep)

queue_chains = Queue()

print('Starting %d processes...' % PROCESS_COUNT)
executor = ProcessPoolExecutor(max_workers=PROCESS_COUNT)
for i in range(PROCESS_COUNT):
    if i % 2 == 0:
        client_type = 'consumer'
        executor.submit(workflow, client_type)
    else:
        client_type = 'producer'
        executor.submit(workflow, client_type)


El problema de este tipo de colas es que no tienen persistencia, existen solamente mientras el propio proceso de python está corriendo. Para llevar a cabo proyectos de envergadura que requieran mayores capacidades emplearemos otro tipo de soluciones como RabbitMQ, Kafka o Mosquito con los que podremos interactuar desde Python.

7 de diciembre de 2019

Hilos y procesos (II)

En la anterior entrada vimos diferentes implementaciones con threads y procesos en los que se ejecutaba una lógica simple.
Basándonos en el anterior código vamos a ver como podemos ampliar la lógica pasando parámetros para la ejecución.

En este caso los threads y procesos recibirán como parámetro un tiempo aleatorio de espera (entero comprendido entre 1 y 10), de forma que se iniciarán indicando el tiempo de espera asignado, se "dormirán" ese número de segundos y finalmente "despertarán" del letargo y finalizarán la ejecución.

import threading
import random
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

class Threaded_worker(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time
    def run(self):
        threadName = threading.currentThread().getName()
        print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, sleep_time))
        time.sleep(self.sleep_time)
        print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1,10)
    td = Threaded_worker(sleep_time)
    td.start()

Al ejecutar el código anterior obtendremos:
Starting 3 threads...
[18:16:12] Hello, I am Thread-1 and I will sleep 4 seconds.
[18:16:12] Hello, I am Thread-2 and I will sleep 10 seconds.
[18:16:12] Hello, I am Thread-3 and I will sleep 9 seconds.
[18:16:16] I am Thread-1 and I have waken up.
[18:16:21] I am Thread-3 and I have waken up.
[18:16:22] I am Thread-2 and I have waken up.

En este caso el parámetro que pasamos a cada hilo es sleep_time y la lógica se define en el método run().
Para tener un código mejor estructurado podemos sacar la lógica del método run() a un método externo fuera de la clase.
En el siguiente código la lógica del método run() se traslada al método externo workflow(), de modo que desde el método run() invocamos el método workflow()

import threading
import random
import time
from datetime import datetime

# how many threads we want to start
THREADS_COUNT = 3

def workflow(threadName, seconds):
    print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, seconds))
    time.sleep(seconds)
    print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))

class Threaded_worker(threading.Thread):
    def __init__(self, sleep_time):
        threading.Thread.__init__(self)
        self.sleep_time = sleep_time
    def run(self):
        threadName = threading.currentThread().getName()
        workflow(threadName, self.sleep_time)

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1,10)
    td = Threaded_worker(sleep_time)
    td.start()

La ejecución del código tendrá una salida similar al anterior:

Starting 3 threads...
[18:57:40] Hello, I am Thread-1 and I will sleep 8 seconds.
[18:57:40] Hello, I am Thread-2 and I will sleep 9 seconds.
[18:57:40] Hello, I am Thread-3 and I will sleep 3 seconds.
[18:57:43] I am Thread-3 and I have waken up.
[18:57:48] I am Thread-1 and I have waken up.
[18:57:49] I am Thread-2 and I have waken up.

Análogamente, empleando procesos tenemos:

import multiprocessing
import os
import random
import time
from datetime import datetime

# how many processes we want to start
WORKER_NUMBER = 3

def worker(sleep_time):
    PID = os.getpid()
    print("[%s] Hello, I am the PID %d I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am the PID %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))

print('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    sleep_time = random.randint(1, 10)
    p = multiprocessing.Process(target=worker, args=(sleep_time,))
    jobs.append(p)
    p.start()

La ejecución del código generará una salida del siguiente tipo:
Starting 3 processes...
[14:53:48] Hello, I am the PID 2099 I will sleep 4 seconds.
[14:53:48] Hello, I am the PID 2098 I will sleep 5 seconds.
[14:53:48] Hello, I am the PID 2100 I will sleep 3 seconds.
[14:53:51] I am the PID 2100 and I have waken up.
[14:53:52] I am the PID 2099 and I have waken up.
[14:53:53] I am the PID 2098 and I have waken up

Y de nuevo, si queremos extraer la lógica a una función workflow() externa:

import multiprocessing
import os
import random
import time
from datetime import datetime

# how many processes we want to start
WORKER_NUMBER = 3

def workflow(PID, sleep_time):
    print("[%s] Hello, I am the PID %d I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am the PID %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))

def worker(sleep_time):
    PID = os.getpid()
    workflow(PID, sleep_time)

print('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    sleep_time = random.randint(1, 10)
    p = multiprocessing.Process(target=worker, args=(sleep_time,))
    jobs.append(p)
    p.start()


Los códigos anteriores son válidos para todas las versiones de python 3.

Si disponemos de una versión de Python mayor o igual que 3.5 podemos hacer uso de las subclases ThreadPoolExecutor y ProcessPoolExecutor adaptando fácilmente los códigos anteriores.

Para el caso de ThreadPoolExecutor tendríamos el siguiente código:

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import random
import threading
import time

THREADS_COUNT = 3

def run(sleep_time):
    threadName = threading.currentThread().getName()
    print("[%s] Hello, I am %s and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), threadName, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am %s and I have waken up." % (datetime.now().strftime('%H:%M:%S'), threadName))


executor = ThreadPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1, 10)
    executor.submit(run, sleep_time)
En este caso la salida sería similar:

[20:48:20] Hello, I am ThreadPoolExecutor-0_0 and I will sleep 7 seconds.
[20:48:20] Hello, I am ThreadPoolExecutor-0_1 and I will sleep 10 seconds.
[20:48:20] Hello, I am ThreadPoolExecutor-0_2 and I will sleep 9 seconds.
[20:48:27] I am ThreadPoolExecutor-0_0 and I have waken up.
[20:48:29] I am ThreadPoolExecutor-0_2 and I have waken up.
[20:48:30] I am ThreadPoolExecutor-0_1 and I have waken up.


La adaptación a ProcessPoolExecutor es trivial a partir de la anterior:

from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
import random
import os
import time


THREADS_COUNT = 3


def run(sleep_time):
    PID = os.getpid()
    print("[%s] Hello, I am %d and I will sleep %d seconds." % (datetime.now().strftime('%H:%M:%S'), PID, sleep_time))
    time.sleep(sleep_time)
    print("[%s] I am %d and I have waken up." % (datetime.now().strftime('%H:%M:%S'), PID))


executor = ProcessPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    sleep_time = random.randint(1, 10)
    executor.submit(run, sleep_time)

La salida de la ejecución será similar a las anteriores:

[20:53:18] Hello, I am 7433 and I will sleep 5 seconds.
[20:53:18] Hello, I am 7434 and I will sleep 2 seconds.
[20:53:18] Hello, I am 7435 and I will sleep 5 seconds.
[20:53:20] I am 7434 and I have waken up.
[20:53:23] I am 7433 and I have waken up.
[20:53:23] I am 7435 and I have waken up.
En próximas entradas hablaremos de como trabajar con "colas" en las que podemos almacenar tareas, de forma que un grupo de hilos o procesos sean los encargados de "sacar" esas tareas de la cola y ejecutar la correspondiente lógica para cada una de ellas.

Hilos y procesos (I)

El siguiente ejemplo muestra una implementación de hilos o threads válido para cualquier versión de python 3 extendiendo la clase Thread del módulo threading

import threading
  
# how many threads we want to start  
THREADS_COUNT = 3  

class Threaded_worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        threadName = threading.currentThread().getName()
        print("Hello, I am the thread %s" % threadName)

print('Starting %d threads...' % THREADS_COUNT)
for i in range(THREADS_COUNT):
    td = Threaded_worker()
    td.start()

En este caso cada hilo se inicia y ejecuta el código del método run(), es decir, imprime el mensaje de saludo mostrando su nombre y finaliza.

Al ejecutar el código anterior obtendremos:
Starting 3 threads...
Hello, I am the thread Thread-1
Hello, I am the thread Thread-2
Hello, I am the thread Thread-3

Si en lugar de threads o hilos queremos usar procesos para saltarnos el GIL, podemos emplear el siguiente código, también válido para cualquier versión de python 3:

import multiprocessing
import os

# how many processes we want to start
WORKER_NUMBER = 3

def worker():
    PID = os.getpid()
    print ("Hello, I am the process with PID %d" % PID)

print ('Starting %d processes...' % WORKER_NUMBER)

jobs = []
for i in range(WORKER_NUMBER):
    p = multiprocessing.Process(target=worker, args=())
    jobs.append(p)
    p.start()

De modo análogo cada proceso se inicia y ejecuta el código del método worker(), y en este caso de nuevo nos saluda informando de su PID y finaliza.

Así, al ejecutar el código anterior obtendremos:
Starting 3 processes...
Hello, I am the process with PID 3486
Hello, I am the process with PID 3487
Hello, I am the process with PID 3488

En python, no siempre el empleo de hilos nos va a proporcionar mejores resultados. Vamos a verlo con un ejemplo.

Escribamos un código simple que ejecute una cuenta atrás de 500 millones:

def countdown(n):
    while n > 0:
        n -= 1

COUNT = 500000000
countdown(COUNT)
Vamos a ejecutar el código en un equipo con un procesador Intel(R) Core(TM) i5-3337U CPU @ 1.80GHz con 4 cpus, y 2 cores por cada cpu. La versión de python será la 3.7.

En este caso la ejecución tarda unos 29 segundos:

real    0m28,782s
user    0m28,776s
sys     0m0,004s
Ahora escribimos un código similar en el que vamos a realizar la misma cuenta atrás repartiendo la tarea entre 2 threads, de forma que cada thread ejecuta una cuenta atrás de 250 millones:

import threading

# how many threads we want to start
THREADS_COUNT = 2

class Threaded_worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        n = 250000000
        while n > 0:
            n -= 1

for i in range(THREADS_COUNT):
    td = Threaded_worker()
    td.start()
Y de nuevo medimos el tiempo de ejecución. En este caso tarda unos 30 segundos

real    0m30,147s
user    0m30,132s
sys     0m0,080s
A continuación escribimos un código similar empleando procesos:

import multiprocessing

WORKER_NUMBER = 2

def worker():
    n = 250000000
    while n > 0:
        n -= 1

jobs = []
for i in range(WORKER_NUMBER):
    p = multiprocessing.Process(target=worker, args=())
    jobs.append(p)
    p.start()
En este caso el tiempo de ejecución es de 15.7 segundos, es decir, hemos reducido el tiempo a la mitad respecto al primer caso.

real    0m15,767s
user    0m31,444s
sys     0m0,012s
El "responsable" de este comportamiento es el GIL que no permite la ejecución simultánea de threads.

En futuras entradas veremos que el GIL no es realmente "tan malo", y que en determinados casos es útil emplear threads.

Para terminar vamos a hacer una pequeña revisión del módulo concurrent.futures disponible a partir de la versión 3.5 de python.
Este módulo nos va a permitir ejecutar tareas asíncronas empleando threads o procesos mediante las subclases ThreadPoolExecutor y ProcessPoolExecutor. En los link podéis encontrar la documentación completa.

Para ver un ejemplo vamos a codificar los ejemplos anteriores empleando estas subclases, haciendo de este modo un pequeña introducción al concepto de programación asíncrona.

En el caso de threads tenemos el siguiente código:

from concurrent.futures import ThreadPoolExecutor

THREADS_COUNT = 2


def run():
    n = 250000000
    while n > 0:
        n -= 1


executor = ThreadPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    executor.submit(run)

En este caso el tiempo de ejecución es similar al ejemplo previo:

real    0m29,023s
user    0m28,936s
sys     0m0,012s


Para procesos el código equivalente sería:

from concurrent.futures import ProcessPoolExecutor

THREADS_COUNT = 2


def run():
    n = 250000000
    while n > 0:
        n -= 1


executor = ProcessPoolExecutor(max_workers=THREADS_COUNT)
for i in range(THREADS_COUNT):
    executor.submit(run)

Con el siguiente tiempo de ejecución:

real    0m16,774s
user    0m33,100s
sys     0m0,016s
Como vemos los tiempos son similares, aunque la sintaxis es quizá más sencilla. En futuras entradas profundizaremos en el concepto de programación asíncrona y hablaremos en detalle del Event Loop como core de la programación asíncrona en python.

15 de diciembre de 2017

logs con rotado en python como modulo

En esta ocasión vamos a incorporar un sistema de logs a nuestra aplicación python. El sistema nos va a permitir definir diferentes niveles de logs (info, debug, error, etc...) y establecer una política de rotado para evitar que el fichero de logs crezca indefinidamente.

Para ello vamos a hacer uso de la librería logging: https://docs.python.org/2/library/logging.html

Los parámetros que vamos a usar en nuestro código son:
- LOG_NAME: nombre (string) que identifique a la instancia del logger.
- LOG_FOLDER: directorio que va a contener los ficheros de logs.
- LOG_FILE: nombre del fichero de logs.
- ROTATE_TIME: momento del día en el que queremos llevar a cabo el rotado de logs. Podemos optar por los siguientes valores:
- LOG_COUNT: número total de ficheros que queremos mantener. Una vez superado este valor, se borrarán los ficheros más antiguos.
- LOG_LEVEL: level de logs. Los valores posibles son los siguientes:

- LOG_FORMAT: formato de los logs. Podemos definir una plantilla incluyendo el orden de los elementos, gravedad y el timestamp.

Un ejemplo, en el caso de un sistema unix o linux, podría ser:

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG_FILE = 'my-app.log'
ROTATE_TIME = 'midnight'
LOG_LEVEL = 'DEBUG'
LOG_COUNT = 10
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

Lógicamente tenemos que asegurarnos que el el directorio LOG_FOLDER exista y que el usuario con el que va a correr nuestro código python tenga permisos de escritura en ese directorio.

También podemos añadir en el propio código del logger la opción de chequear si el directorio existe, y en caso contrario intentar crearlo, aunque de nuevo el usuario debe tener permisos para poder crear el directorio. El código sería algo como lo siguiente:

import os

log_folder = os.path.dirname(LOG_FOLDER)

if not os.path.exists(log_folder):
    try:
        os.makedirs(log_folder)
    except Exception as error:
        print 'Error creating the log folder: %s' % (str(error))

En nuestro caso vamos a suponer que el directorio LOG_FOLDER existe y que el usuario con el que vamos a ejecutar el código tiene permisos de escritura sobre el directorio.

De este modo el código quedaría del siguiente modo:

import logging.handlers
import sys

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG_FILE = 'my-app.log'
LOG = LOG_FOLDER + LOG_FILE
ROTATE_TIME = 'midnight'
LOG_LEVEL = logging.DEBUG
LOG_COUNT = 5
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

try:
    logger = logging.getLogger(LOGGER_NAME)
    loggerHandler = logging.handlers.TimedRotatingFileHandler(filename=LOG , when=ROTATE_TIME, interval=1, backupCount=LOG_COUNT)
    formatter = logging.Formatter(LOG_FORMAT)
    loggerHandler.setFormatter(formatter)
    logger.addHandler(loggerHandler)
    logger.setLevel(LOG_LEVEL)
except Exception as error:
    print "Error with logs: %s" % (str(error))
    sys.exit()
Para escribir logs, basta con invocar el logger, indicando el levelname y el message a trazar, según hemos definido en LOG_FORMAT. Algunos ejemplos:

logger.info("writing log with info level")
logger.error("writing log with error info. string %s | integer : %d", 'parameter string', 78 )
logger.debug("writting debug record")
Lo cual se traduciría en las siguientes trazas en el fichero /var/log/my-app/my-app.log:

2017-12-15 22:24:32,437 INFO writing log with info level
2017-12-15 22:24:32,438 ERROR writing log with error info. string parameter string | integer : 78
2017-12-15 22:24:32,438 DEBUG writting debug record
Como podéis observas las trazas se ajustan al formato que hemos definido y además podemos pasar parámetros a la traza.

Si nuestra aplicación python se compone de varios ficheros o si queremos trazar logs desde varios ficheros, resulta tedioso y poco práctico tener que incluir este código del logger en cada fichero. Para evitarlo vamos a ver como definir nuestro logger como un módulo.

Supongamos que nuestra aplicación está formada por n ficheros en el directorio my-app-python:

  my-app-python folder
      |_ _ _ _ _ _ my-app-file-1.py
      |_ _ _ _ _ _ my-app-file-2.py
      ...............
      |_ _ _ _ _ _ my-app-file-n.py

Queremos que todos los ficheros my-app-file-1.py, my-app-file-2.py ... my-app-file-n.py usen la lógica del logger que hemos definido antes.

Para ello vamos a construir un fichero mylogger.py con el siguiente contenido:

import logging.handlers
import sys

LOGGER_NAME= 'my-logger'
LOG_FOLDER = '/var/log/my-app/'
LOG = LOG_FOLDER + LOG_FILE
LOG_FILE = 'my-app.log'
ROTATE_TIME = 'midnight'
LOG_LEVEL = logging.DEBUG
LOG_COUNT = 5
LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'

try:
    logger = logging.getLogger(LOGGER_NAME)
    loggerHandler = logging.handlers.TimedRotatingFileHandler(logfile=LOG , when=ROTATE_TIME, interval=1, backupCount=LOG_COUNT)
    formatter = logging.Formatter(LOG_FORMAT)
    loggerHandler.setFormatter(formatter)
    logger.addHandler(loggerHandler)
    logger.setLevel(LOG_LEVEL)
except Exception as error:
    print "Error with logs: %s" % (str(error))
    sys.exit()

def getLogger():
    return logger
A continuación creamos una carpeta, que llamaremos por ejemplo utils dentro de la cual meteremos el fichero my-logger.py junto con un fichero __init__.py vacío:

  utils folder
      |_ _ _ _ _ _  __init__.py  (empty file)
      |_ _ _ _ _ _  mylogger.py

Y por último guardamos la carpeta utils dentro de la carpeta my-app-python:

  my-app-python folder
      |_ _ _ _ _ _ my-app-file-1.py
      |_ _ _ _ _ _ my-app-file-2.py
      ...............
      |_ _ _ _ _ _ my-app-file-n.py
      |_ _ _ _ _ _ utils folder
                       |_ _ _ _ _ _  __init__.py  (empty file)
                       |_ _ _ _ _ _  mylogger.py

Para poder trazar desde cualquiera de los ficheros my-app-file-1.py, my-app-file-2.py ... my-app-file-n.py basta con importar el módulo, e invocar una instancia del logger a través del método getLogger():

from utils import mylogger

logger = mylogger.getLogger()

logger.info("info record")
logger.error("error record")

try:
   blalbla
except Exception as error:
   logger.error("Error at code: %s" , str(error)
De este modo tendremos un log unificado para todos los ficheros de my-app-python

Podéis comprobar como los ficheros efectivamente rotan y además se eliminan los fichero más antiguos según el número de ficheros que configuréis en LOG_COUNT

14 de diciembre de 2017

python MySQL operaciones de lectura

En esta entrada veremos una primera forma de conectar a una base de datos MySQL para llevar a cabo las operaciones CRUD básicas (Create, Read, Update and Delete).
Existen numerosa librerías para llevar a cabo este cometido. En este caso vamos a trabajar con el MySQLdb: info completa.

El primer paso sería la instalación del módulo, puesto que en las instalaciones estándar de python no está incluido. En debian y sistemas basados en debian podemos instalar el módulo haciendo uso de la herramienta apt-get y en general en cualquier sistema unix haciendo uso de la utilidad pip:

root@debian# apt-get install python-mysqldb
or
root@debian# pip install MySQL-python
Si ninguna de las opciones anteriores os convence podéis descargar el fichero .tar.gz desde la web oficial http://sourceforge.net/projects/mysql-python y hacer la instalación a mano:

root@debian# tar -xzvf MySQL-python-1.2.4b4.tar.gz
root@debian# cd MySQL-python-1.2.4b4
root@debian# python setup.py build
root@debian# python setup.py install
En el caso de sistemas windows podemos descargarlo desde aquí.
Una vez instalado podemos importar el módulo en nuestro programa python y hacer uso del mismo:

#!/usr/bin/env python

import MySQLdb

El módulo MySQLdb cumple con la Python Database API Specification v2.0: https://www.python.org/dev/peps/pep-0249/ y por ello maneja dos conceptos que debemos diferenciar: conexión a base de datos y cursor.

Inicialmente estableceremos la conexión a base de datos y una vez conectados los cursores nos permitirán ejecutar las operaciones y manejar los resultados de estas operaciones. Asociada a la misma conexión podemos crear diferentes cursores o bien crear un cursor para cada conexión.

Dicho de otro modo, el cursor solo existe en el contexto de una conexión previa y del mismo modo no existe el cursor si cerramos la conexión a base de datos. Esto dicho así puede resultar un poco confuso, así que vamos con algunos ejemplos concretos.

En los ejemplos que veremos a continuación vamos a trabajar con la base de datos employees que podéis descargar de la web de mysql: https://dev.mysql.com/doc/employee/en/.

En primer lugar vamos a ver como establecer la conexión a la base de datos:

#!/usr/bin/env python

import MySQLdb

DB_IP = "192.168.0.160"
DB_PORT = 3307
DB_NAME = "employees"
DB_USER = "root"
DB_PASSWORD = "1234"

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
except MySQLdb.Error as mysql_error:
    print "Error connecting to database: %s" % (str(mysql_error))
Como vemos, la librería tiene sus propias excepciones, en las que podemos acceder a los diferentes errores de mysql, como por ejemplo:

Error connecting to database: (1130, "Host '192.168.0.159' is not allowed to connect to this MySQL server")
Error connecting to database: (2003, 'Can\'t connect to MySQL server on \'192.168.0.160\' (113 "No route to host")')
Suponiendo que la base de datos está configurada con los permisos adecuados, el código anterior nos generará una conexión a MySQL que podemos usar para ejecutar operaciones.

Vamos a realizar algunas consultas sobre la tabla employees (el nombre coincide con el de la base de datos), que tiene la siguiente estructura:

mysql> desc employees;
+------------+---------------+------+-----+---------+-------+
| Field      | Type          | Null | Key | Default | Extra |
+------------+---------------+------+-----+---------+-------+
| emp_no     | int(11)       | NO   | PRI | NULL    |       |
| birth_date | date          | NO   |     | NULL    |       |
| first_name | varchar(14)   | NO   |     | NULL    |       |
| last_name  | varchar(16)   | NO   |     | NULL    |       |
| gender     | enum('M','F') | NO   |     | NULL    |       |
| hire_date  | date          | NO   |     | NULL    |       |
+------------+---------------+------+-----+---------+-------+
Vamos a buscar todos los usuarios (registros de employees) cuyo first_name sea Patricia. Al tratarse de un varchar la consulta sería:

mysql> select emp_no from employees where first_name='Patricia';
A nivel de nuestro código, se traduciría en:

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))
Como vemos, el orden a la hora de ejecutar la consulta es:

1. crear la conexión db_connection
2. definir un cursor my_cursor asociado a la conexión
3. ejecutar sobre el cursor mediante my_cursor.execute la query que hemos construido
4. recoger el resultado de la consulta del cursor mediante my_cursor.fetchall() el resultado de la operación
5. cerrar el cursor
6. cerrar la conexión

El resultado de una consulta queda almacenado en al variable result. Pero ¿Qué tipo de objeto es result? En este caso, al usar fetchall result va a ser siempre una tupla.

Esta tupla va a estar vacía si no se encuentran registros y en caso de que existan registros, va a contener a su vez tuplas con estos registros que podemos recorrer.

Volvamos sobre el ejemplo anterior para verlo con más detalle, buscando primero los registros cuyo firs_name es 'Patricia2':

username = 'Patricia2'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        print record
else:
    print result
Si ejecutamos ese código obtendremos:

Found 0 records
()
En este caso existen 0 registros cuyo first_name es 'Patricia2' y el resultado es una tupla vacía result=()
Si buscamos registros cuyo first_name es 'Patricia' el resultado cambia:

Found 215 records
(10786L, datetime.date(1964, 5, 19), 'Patricia', 'dAstous', 'M', datetime.date(1989, 3, 14))
(11884L, datetime.date(1963, 4, 10), 'Patricia', 'Moehrke', 'M', datetime.date(1998, 6, 18))
(12693L, datetime.date(1956, 11, 25), 'Patricia', 'Demke', 'M', datetime.date(1986, 3, 3))
(14353L, datetime.date(1959, 1, 7), 'Patricia', 'Ghandeharizadeh', 'F', datetime.date(1990, 3, 9))
(14518L, datetime.date(1955, 6, 19), 'Patricia', 'Peir', 'M', datetime.date(1986, 7, 6))
..........................
En este caso result es una tupla con contenido: cada elemento de la tupla es a su vez una tupla con los valores de las diferentes columnas de la tabla respetando el orden de las columnas de la tabla (emp_no, birth_date, first_name, last_name , gender, hire_date).

Siguiendo con el ejemplo anterior, si nos interesa sacar solamente la lista de apellidos (last_name) de todos los usuarios de nombre Patricia, podemos recorrer las tuplas del result y quedarme con el cuarto campo de cada tupla:

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        # we get the fourth element of every tuple
        print record[3]
else:
    print result
De este modo, al ejecutar el código obtendremos:

Found 215 records
dAstous
Moehrke
Demke
Ghandeharizadeh
Peir
Dayana
Masada
Gulla
Lundstrom
.........
Otra forma más optima de hacer lo mismo sería modificar la consulta y sacar de la base de datos solamente aquellos datos que queramos, en este caso los apellidos:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)
¿Cuál creéis que será el resultado? ¿La lista de apellidos? casi...

Found 215 records
('dAstous',)
('Moehrke',)
('Demke',)
('Ghandeharizadeh',)
('Peir',)
('Dayana',)
('Masada',)
('Gulla',)
('Lundstrom',)
.......
Ya dijimos antes, que en caso de que existan registros el resultado va a ser una tupla de tuplas, por tanto para sacar los apellidos debemos quedarnos con el primer elemento de cada tupla (:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

print "Found %d records" % len(result)

if len(result) > 0:
    for record in result:
        # we get the first element of every tuple
        print record[0]
else:
    print result
Ahora sí:

Found 215 records
('dAstous',)
('Moehrke',)
('Demke',)
('Ghandeharizadeh',)
('Peir',)
('Dayana',)
('Masada',)
('Gulla',)
('Lundstrom',)
.......
Con esto hemos cubierto todos los casos si empleamos fetchall.

Otra opción posible es usar fetchone. A diferencia de fetchall, fetchone va a devolver una tupla con los valores de un registro en caso de que exista algún registro o None en caso de que no exista ningún registro.

El matiz es importante porque si ejecutamos un len(result) sobre una consulta con fetchone podemos obtener una excepción porque puede ser None:

TypeError: object of type 'NoneType' has no len()
Volviendo al ejemplo anterior:

username = 'Patricia2'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    print result
El resultado será:

No record found 
Si cambiamos 'Patricia2' por 'Patricia' obtendremos la tupla con los valores:

(10786L, datetime.date(1964, 5, 19), 'Patricia', 'dAstous', 'M', datetime.date(1989, 3, 14))
De nuevo si solo queremos quedarnos con el apellido podemos sacar todas las columnas y quedarnos con el campo cuarto:

username = 'Patricia'
query = " select * from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    # we get the fourth value
    print result[3]
O bien modificar la consulta y quedarnos con el primer campo de la tupla:

username = 'Patricia'
query = " select last_name from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No record found "
else:
    # we get the firts value
    print result[0]
En ambos casos el resultado es:

dAstous
Normalmente se emplea fetchone cuando buscamos sabiendo que en caso de existir el registro éste va a ser único o para operaciones en las que sepamos de antemano que el resultado es un único registro, como por ejemplo select count que podemos emplear para saber cuantos registros de la tabla tiene nombre 'Patricia' sin necesidad de traérnoslos y contarlos como hicimos al comienzo de la entrada:

username = 'Patricia'
query = " select count(*) from employees where first_name=\'%s\' " % username

try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query)
    result = my_cursor.fetchone()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result is None:
    print "No records found "
else:
    # we get the firts value
    print "Records found: %d" % result[0]
El resultado sería:

Records found: 215
Como comentábamos al principio, una misma conexión puede emplearse para encadenar varias sentencias, de modo que nos ahorramos el establecimiento. Podemos incluso reutilizar el cursor:

username_1 = 'Patricia'
username_2 = 'Oscar'
query_1 = " select count(*) from employees where first_name=\'%s\' " % username_1
query_2 = " select * from employees where first_name=\'%s\' " % username_2
try:
    db_connection = MySQLdb.connect(DB_IP, DB_USER, DB_PASSWORD, DB_NAME)
    my_cursor = db_connection.cursor()
    my_cursor.execute(query_1)
    result_1 = my_cursor.fetchone()
    my_cursor.execute(query_2)
    result_2 = my_cursor.fetchall()
    my_cursor.close()
    db_connection.close()
except MySQLdb.Error as mysql_error:
    print "Error executing query: %s" % (str(mysql_error))

if result_1 is None:
    print "No records found with first_name %s " % username_1
else:
    print "Records found with first_name %s: %d" % (username_1,result_1[0])

print "Records with first_name %s" % (username_2)
for record in result_2:
    print record
El resultado sería:

Records found with first_name Patricia: 215
Records with first_name Oscar
(11398L, datetime.date(1953, 5, 30), 'Oscar', 'Peir', 'M', datetime.date(1986, 8, 23))
(11530L, datetime.date(1956, 11, 14), 'Oscar', 'Jarecki', 'M', datetime.date(1985, 6, 10))
(15399L, datetime.date(1961, 8, 13), 'Oscar', 'Mukaidono', 'F', datetime.date(1985, 5, 1))
(17994L, datetime.date(1962, 1, 31), 'Oscar', 'Msuda', 'M', datetime.date(1990, 1, 21))
(20096L, datetime.date(1959, 5, 16), 'Oscar', 'Gladwell', 'M', datetime.date(1995, 9, 4))
(20737L, datetime.date(1957, 4, 8), 'Oscar', 'Acton', 'F', datetime.date(1987, 1, 29))
.......
Realmente, la forma óptima de operar sería disponer de un pool de conexiones a base de datos, de modo que para ejecutar una consulta obtengamos una conexión del pool. Veremos como hacer esto en futuras entradas.

11 de diciembre de 2017

Colas (RabbitMQ) - número variable de consumidores

En una entrada previa Colas (RabbitMQ) - consumidores mostrábamos como crear un pool de threads que iban a consumir mensajes de una cola y ejecutar un workflow para cada mensaje.

El número de consumidores (tamaño del pool de threads) era fijo. Pero ¿Qué sucede si el número de mensajes encolados se dispara? Sería bueno poder crear nuevos hilos que consumieran mensajes mientras el tamaño de la cola excediera un determinado tamaño y una vez que la cola volviera a tener un número de mensajes aceptable estos hilos terminasen.

Con este planteamiento vamos a tener un número de hilos fijos que vamos a denominar Main y por otro lados vamos a crear hilos Backing que solamente existirán cuando el tamaño de la cola exceda un determinado tamaño. Estos hilos Backing juegan el papel de refuerzo temporal a los hilos Main cuando éstos no son suficientes para consumir mensajes a un ritmo que mantenga los niveles de la cola en un determinado número de mensajes.

Para implementarlo vamos a añadir en el código una tarea encargada de comprobar de forma periódica el tamaño de la cola y en función del resultado creará nuevos hilos Backing. Podríamos realizar el chequeo usando la propia librería puka, pero en este caso vamos a aprovechar la api web que nos aporta el plugin de management de RabbitMQ del que ya hablamos en entradas previas.

El plugin nos proporciona un interfaz web que acepta diferentes métodos y operaciones: api-web-management
Basándonos en esa api y usando la librería requests de python (http://docs.python-requests.org) podemos escribir el siguiente código que va a devolvernos el número de mensajes que contiene una cola:

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

Usando el código anterior podemos implementar una lógica como la que representa el siguiente diagrama:

:

El diagrama anterior se traduce en el siguiente código:

#!/usr/bin/env python

import puka
import time
import threading
import requests

BROKER_HOST = 'broker'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
MAIN_WORKER_NUMBER = 4
MAX_BACKING_WORKER_NUMBER = 10
WATCHDOG_CHECK_INTERVAL = 30
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
QUEUE_SIZE_THRESHOLD = 10000

def get_messages_count_in_queue(queue):
    """ Get messages at Queue through RabbitMQ api"""
    try:
        url = 'http://' + BROKER_HOST + ':15672/api/queues/%2f/' + queue
        r = requests.get(url, auth=('guest', 'guest'))
        if r.status_code == 200:
            try:
                messages_count = r.json()['messages']
            except:
                messages_count = 0
        else:
            print "Error at RabbitMQ api. Code: %d" % (r.status_code)
            messages_count = 0
        return messages_count
    except Exception as error:
        print "Error checking messages at queue: %s" % str(error)
        return 0

def workflow(thread_name, thread_type, msg):
    """ Workflow for every message read from queue """
    try:
        if thread_type == 'main':
            print "[%s] Main thread read from queue: %s" % (thread_name, msg)
        elif thread_type == 'backing':
            print "(%s) Backing thread read from queue: %s" % (thread_name, msg)
    except Exception as error:
        print "Error: %s" % str(error)

class Main_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[%s] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='main', msg=self.result['body'])
                except Exception as error:
                    print '[%s] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)

class Backing_worker(threading.Thread):
    BROKER_CONNECTION = 0

    global queue_size
    global backing_threads_count

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.threadName = threading.currentThread().getName()
        while queue_size > QUEUE_SIZE_THRESHOLD:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.promise = self.client.connect()
                    self.client.wait(self.promise)
                    self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
                    print '[--%s--] Succesfully Connected at broker %s' % (self.threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[--%s--] Error at broker connection: %s.....retrying after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    self.result = self.client.wait(self.consume_promise, timeout=2)
                    self.client.basic_ack(self.result)
                    workflow(thread_name=self.threadName, thread_type='backing', msg=self.result['body'])
                except puka.ResourceError as error:
                    print '[--%s--] Error at consumer: %s.....retrying connection after %d seconds' % (self.threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
        backing_threads_count -=1


print 'Starting %d main consumer threads...' % (MAIN_WORKER_NUMBER)
for i in range (MAIN_WORKER_NUMBER):
    td = Main_worker()
    td.start()

while True:
    backing_threads_count = 0
    queue_size = get_messages_count_in_queue(QUEUE)
    if queue_size > QUEUE_SIZE_THRESHOLD and backing_threads_count < MAX_BACKING_WORKER_NUMBER:
        print "Queue size exceeds the threshold value. Starting new backing thread"
        td = Backing_worker()
        td.start()
        backing_threads_count+=1
    time.sleep(WATCHDOG_CHECK_INTERVAL)

Vamos a analizar el código, comenzando por algunos de los nuevos parámetros:

MAIN_WORKER_NUMBER: representa el numero de hilos fijos que van a componer el pool Main de consumidores. Se inicializa al comienzo y en todo momento van a estar leyendo mensajes de la cola.
MAX_BACKING_WORKER_NUMBER: representa el número máximo de hilos "de refuerzo" ejecutándose que vamos a permitir. Dicho de otro modo el máximo tamaño del pool Backing
WATCHDOG_CHECK_INTERVAL: representa el periodo de tiempo entre cada chequeo para comprobar el número de mensajes almacenados en la cola.
QUEUE_SIZE_THRESHOLD: representa el límite máximo de mensajes encolados. Si el tamaño de la cola excede este valor al realizar la comprobación se creará un nuevo thread "de refuerzo" siempre que el número de threads de refuerzo no exceda el valor MAX_BACKING_WORKER_NUMBER

Por otro lado hemos definido 2 clases, una para los Main threads y otra para los Backing threads. Las clases son similares, salvo el detalle del método run():

- En el caso de los Main threads el bucle while se ejecuta indefinidamente, es decir, este tipo de hilos siempre van a estar "vivos".
- En el caso de los Backing threads el bucle while se ejecuta mientras el tamaño de la cola exceda del valor establecido como umbral (QUEUE_SIZE_THRESHOLD). Una vez que el tamaño de la cola sea menor de ese valor el hilo termina.

while True:
vs

while queue_size > QUEUE_SIZE_THRESHOLD

Para que los Backing threads puedan tener acceso a las variables queue_size y backing_threads_count las definimos como globales dentro de la clase:

global queue_size
global backing_threads_count
En los referente al workflow que va a ejecutar cada hilo, hacemos que cada hilo especifique al método qué clase de hilo es (Main o Backing simplemente para trazar de diferente la ejecución de uno y otro con corchetes o paréntesis.

Por último un detalle referente al parámetro prefetch_count de la librería puka:

self.consume_promise = self.client.basic_consume(queue=QUEUE, prefetch_count=1000)
El parámetro prefetch_count representa el número de mensajes "reservados" para consumir en un canal de RabbitMQ, para los cuales no hemos enviado ack. Vamos a desarrollarlo un poco más a fondo.

Si ejecutamos el código de RabbitMQ consumidores y publicamos mensajes en una cola de RabbitMQ tendremos algo como lo siguiente:



Si nos fijamos tenemos 3 contadores:

- Ready: mensajes que pueden ser consumidos.
- Unacked: mensajes reservados para consumir en un canal, para los cuales no se ha recibido confirmación de haberlos consumido.
- Total: total de mensajes.

Si en la situación anterior con digamos 100k mensajes ejecutáramos nuestro código, si la ejecución del workflow tardara demasiado podría suceder que cada uno de los 4 Main threads reserve 25k mensajes para ser consumidos, de forma que tendríamos 100k mensajes Unacked y 0 mensajes Ready, con lo cual, aunque se creen nuevos threads Backing de refuerzo, estos no podrían consumir ningún mensajes, salvo que se publicaran nuevos mensajes en la cola. Es por ello que hacemos uso del prefecth_count

Podéis encontrar información más detallada al respecto en la propia página de RabbitMQ: https://www.rabbitmq.com/consumer-prefetch.html

Probad a ejecutar el código incluyendo algún time sleep de retardo en el método workflow() y "acelerad" la publicación de mensajes usando el código de https://codigo-python.blogspot.com.es/2017/11/colas-rabbitmq-threads-ii.html y comprobad como al crecer la cola se crean los hilos de refuerzo.

21 de noviembre de 2017

PyMongo - operaciones de lectura

Continuando con la entrada anterior vamos a ver algunas operaciones más que podemos llevar a cabo con la librería de pymongo.
Vamos a comenzar con las operaciones de lectura, lo que en términos de SQL serían las operaciones SELECT.

En este primer ejemplo vamos a ver como obtener todos los registros de una colección sin especificar ningún parámetro.

En este caso en la colección USERS de la base de datos TEST solo tenemos 3 registros que hemos guardado previamente utilizando el código que vimos en PyMongo - parte 1.

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000
MONGODB_DATABASE = 'TEST'

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=10)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with mongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    for entry_json in result:
        print "Document got from collection %s: %s" % (destination, entry_json)
except Exception as error:
    print "Error getting data: %s" % str(error)

Si ejecutamos el código obtendremos los 3 documentos de la colección, en este caso:

OK -- Connected to MongoDB at server 192.168.0.169
Document got from collection USERS: {u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Document got from collection USERS: {u'city': u'Paris', u'_id': ObjectId('5a11debd9ea4cbaa30bf01ff'), u'surname': u'Mouse', u'name': u'Mickey'}
Document got from collection USERS: {u'city': u'New York', u'_id': ObjectId('5a11df209ea4cbaa30bf0214'), u'surname': u'Kent', u'name': u'Clark'}
Lo primero que debe llamarnos la atención es el hecho de que nosotros hemos guardado en mongo un documento JSON con strings como valores y al recuperar estos datos con pymongo estamos obteniendo unicode.

La razón de esto es que mongo almacena los datos internamente en formato BSON (Binary JSON) y los string los codifica con UTF-8.

Python codifica unicode usando también UTF-8, de este modo para asegurar la compatibilidad pymongo devuelve unicode. Para obtener más información sobre unicode en python puede consultarse el siguiente link: https://docs.python.org/2/howto/unicode.html

La conversión de unicode a string y viceversa va a depender de la codificación empleada. En este caso es tan sencillo como convertir directamente (omitimos la primera parte del código en los posteriores ejemplos):

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    print ''
    for entry_json in result:
        print "Found document at %s collection with next values:" % destination
        for key in entry_json:
            print ' '  + key + ' : ' + str(entry_json[key])
        print ''
except Exception as error:
    print "Error getting data: %s" % str(error)
Al ejecutarlo ya podremos ver los datos guardados como strings:

OK -- Connected to MongoDB at server 192.168.0.169

Found document at USERS collection with next values:
 city : New York
 _id : 5a1174a5fbb7132f5c7d091c
 surname : Wick
 name : John

Found document at USERS collection with next values:
 city : Paris
 _id : 5a11debd9ea4cbaa30bf01ff
 surname : Mouse
 name : Mickey

Found document at USERS collection with next values:
 city : New York
 _id : 5a11df209ea4cbaa30bf0214
 surname : Kent
 name : Clark
El método find() de la librería pymongo que acabamos de emplear devuelve un objecto de tipo cursor que podemos recorrer con un bucle for según hemos visto en el código anterior.

Para saber cuantos documentos vamos a encontrarnos al recorrer un cursor podemos invocar result.count() como vemos en el siguiente código. En caso de que no exista ningún documento el resultado de result.count() será 0.

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    print "Documents found: %d" % result.count()
    print ''
    for entry_json in result:
        print "Found document at %s collection with next values:" % destination
        for key in entry_json:
            print ' '  + key + ' : ' + str(entry_json[key])
        print ''
except Exception as error:
    print "Error getting data: %s" % str(error)
Al ejecutarlo obtendremos el número de elementos encontrados (remarcado en negrita) antes de recorrer el cursor para mostrarlos:

OK -- Connected to MongoDB at server 192.168.0.169
Documents found: 3

Found document at USERS collection with next values:
 city : New York
 _id : 5a1174a5fbb7132f5c7d091c
 surname : Wick
 name : John

Found document at USERS collection with next values:
 city : Paris
 _id : 5a11debd9ea4cbaa30bf01ff
 surname : Mouse
 name : Mickey

Found document at USERS collection with next values:
 city : New York
 _id : 5a11df209ea4cbaa30bf0214
 surname : Kent
 name : Clark
El método find() acepta uno o varios parámetros de cara a llevar a cabo una búsqueda. En los ejemplos anterior teníamos condition={}, que viene a traducirse como "dame todos las entradas".

Para especificar una condición en la búsqueda lo haremos definiendo la condición como un diccionario indicando clave:valor. Así por ejemplo, si queremos sacar la lista de usuarios que viven en "New York" definiremos condition={'city':'New Yor'}:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {'city':'New York'}
    result = collection.find(condition)
    for entry_json in result:
        print entry_json
except Exception as error:
    print "Error getting data: %s" % str(error)
La salida sería:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
{u'city': u'New York', u'_id': ObjectId('5a11df209ea4cbaa30bf0214'), u'surname': u'Kent', u'name': u'Clark'}
Si queremos especificar más de un parámetro al ejecutar la búsqueda, basta incluir varios entradas clave:valor al diccionario. De este modo si definimos:

condition = {'city':'New York', 'name':'John'}
obtendremos un único documento JSON al recorrer el cursor result:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
En ocasiones puede que nos interese obtener un solo documento, bien porque sepamos que solamente hay uno que cumpla una cierta condición (clave única) o bien porque nos baste con obtener un solo documento.

En ese caso pymongo dispone del método find_one().

A diferencia de find() que siempre devolvía un cursor, la función find_one() va a devolver un solo documento o bien None si no existe ningún documento que cumpla las condiciones que le pasemos.

En caso de que lo invoquemos usando condition={} nos devolverá el primer documento de la colección:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find_one(condition)
    print result
except Exception as error:
    print "Error getting data: %s" % str(error)
Si ejecutamos el código obtendremos uno de los 3 elementos de la lista:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Si definimos condition = {'city':'New York'} igual que hicimos antes con el método find() donde obtuvimos 2 documentos, vemos que en este caso obtendremos solamente uno:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = condition = {'city':'New York', 'name':'John'}
    result = collection.find_one(condition)
    print result
except Exception as error:
    print "Error getting data: %s" % str(error)
Efectivamente obtenemos un solo documento:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Al emplear find_one() hay que contemplar la posibilidad de que nos devuelva un None:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = condition = {'city':'Tokio'}
    result = collection.find_one(condition)
    if result is None:
        print "None document found!!"
    else:
        print result
except Exception as error:
    print "Error getting data: %s" % str(error)
En este caso obtendremos el mensaje informando que no se ha encontrado ningún documento:

None document found!!

No vamos a entrar en más detalles, solo mencionar que también podemos usar las funciones find_one_and_delete(), find_one_and_replace() y find_one_and_update() cuya documentación podemos encontrar en http://api.mongodb.com/python/current/api/pymongo/collection.html