Mi granito de java: Cassandra

Google+ Badge

jueves, 23 de junio de 2011

Cassandra

Junto a Guillermo Kessler hemos hecho una investigación sobre una base de datos NO SQL llamada Cassandra. Primero veremos sus características principales y luego les mostraremos el código fuente utilizado.

Cassandra es una Base de Datos NO SQL. Posee una estructura par-valor para almacenar datos de manera escalable, consistente y distribuída. Fue creada para manejar grandes volúmenes de datos distribuidos en numerosos servidores estándar, ofreciendo alta disponibilidad.
La historia de Cassandra nace directamente de la experiencia que supuso Dynamo para Amazon. Facebook contrató a uno de sus autores y le encargó diseñar un nuevo sistema para sus datos. Este ingeniero desarrolló Cassandra y, en 2008, Facebook liberó su código.

Características
  • Punto de falla: maneja grandes volúmenes de datos distribuidos en numerosos servidores estándar, ofreciendo alta disponibilidad sin ningún punto único de falla.
  • Basada en clave-valor: tiene un almacén de valores de claves manejado con consistencia eventual (modelo de consistencia usado en programación paralela). Las claves mapean hacia múltiples valores que se agrupan en familias de columnas. Esas familias se definen cuando se crea una base Cassandra, pero luego se les puede agregar columnas a las diferentes familias. Las columnas se pueden agregar a claves específicas y así diferentes claves tendrán diferentes cantidades de columnas dentro de una misma familia. Los valores de una familia de columnas para cada clave se almacenan juntos, haciendo de Cassandra un híbrido entre una DBMS orientada a columnas y un almacén de datos orientado a filas.
  • Soporte comercial: debido a que es un producto relativamente nuevo, recién acaba de aparecer una primer empresa ofreciendo soporte comercial para Cassandra. Se trata de Riptano (http://www.riptano.com/), que además está creando herramientas para usar sobre el core de Cassandra. El plan de Riptano consiste en usar a Cassandra como plataforma central e ir agregándole porciones propietarias que hagan a su versión comercial. Uno de sus primeros servicios podría ser el de migración para que usuarios de otras bases de datos puedan pasar su información a Cassandra.
  • Releases: Apache se encarga de sacar un nuevo release cada tres o 4 meses. Hoy esta en su versión 0.6 y ya esta trabajando en su versión 0.7.

Clientes

Un cliente es una API que nos permite conectar y realizar operaciones contra Cassandra. Esta última usa como cliente por defecto Thrift que es un cliente de bajo nivel. Al ser un cliente de bajo nivel resulta más complejo y lento operar contra esta base de datos.
Existen clientes de alto nivel para diferentes lenguajes como Java, PHP, Perl, etc. Un cliente de alto nivel nos va a permitir operar mas fácil y rápido contra la base de datos, facilitandonos las cosas. En la documentación oficial de Cassandra se recomienda el uso de estos clientes de alto nivel por encima de Thrift.

Los clientes de alto nivel recomendados para Java en la última versión estable de Cassandra (0.6.7) son:
Lamentablemente la mayoría de estos clientes tiene documentación escasa y poco mantenimiento.
Dentro de los nombrados en la lista, el más robusto es Hector. Si bien es sencillo de usar comparado con Thift, su documentación no es del todo completa y se debe tener cuidado que en cada versión de Cassandra se usan librerias diferentes de Hector, por lo cual se debe tener la versión correspondiente de Hector con la de Cassandra.


Ventajas
  1. Dispone de consistencia eventual, al igual que el sistema Dynamo de Amazon.
  2. Proporciona un modelo de datos basados en ColumnFamily, más rico que el tradicional modelo de clave/valor, al igual que el modelo BigTable de Google.
  3. Es altamente escalable y distribuida. Normalmente se ejecutan en clusters de servidores formados por ordenadores baratos, por lo que la expansión del sistema es realmente sencilla.
  4. Es rápida ya que elimina el cuello de botella que supone el tener que traducir las consultas a lenguaje SQL.

Pruebas

Hemos realizado dos tipos de pruebas: en modo local y en modo distribuído. En ambos casos se ha observado una gran velocidad en la lectura/escritura de datos. Un ejemplo claro, es que con una configuración local ha podido traer a memoria 50.000 objetos repetidos de un total de 800.000 en sólo 3.08 segundos. La configuración y el código de esta prueba se encuentra al final de este post.
No hemos observado problema con la integración de varios nodos para trabajar conjuntamente en red: hemos probado 3 máquinas trabajando juntas sin mayores problemas.


Es obvio que la velocidad es el fuerte de Cassandra, es por eso que es utilizada por empresas que tienen que manejar un gran volumen de datos.

Conclusión

Observando a las compañías que manejan un enorme volúmen de datos, tales como Google, Amazon o Facebook, nos damos cuenta que sus sistemas de datos no están basados en bases de datos tradicionales (relacionales). Google utiliza BigTable, Amazon usa Dynamo, y Facebook y Digg gestionan sus datos con Cassandra. Actualmente, Twitter1 también esta migrando sus datos a esta tecnología.
¿Por qué las compañías más relevantes del mundo de la tecnología abandonan las bases de datos relacionales? Principalmente, porque son muy costosas y lentas cuando manejan tal cantidad de datos. Por ejemplo, Cassandra es capaz de escribir en disco 50GB de datos en tan sólo 0.12 milisegundos, 2500 veces más rápido que MySQL.
Entonces, ¿todas las compañias seguirán estos pasos en un futuro? No, estas bases de datos deben utilizarse en casos especiales, cuando la información debe ser accedida por millones de personas y la velocidad es un tema primordial.
Para una compañía que no posee estas características hay varios puntos a tener en cuenta: no soporta trasacciones ya que tiene un almacén de valores de claves manejado con consistencia eventual, lo que quiere decir que los datos “eventualmente” se van a ver, pero no siempre reflejan la realidad. Por otro lado, no se tiene todas las ventajas conocidas de las base de datos relacionales como DB2 u Oracle, que van desde el soporte técnico hasta la facilidad para obtener información mediante reportes. Por último, la documentación es escasa y pobre.
Dicho de otra manera, si no tenes una red social o algo que se le parezca ni se te ocurra complicarte sin sentido!!!!

Detalle de las pruebas:

Adjuntamos la configuración/código utilizado:
storage-conf.xml

<Keyspace Name="CDT">          

       <!-- Un Column Family podr&iacute;a asimilarse al concepto de Tabla -->  

    
       <ColumnFamily Name="Proyectos" CompareWith="BytesType"/>  

         
       <ColumnFamily Name="Desarrolladores" CompareWith="BytesType"/>          

         <!-- Entradas en un blog, esta ser&aacute; una super columna.-->          

       <ColumnFamily CompareWith="TimeUUIDType" ColumnType="Super"  

   CompareSubcolumnsWith="BytesType" Name="Trabajan"/>          

         

       <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>  

     <!-- Number of replicas of the data -->  

     <ReplicationFactor>1</ReplicationFactor>  

     <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>  

    </Keyspace>        

 </Keyspaces>


package main;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.UnsupportedEncodingException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.TException;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.safehaus.uuid.UUID;
import org.safehaus.uuid.UUIDGenerator;

public class TestCassandra {

    public static final String UTF8 = "UTF8";

    public static final String KEYSPACE = "CDT";
    public static final String PROYECTOS_FAMILY = "Proyectos";
    public static final String DESA_FAMILY = "Desarrolladores";
    public static final String TRABAJAN_FAMILY = "Trabajan";

    public static void test(long cantidad, boolean abrirTr) throws Exception {
        long inicio, intermedio, fin = 0;

        TTransport tr = new TSocket("localhost", 9160);
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        if (abrirTr) {
            System.out.println("Comienzo la insercion");
            inicio = new Date().getTime();
            for (int i = 0; i < cantidad; i++) {
                tr.open();
                inserting(client, i);
                tr.close();
            }
            System.out.println("Fin inserción");
            intermedio = new Date().getTime();
            System.out.println("Comienzo la lectura");

            for (int i = 0; i < cantidad; i++) {
                tr.open();
                reading(client, i);
                tr.close();
            }
            fin = new Date().getTime();

        } else {

            tr.open();
            inicio = new Date().getTime();
            System.out.println("Comienzo la inserción");
            for (int i = 0; i < cantidad; i++) {

                inserting(client, i);

            }
            tr.close();

            System.out.println("Fin inserción");
            intermedio = new Date().getTime();
            System.out.println("Comienzo la lectura");
            tr.open();
            for (int i = 0; i < cantidad; i++) {

                reading(client, i);

            }
            System.out.println("Fin lectura");

            tr.close();
            fin = new Date().getTime();
        }
        double insercion = (double) (intermedio - inicio) / 1000;
        double lectura = (double) (fin - intermedio) / 1000;
        double total = insercion + lectura;
        System.out.println("Entrada de datos: " + insercion);
        System.out.println("Lectura de datos: " + lectura);
        System.out.println("Total proceso: " + total);

    }

    private static void inserting(Cassandra.Client client, long i)
            throws UnsupportedEncodingException, InvalidRequestException,
            UnavailableException, TimedOutException, TException {

        String keyID = "java" + i;
        // INSERTAMOS DATOS.
        long timestamp = System.currentTimeMillis();

        // Un proyecto
        ColumnPath colPathName = new ColumnPath(PROYECTOS_FAMILY);
        colPathName.setColumn("NOMBRE".getBytes(UTF8));
        client.insert(KEYSPACE, keyID, colPathName, "Proyecto".getBytes(UTF8),
                timestamp, ConsistencyLevel.ONE);
        ColumnPath colPath = new ColumnPath(PROYECTOS_FAMILY);
        colPath.setColumn("email".getBytes(UTF8));

        client.insert(KEYSPACE, keyID, colPath, "blablabla".getBytes(UTF8),
                timestamp, ConsistencyLevel.ONE);

       
        String llaveID = "Me gusta esto" + i;
        ColumnPath colName = new ColumnPath(DESA_FAMILY);
        colName.setColumn("unaColumna".getBytes(UTF8));
        client.insert(KEYSPACE, llaveID, colName, keyID.getBytes(UTF8),
                timestamp, ConsistencyLevel.ONE);

        List<Column> entriesColumns = new ArrayList<Column>();

        entriesColumns.add(new Column("observaciones".getBytes("UTF8"),
                "Hijos nuestros".getBytes("UTF-8"), timestamp));

        entriesColumns.add(new Column("pm".getBytes("UTF8"),
                "te vas a la B".getBytes("UTF-8"), timestamp));

        UUID uuid = UUIDGenerator.getInstance().generateTimeBasedUUID();

        SuperColumn superColumn = new SuperColumn(uuid.toByteArray(),
                entriesColumns);

        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setSuper_column(superColumn);

        List<Mutation> mutList = new ArrayList<Mutation>();
        Mutation mut = new Mutation();
        mut.setColumn_or_supercolumn(cosc);
        mutList.add(mut);

        Map<String, List<Mutation>> mapCF = new HashMap<String, List<Mutation>>(
                1);
        mapCF.put(TRABAJAN_FAMILY, mutList);

        Map<String, Map<String, List<Mutation>>> muts = new HashMap<String, Map<String, List<Mutation>>>();
        muts.put(llaveID, mapCF);

        client.batch_mutate(KEYSPACE, muts, ConsistencyLevel.ONE);

    }

    private static void reading(Cassandra.Client client, long i)
            throws InvalidRequestException, NotFoundException,
            UnavailableException, TimedOutException, TException,
            UnsupportedEncodingException {
        String keyID = "java" + i;
        String llaveID = "Me gusta esto" + i;

        ColumnPath colPathName = new ColumnPath(PROYECTOS_FAMILY);
        colPathName.setColumn("NOMBRE".getBytes(UTF8));
        // Leemos una columna (clave, valor y timestamp)
        Column col = client.get(KEYSPACE, keyID, colPathName,
                ConsistencyLevel.ONE).getColumn();

        // System.out.println("--------- Leemos una columna ------");
        // System.out.println(new String(col.name, UTF8) + ":"
        // + new String(col.value, UTF8) + ":" + new Date(col.timestamp));

        // Leemos las filas completas...
        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);
        sliceRange.setCount(50000);
        predicate.setSlice_range(sliceRange);

        // System.out.println("--------- Leemos todas las filas ------");
        ColumnParent parent = new ColumnParent(PROYECTOS_FAMILY);
        List<ColumnOrSuperColumn> results = client.get_slice(KEYSPACE, keyID,
                parent, predicate, ConsistencyLevel.ONE);
        // for (ColumnOrSuperColumn result : results) {
        // Column column = result.column;
        // System.out.print(new String(column.name, UTF8) + ":"
        // + new String(column.value, UTF8) + ":"
        // + new Date(column.timestamp)+" | ");
        // }
        // System.out.println("----------------------------");
        // Leemos las entradas
        //
        parent = new ColumnParent(TRABAJAN_FAMILY);

        results = client.get_slice(KEYSPACE, llaveID, parent, predicate,
                ConsistencyLevel.ONE);

        for (ColumnOrSuperColumn result : results) {
            SuperColumn superColumn = result.super_column;
            UUID uuid = new UUID(superColumn.name);
            // System.out.println(uuid.toString() + " [ ");
            for (Column column : superColumn.columns) {
                // System.out.print(new String(column.name, UTF8) + ":"
                // + new String(column.value, UTF8) + ":"
                // + new Date(column.timestamp)+" | ");
            }
            // System.out.println(" ] ");
        }
    }

    public static void lecturaParticular(int i) throws InvalidRequestException,
            NotFoundException, UnavailableException, TimedOutException,
            TException, UnsupportedEncodingException {
        long a = 0;
        TTransport tr = new TSocket("localhost", 9160);
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        tr.open();
        long inicio = f();
        String keyID = "java" + i;
        String llaveID = "Me gusta Cassandra" + i;

        ColumnPath colPathName = new ColumnPath(PROYECTOS_FAMILY);
        colPathName.setColumn("NOMBRE".getBytes(UTF8));
        // Leemos una columna (clave, valor y timestamp)
        Column col = client.get(KEYSPACE, keyID, colPathName,
                ConsistencyLevel.ALL).getColumn();

        // System.out.println("--------- Leemos una columna ------");
        // System.out.println(new String(col.name, UTF8) + ":"
        // + new String(col.value, UTF8) + ":" + new Date(col.timestamp));

        // Leemos las filas completas...
        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);
        sliceRange.setCount(50000);
        predicate.setSlice_range(sliceRange);

        // System.out.println("--------- Leemos todas las filas ------");
        ColumnParent parent = new ColumnParent(PROYECTOS_FAMILY);
        List<ColumnOrSuperColumn> results = client.get_slice(KEYSPACE, keyID,
                parent, predicate, ConsistencyLevel.ALL);
        System.out.println("Total columnas: " + results.size());
        for (ColumnOrSuperColumn result : results) {
            Column column = result.column;
            // System.out.print(new String(column.name, UTF8) + ":"
            // + new String(column.value, UTF8) + ":"
            // + new Date(column.timestamp)+" | ");
        }
        // System.out.println("----------------------------");

        System.out.println("--------- ------");
        parent = new ColumnParent(TRABAJAN_FAMILY);

        results = client.get_slice(KEYSPACE, llaveID, parent, predicate,
                ConsistencyLevel.ALL);
        a = results.size();
        for (ColumnOrSuperColumn result : results) {
            SuperColumn superColumn = result.super_column;
            UUID uuid = new UUID(superColumn.name);
            // System.out.println(uuid.toString() + " [ ");
            for (Column column : superColumn.columns) {
                // System.out.print(new String(column.name, UTF8) + ":"
                // + new String(column.value, UTF8) + ":"
                // + new Date(column.timestamp)+" | ");
            }
            // System.out.println(" ] ");
        }

        tr.close();
        long fin = f();
        double total = (double) (fin - inicio) / 1000;
        System.out.println("Total encontrados: " + a);
        System.out.println("Total proceso: " + total);
    }

    private static long f() {

        return new Date().getTime();
    }
}



Ejemplo de codigo de Hector:

// nombre del cluster, ip de conexion puede ser cualquiera del nodo
        Cluster cluster = HFactory.getOrCreateCluster("Test Cluster", "192.168.254.66:9160");
        // nombre del keyspace
        Keyspace keyspace = HFactory.createKeyspace("CDT", cluster);

        Mutator mutator = HFactory.createMutator(keyspace);
        // inserta datos
        mutator.insert("jsmith", "Proyectos", HFactory.createStringColumn("first", "John"));
        mutator.insert("pedroloco", "Proyectos", HFactory.createStringColumn("first", "Pedro"));
        mutator.insert("juanmiguel", "Proyectos", HFactory.createStringColumn("first", "Juan"));

        // recupera un dato
        ColumnQuery<String, String> columnQuery = HFactory.createStringColumnQuery(keyspace);
        columnQuery.setColumnFamily("Proyectos").setKey("jsmith").setName("first");
        QueryResult<HColumn<String, String>> result = columnQuery.execute();

        System.out.println(result.get().getValue());
       
        //recupera varios datos
        MultigetSliceQuery<String, String> multigetSliceQuery = HFactory.createMultigetSliceQuery(keyspace, StringSerializer.get(), StringSerializer.get());
        multigetSliceQuery.setColumnFamily("Proyectos");
        multigetSliceQuery.setKeys("java1", "java100", "java999");
        multigetSliceQuery.setRange("", "", false, 3);
        QueryResult<Rows<String, String>> result2 = multigetSliceQuery.execute();

        System.out.println(result2.get().getCount());

        for (Row<String, String> r : result2.get()) {
            System.out.println(r.getColumnSlice().getColumnByName(r.getColumnSlice().getColumns().get(0).getName()).getValue());
Es mucho mas sencillo usar Hector como cliente que Thrift.
Publicar un comentario