Java Queue y BlockingQueue

De ChuWiki


¿Qué es una Queue en Java?[editar]

Prácticamente el 100% de los programas recogen o reciben datos de algún sitio, los prcesan de alguna manera y generan unos resultdos. Sin embargo, puede haber algunos problemas con esto. Es posible que los datos lleguen a un ritmo determinado, quizás variable, más rápido a veces, más lento en otros momentos. Y es posible que el tiempo necesario para procesarlos y generar resultados también sea variable, según el dato, quizás más rápido que lo que tardan los datos en llegar, quizás más lento.

Cuando se dan este tipo de casos, velocidades distintas e incluso variables en la recogida de datos y el proceso de los mismos, suele ser normal hacer dos hilos. Uno recoge los datos y los mete en una cola en espera de ser procesados. Otro los va retirando de la cola y procesando. De esta forma, podemos recoger datos muy rápido puesto que meterlos en una cola no suele ser un proceso costoso. Y podemos permitirnos procesar datos un poco más lento sin perder nada, ya que tendremos los datos en una cola en espera de ser preocesados en el orden que han llegado.

Una Queue es una cola donde un productor mete datos y un consumidor los va retirando.

Tipos de Queue en Java[editar]

Queue de capacidad limitada/ilimitada[editar]

Según la capacidad de datos que tenga una cola, puede ser ilimitada o limitada. En una cola ilimitada podemos encolar todos los datos que queramos sin más limite que la memoria de nuestro ordenador. En una cola de capacidad limitada indicamos el número máximo de datos que caben en la cola. Esta no permite que metamos más datos que los que indique su capcidad.

Queue vs BlockingQueue[editar]

Y según la forma de meter datos o retirarlos, la cola puede ser bloqueante o no. En una cola bloqueante:

  • Si la cola está llena según su capacidad, el proceso que intenta meter un dato más se queda bloqueado hasta que haya hueco.
  • Si la cola está vacía, el proceso que intenta leer un dato se queda bloqueado hasta que haya algún dato disponible.

En java, las colas bloqueantes implementan la interface BlockingQueue. Si es no bloqueante

  • Si la cola está llena según su capacidad, el proceso que intenta meter el dato no lo consigue, pero la llamada vuelve inmediatamente.
  • Si la cola está vacía, el proceso que intenta leer no recoge ningún dato, pero no se queda bloqueado.

En java, las colas no bloqueantes implementan la interface Queue. En cualquier caso, una cola bloqueante tiene también los métodos de una cola no bloqueante, por lo que podríamos usarla sin bloqueo.

Métodos de Queue y BlockingQueue[editar]

La siguiente tabla muestra los métodos habituales para tratar tanto con colas como con colas bloqueantes y cómo indican que la cola está llena al añadir un elemento o que está vacía al intentar recogerlo

No Bloqueante Bloqueante
Lanza una excepción Devuelve null, true/false Se bloquea Se bloquea durante un tiempo
Añadir a la cola add(e) offer(e) put(e) offer(e, time, unit)
Coger de la cola remove() poll() take() poll(time, unit)
Examinar la cola element() peek()

Ejemplo Java de Queue y BlockingQueue[editar]

Vamos a ver ejemplos tanto de colas bloqueantes como no bloqueantes con todos estos métodos. Tiene el código completo en QueueExample.java Lo primero es, por supuesto, instanciar una cola, bloqueante o no. Java nos ofrece varias implementaciones, con sus características cada una. Una de las más habituales es la siguiente

Queue<Integer> queue = new LinkedBlockingQueue<>(10);
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);

LinkedBlockingQueue implementa tanto la interface Queue como BlockingQueue, así que podemos usar cualquiera de las dos líneas de código anteriores según nos interese.

Tienen un constructor sin parámetros, haciendo que la cola sea ilimitada. Admite también un parámetro, que sería la capacidad máxima de la cola y que no se puede cambiar posteriormente. Aquí le ponemos una capacidad de 10, para que en nuestros ejemplos podamos ver cual es el efecto de intentar meter un elemento en la cola cuando está llena.

El tipo de dato que vamos a meter es Integer, pero puede ser cualquier clase Java que necesites como dato para la cola.

Métodos no bloqueantes de Queue[editar]

Vamos con los métodos no bloqueantes

Queue add y remove[editar]

add() y remove() son los métodos normales de una Collection de Java y se comportan de la misma forma. El siguiente código añade elementos a la cola

for (int i = 0; i < 20; i++) {
    try {
        queue.add(i);
    } catch (IllegalStateException e){
        e.printStackTrace();
    }
}

Hemos hecho un bucle de 20 elementos y los vamos añadiendo a la cola. Con los 10 primeros no tendremos problemas, se irán añadiendo. Como la capacidad es de 10, cuando intentemos añadir el elemento número 11 y siguientes nos saltará una excepción IllegalStateException indicando que la cola está llena.

El siguiente código nos sirve para leer elementos de la cola

for (int i = 0; i < 20; i++) {
    try {
       Integer value = queue.remove();
       ...
    } catch (NoSuchElementException e){
       e.printStackTrace();
    }
}

Si hemos ejecutado el código de inserción antes de este, habrá 10 elementos en la cola. La lectura de esos 10 elementos no nos dará problemas. remove() devuelve el elemento de la cola que toque y lo retira de la misma. Una vez vacía la cola después de retirar los 10 elementos, nos saltará una excepción NoSuchElementException cada vez que intentemos retirar un elemento más.

Queue offer y poll[editar]

El siguiente código añade elementos a la cola usando el método offer()

for (int i = 0; i < 20; i++) {
    if (queue.offer(i)) {
        System.out.printf("offer(%d) -> %s\n", i, "Exito");
    } else {
        System.out.println("No se ha podido añadir "+i);
    }
}

Nuevamente, los 10 primeros elementos que añadamos no nos darán problemas. offer() devuelve true si el elemento se ha podido añadir a la cola y false si no se ha podido añadir porque está llena.

El siguiente código intenta leer 20 elementos de la cola.

for (int i = 0; i < 20; i++) {
    Integer value = queue.poll();
    if (null!=value) {
        System.out.printf("poll() -> %d\n", value);
    } else {
        System.out.println("No se ha podido recuperar "+i);
    }
}

poll() nos devuelve el elemento de la cola que ha conseguido retirar o null si la cola está vacía. Podremos retirar 10 elementos y a partir de ahi obtendremos null

Métodos bloqueantes de BlockingQueue[editar]

Vamos ahora con los bloqueantes de BlockingQueue

BlockingQueue put y take[editar]

El siguiente código intenta añadir 20 elementos en una cola con capacidad de 10.

for (int i = 0; i < 20; i++) {
    try {
        blockingQueque.put(i);
        System.out.println("Puesto " + i);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

La llamada a put() se quedará bloqueada cuando intente meter el elemento número 11 y seguirá bloqueada indefinidamente hasta que alguien retire un elemento de la cola dejando un hueco para este nuevo elemento. Si alguien interrumpe el hilo donde se está ejecutando este código y que está bloqueado en put(), este método terminará con una excepción InterruptedException.

El siguente código intenta retirar 20 elementos de la cola

for (int i = 0; i < 20; i++) {
    try {
        Thread.sleep(100);
        System.out.println("Leido " + blockingQueque.take());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Si hay 10 elementos en la cola, podremos retirar los 10 primeros sin problemas, pero take() se quedará bloqueado cuando intentemos retirar el número 11. Se quedará indefinidamente bloqueado hasta que alguien meta un nuevo dato en la cola o hasta que alguien interrumpa el hilo donde corre este código. En este últimom caso, take() terminará con una excepción InterruptedException

BlockingQueue offer y poll con Timeout[editar]

Los métodos offer()y poll() que son no bloqueantes tienen una opción con parámetros adicionales en los que se le pasa un timeout. En este caso, si ponemos estos parámetros de timeout, sí se hacen bloqueantes. Se quedan bloqueados hasta que puedan añadir o retirar un elemento de la cola o hasta que pase el tiemout.

El siguiente código intenta añadir 20 elementos a una cola con capacidad de 10

for (int i = 0; i < 20; i++) {
    try {
        final boolean success = blockingQueque.offer(i, 10, TimeUnit.MILLISECONDS);
        if(success) {
            System.out.println("Puesto " + i);
        } else {
            System.out.println("Tiro "+ i);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

En offer(), además del valor i que queremos meter en la cola, pasamos dos parámetros más. Uno indica el timeout (10 en este caso) y el otro las unidades de dicho valor (milisegundos en este caso). La llamada meterá el elemento en la cola y si la cola está llena, esperará hasta 10 milisegundos a ver si alguien retira algo de la cola y hace un hueco. Al igual que vimos antes, offer() devuelve true si ha conseguido meter el elemento en la cola y false en caso contrario.

El siguiente código intenta leer 20 elementos de una cola que solo tienen 10

for (int i = 0; i < 30; i++) {
    try {
        Integer value =  blockingQueque.poll(100, TimeUnit.MILLISECONDS);
        if (null!=value) {
            System.out.println("Leido " + blockingQueque.poll(100, TimeUnit.MILLISECONDS));
        } else {
            System.out.println("No leo "+i);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Como en el caso anterior, en poll() hemos puesto dos parámetro para el timeout. 100 milisegundos en este ejemplo. Retiraremos los 10 primeros elementos sin problemas. Para el elemento 11 y siguientes, la llamada se quedará bloqueada un máximo de 100 milisegundos en espera de que alguien meta nuevos datos en la cola.

Como en el ejemplo de poll() no bloqueante, el método devolverá el valor retirado de la cola si lo ha podido retirar o null, una vez pasado el timeout, en caso de que no haya podido retirar nada.

Si la llamada está bloqueada en poll() y alguien interrumpe el hilo, el método terminará con una excepción InterruptedException.