21 de noviembre de 2017

PyMongo - operaciones de lectura

Continuando con la entrada anterior vamos a ver algunas operaciones más que podemos llevar a cabo con la librería de pymongo.
Vamos a comenzar con las operaciones de lectura, lo que en términos de SQL serían las operaciones SELECT.

En este primer ejemplo vamos a ver como obtener todos los registros de una colección sin especificar ningún parámetro.

En este caso en la colección USERS de la base de datos TEST solo tenemos 3 registros que hemos guardado previamente utilizando el código que vimos en PyMongo - parte 1.

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000
MONGODB_DATABASE = 'TEST'

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=10)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with mongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    for entry_json in result:
        print "Document got from collection %s: %s" % (destination, entry_json)
except Exception as error:
    print "Error getting data: %s" % str(error)

Si ejecutamos el código obtendremos los 3 documentos de la colección, en este caso:

OK -- Connected to MongoDB at server 192.168.0.169
Document got from collection USERS: {u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Document got from collection USERS: {u'city': u'Paris', u'_id': ObjectId('5a11debd9ea4cbaa30bf01ff'), u'surname': u'Mouse', u'name': u'Mickey'}
Document got from collection USERS: {u'city': u'New York', u'_id': ObjectId('5a11df209ea4cbaa30bf0214'), u'surname': u'Kent', u'name': u'Clark'}
Lo primero que debe llamarnos la atención es el hecho de que nosotros hemos guardado en mongo un documento JSON con strings como valores y al recuperar estos datos con pymongo estamos obteniendo unicode.

La razón de esto es que mongo almacena los datos internamente en formato BSON (Binary JSON) y los string los codifica con UTF-8.

Python codifica unicode usando también UTF-8, de este modo para asegurar la compatibilidad pymongo devuelve unicode. Para obtener más información sobre unicode en python puede consultarse el siguiente link: https://docs.python.org/2/howto/unicode.html

La conversión de unicode a string y viceversa va a depender de la codificación empleada. En este caso es tan sencillo como convertir directamente (omitimos la primera parte del código en los posteriores ejemplos):

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    print ''
    for entry_json in result:
        print "Found document at %s collection with next values:" % destination
        for key in entry_json:
            print ' '  + key + ' : ' + str(entry_json[key])
        print ''
except Exception as error:
    print "Error getting data: %s" % str(error)
Al ejecutarlo ya podremos ver los datos guardados como strings:

OK -- Connected to MongoDB at server 192.168.0.169

Found document at USERS collection with next values:
 city : New York
 _id : 5a1174a5fbb7132f5c7d091c
 surname : Wick
 name : John

Found document at USERS collection with next values:
 city : Paris
 _id : 5a11debd9ea4cbaa30bf01ff
 surname : Mouse
 name : Mickey

Found document at USERS collection with next values:
 city : New York
 _id : 5a11df209ea4cbaa30bf0214
 surname : Kent
 name : Clark
El método find() de la librería pymongo que acabamos de emplear devuelve un objecto de tipo cursor que podemos recorrer con un bucle for según hemos visto en el código anterior.

Para saber cuantos documentos vamos a encontrarnos al recorrer un cursor podemos invocar result.count() como vemos en el siguiente código. En caso de que no exista ningún documento el resultado de result.count() será 0.

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find(condition)
    print "Documents found: %d" % result.count()
    print ''
    for entry_json in result:
        print "Found document at %s collection with next values:" % destination
        for key in entry_json:
            print ' '  + key + ' : ' + str(entry_json[key])
        print ''
except Exception as error:
    print "Error getting data: %s" % str(error)
Al ejecutarlo obtendremos el número de elementos encontrados (remarcado en negrita) antes de recorrer el cursor para mostrarlos:

OK -- Connected to MongoDB at server 192.168.0.169
Documents found: 3

Found document at USERS collection with next values:
 city : New York
 _id : 5a1174a5fbb7132f5c7d091c
 surname : Wick
 name : John

Found document at USERS collection with next values:
 city : Paris
 _id : 5a11debd9ea4cbaa30bf01ff
 surname : Mouse
 name : Mickey

Found document at USERS collection with next values:
 city : New York
 _id : 5a11df209ea4cbaa30bf0214
 surname : Kent
 name : Clark
El método find() acepta uno o varios parámetros de cara a llevar a cabo una búsqueda. En los ejemplos anterior teníamos condition={}, que viene a traducirse como "dame todos las entradas".

Para especificar una condición en la búsqueda lo haremos definiendo la condición como un diccionario indicando clave:valor. Así por ejemplo, si queremos sacar la lista de usuarios que viven en "New York" definiremos condition={'city':'New Yor'}:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {'city':'New York'}
    result = collection.find(condition)
    for entry_json in result:
        print entry_json
except Exception as error:
    print "Error getting data: %s" % str(error)
La salida sería:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
{u'city': u'New York', u'_id': ObjectId('5a11df209ea4cbaa30bf0214'), u'surname': u'Kent', u'name': u'Clark'}
Si queremos especificar más de un parámetro al ejecutar la búsqueda, basta incluir varios entradas clave:valor al diccionario. De este modo si definimos:

condition = {'city':'New York', 'name':'John'}
obtendremos un único documento JSON al recorrer el cursor result:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
En ocasiones puede que nos interese obtener un solo documento, bien porque sepamos que solamente hay uno que cumpla una cierta condición (clave única) o bien porque nos baste con obtener un solo documento.

En ese caso pymongo dispone del método find_one().

A diferencia de find() que siempre devolvía un cursor, la función find_one() va a devolver un solo documento o bien None si no existe ningún documento que cumpla las condiciones que le pasemos.

En caso de que lo invoquemos usando condition={} nos devolverá el primer documento de la colección:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = {}
    result = collection.find_one(condition)
    print result
except Exception as error:
    print "Error getting data: %s" % str(error)
Si ejecutamos el código obtendremos uno de los 3 elementos de la lista:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Si definimos condition = {'city':'New York'} igual que hicimos antes con el método find() donde obtuvimos 2 documentos, vemos que en este caso obtendremos solamente uno:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = condition = {'city':'New York', 'name':'John'}
    result = collection.find_one(condition)
    print result
except Exception as error:
    print "Error getting data: %s" % str(error)
Efectivamente obtenemos un solo documento:

{u'city': u'New York', u'_id': ObjectId('5a1174a5fbb7132f5c7d091c'), u'surname': u'Wick', u'name': u'John'}
Al emplear find_one() hay que contemplar la posibilidad de que nos devuelva un None:

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    condition = condition = {'city':'Tokio'}
    result = collection.find_one(condition)
    if result is None:
        print "None document found!!"
    else:
        print result
except Exception as error:
    print "Error getting data: %s" % str(error)
En este caso obtendremos el mensaje informando que no se ha encontrado ningún documento:

None document found!!

No vamos a entrar en más detalles, solo mencionar que también podemos usar las funciones find_one_and_delete(), find_one_and_replace() y find_one_and_update() cuya documentación podemos encontrar en http://api.mongodb.com/python/current/api/pymongo/collection.html

19 de noviembre de 2017

PyMongo - conexión

En esta nueva entrada vamos a ver como interactuar con la base de datos MongoDB usando python.

MongoDB es una base de datos NoSQL y gracias a su buen rendimiento y escalado podemos considerarla como una alternativa posible dentro del mundo del Big Data. Tiene una amplia documentación disponible en su web

Para poder trabajar contra MongoDB vamos a emplear la librería pymongo: https://pypi.python.org/pypi/pymongo

Pymongo tiene varias virtudes, una de ellas es que es "thread-safe", es decir, que es apta para trabajar con aplicaciones multi-hilo.
En su lista de faq podéis encontrar esta junto a otras muchas features.

El primer paso para trabajar con pymongo es instalar la librería.
La forma más cómoda de instalarla es a través de pip ejecutando pip install pymongo, pero como siempre podemos optar por descargar los paquetes de la web y hacerlo de forma manual tanto en sistemas linux como windows: https://pypi.python.org/pypi/pymongo

Una vez instalada la librería ya podemos comenzar a escribir código.
Vamos a suponer que contamos con una instalación básica de mongo que no requiere autenticación para conectarse. El código para conectarse a MongoDB quedaría del siguiente modo:

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
    client.close()
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with MongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

Con el código anterior simplemente nos conectamos a la base de datos TEST (en caso de que no exista se crea de forma automática) y desconectamos.

Los usuarios de windows es posible que obtengan errores al incorporar el parámetro
serverSelectionTimeoutMS=MONGODB_TIMEOUT
dependiendo de la versión de la librería. En ese caso basta con eliminarlo y dejar simplemente
client = pymongo.MongoClient(URI_CONNECTION)

Al ejecutar el código, si todo va bien obtendremos un mensaje de confirmación:

OK -- Connected to MongoDB at server 192.168.0.169
En caso de error, obtendremos también un mensaje que nos dará la pista de qué está fallando.

Un dato importante a tener en cuenta es que cuando invocamos
pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
la librería en realidad está creando un pool de conexiones basado en threads con un tamaño máximo determinado por el parámetro maxPoolSize, que tiene un valor por defecto de 100.

Es decir, la línea:

client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
es equivalente a

client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=100)

Bien, ya tenemos casi todo listo para realizar operaciones sobre la base de datos. Antes de continuar vamos a hacer un inciso para apuntar algunas características de MongoDB, sobre todo de cara a la gente que viene del mundo SQL.

En primer lugar mongo almacena los datos en colecciones, que vienen a ser el equivalente a las tablas del mundo SQL. Dentro de esas colecciones MongoDB va a almacenar documentos, que serían el equivalente a los registros del mundo SQL. Estos documentos son en realidad estructuras JSON, que en términos de python, podemos traducir a diccionarios.

Otra de las caracteríticas de MongoDB es que no es necesario definir de forma previa la estructura que van a tener los documentos de una colección, de hecho, ni siquiera es necesario crear la colección, al guardar un documento JSON se creará automáticamente la colección, de modo análogo a lo que sucede con la propia base de datos, que también se creará de forma automática si no existiera.
Dicho de otro modo, al crear el primer documento de una colección dentro de una base de datos se crearían de forma automática tanto la colección como la base de datos en caso de que no existieran,

Teniendo en cuenta esto, para guardar algo en nuestro MongoDB lo único que tenemos que hacer es crear un diccionario, obtener una conexión del pool y ejecutar el insert correspondiente.

Vamos a crear un diccionario con una serie de claves y valores y lo guardaremos. En este caso, vamos a salvar en la colección 'USERS' de la base de datos 'TEST' un registro y vamos a comprobar cómo efectivamente se crean de forma automática tanto la base de datos como la colección.

Suponiendo que partimos de una instalación limpia de mongo si nos conectamos a la consola de mongo y le decimos que nos muestre las bases de datos tendremos:

> show databases;
admin  0.000GB
local  0.000GB
Vamos a ejecutar el código siguiente, el cual creará un documento en la colección USERS de la base de datos TEST:

#!/usr/bin/env python

import pymongo

MONGODB_HOST = '192.168.0.169'
MONGODB_PORT = '27017'
MONGODB_TIMEOUT = 1000
MONGODB_DATABASE = 'TEST'

URI_CONNECTION = "mongodb://" + MONGODB_HOST + ":" + MONGODB_PORT +  "/"

try:
    client = pymongo.MongoClient(URI_CONNECTION, serverSelectionTimeoutMS=MONGODB_TIMEOUT, maxPoolSize=10)
    client.server_info()
    print 'OK -- Connected to MongoDB at server %s' % (MONGODB_HOST)
except pymongo.errors.ServerSelectionTimeoutError as error:
    print 'Error with mongoDB connection: %s' % error
except pymongo.errors.ConnectionFailure as error:
    print 'Could not connect to MongoDB: %s' % error

database_entry = {}
database_entry['name'] = 'John'
database_entry['surname'] = 'Wick'
database_entry['city'] = 'New York'

# or equivalent database_entry={'name':'John', 'surname':'Wick', 'city':'New York'}

try:
    destination = 'USERS'
    collection = client[MONGODB_DATABASE][destination]
    collection.insert(database_entry)
    print "Data saved at %s collection in %s database: %s" % (destination, MONGODB_DATABASE, database_entry)
except Exception as error:
    print "Error saving data: %s" % str(error)
Al ejecutar el código, si todo va bien obtendremos algo como lo siguiente:

OK -- Connected to MongoDB at server 192.168.0.169
Data saved at USERS collection in TEST database: {'city': 'New York', '_id': ObjectId('5a1174a5fbb7132f5c7d091c'), 'surname': 'Wick', 'name': 'John'}
Si vamos a la consola de mongo, comprobamos que efectivamente se han creado la base de datos y la colección y que además tenemos el registro guardado:

> show databases;
TEST   0.000GB
admin  0.000GB
local  0.000GB

> use TEST;
switched to db TEST

> show collections;
USERS

> db.USERS.find({}).pretty();
{
 "_id" : ObjectId("5a1174a5fbb7132f5c7d091c"),
 "city" : "New York",
 "surname" : "Wick",
 "name" : "John"
}
Lo primero que nos llamará la atención es el campo '_id'. Es un campo que crea de forma automática mongo y que juega el papel de índice actuando como clave única.

En futuras entradas veremos como explotar la librería pymongo ejecutando diferentes operaciones sobre MongoDB.

18 de noviembre de 2017

Colas (RabbitMQ) - Publicando mensajes

Siguiendo con la interacción entre RabbitMQ y python usando la librería puka, en esta entrada vamos a ver como publicar mensajes.

En la anterior entrada vimos como crear un pool de threads que se conectaba a un broker de RabbitMQ, consumía mensajes y ejecutaba un workflow para cada mensaje consumido. En esta entrada veremos como crear un pool de threads que van a publicar mensajes.

En este caso lo que vamos a publicar son strings aleatorios de cierta longitud que vamos a generar con la función genrandom que acepta como parámetro la longitud con la que queramos generar los mensajes.

Estableceremos un tiempo de espera entre la publicación de mensaje que vendrá determinado por el valor de PUBLISHER_SLEEP_TIME en segundos. Este parámetro junto con el número de threads WORKER_NUMBER van a determinar el rate de publicación.

Igual que en el ejemplo de los consumidores, en caso de problemas con la conexión al broker se establece una política de reconexión con reintentos de conexión cada BROKER_RETRY_CONNECTION segundos.

De este modo el código quedaría del siguiente modo:

#!/usr/bin/env python

import puka
import time
import threading
import random
import string

BROKER_HOST = '192.168.0.169'
QUEUE = 'QUEUE-1'
BROKER_RETRY_CONNECTION = 10
WORKER_NUMBER = 4
BROKER_CONNECTION_DEFINITION = 'amqp://' + BROKER_HOST + '/'
PUBLISHER_SLEEP_TIME = 0.2

def genrandom(length):
    chain = ''
    for i in range(length+1):
        chain = chain + random.choice(string.ascii_uppercase)
    return chain

class Threaded_worker(threading.Thread):
    BROKER_CONNECTION = 0

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        threadName = threading.currentThread().getName()
        while True:
            if self.BROKER_CONNECTION == 0:
                try:
                    self.client = puka.Client(BROKER_CONNECTION_DEFINITION)
                    self.client.wait(self.client.connect())
                    self.client.wait(self.client.queue_declare(QUEUE))
                    print '[%s] Succesfully Connected at broker %s' % (threadName, BROKER_HOST)
                    self.BROKER_CONNECTION = 1
                except Exception as error:
                    print '[%s] Error at broker connection: %s.....retrying after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)
            else:
                try:
                    msgToPublish = genrandom(40)
                    self.client.wait(self.client.basic_publish(exchange ='', routing_key=QUEUE, body=msgToPublish))
                    time.sleep(PUBLISHER_SLEEP_TIME)
                    print '[%s] Message published at queue: %s' % (threadName, msgToPublish)
                except Exception as error:
                    print '[%s] Error at publisher: %s.....retrying connection after %d seconds' % (
                    threadName, str(error), BROKER_RETRY_CONNECTION)
                    self.BROKER_CONNECTION = 0
                    time.sleep(BROKER_RETRY_CONNECTION)


print 'Starting %d publisher threads...' % (WORKER_NUMBER)
for i in range(WORKER_NUMBER):
    td = Threaded_worker()
    td.start()


Al ejecutarlo obtendremos lo siguiente:

Starting 4 publisher threads...
[Thread-1] Succesfully Connected at broker 192.168.0.169
[Thread-3] Succesfully Connected at broker 192.168.0.169
[Thread-2] Succesfully Connected at broker 192.168.0.169
[Thread-4] Succesfully Connected at broker 192.168.0.169
[Thread-1] Message publish at queue: YKHYRBRJWQLOLSREOGONKZICBVOQBXHUALGXCWSIT
[Thread-3] Message publish at queue: AXCLEQMGONKZUPNNOIFFZTLZBMAPRRNCELSWVORAU
[Thread-2] Message publish at queue: HAXNHDXBVJVUVOLTSYRTGEZMXGSWLOTIAMDWIQCQZ
[Thread-4] Message publish at queue: MDRJVZBHFHHCIPNXHFATUHEQWBQSJQWVOVTVIAEMA
[Thread-1] Message publish at queue: SBIRZMHIOGQDTQVSZKGGZICXJIEEXVLCCGFHONQIE
[Thread-3] Message publish at queue: WPLUWHCJWEBYETTDWFIYGBEERADWFVGPQTHPRJIKR
[Thread-2] Message publish at queue: JXGJBYEIUQRZRLFKUSKRPAMKSTZDZGOJIBFGKVLEM
[Thread-4] Message publish at queue: OELBGLBPYFTEBZVKGCGUTBCLQDXRCQLPLPDJDRRRM
........
Con el valor PUBLISHER_SLEEP_TIME = 0.2 deberíamos tener un rate de publicación de 5 mensajes por cada hilo, de forma que si fijamos WORKER_NUMBER = 4, al ejecutarse el código deberían publicarse 20 mensajes por segundo.

Efectivamente, si lo lanzamos y vamos a la web de administración de RabbitMQ vemos que el rate de publicación es el esperado: