Saturday, September 6, 2014

HowTo:: Threading: Producer Consumer with thread-safe PriorityBlockingQueue as buffer with custom ordering


Img.1.:: Producer Consumer Problem examples (powered by IntelliJ Idea 13.x)

  In the previous blog post I wrote about a common multi-threading issue, the Producer-Consumer problem with a shared, unsynchronised buffer. That post also showed how to employ conditions to control access to the buffer. (link)
  This Producer-Consumer example is more sophisticated. It uses MathPriorityTransferQueue, which extends PriorityBlockingQueue and implements TransferQueue to gain better control over the exchange process itself.

// methods to @Override from interface TransferQueue<E>
boolean tryTransfer(E e){...
boolean tryTransfer(E e, long timeout, TimeUnit unit){...
void transfer(E e) throws InterruptedException {...
boolean hasWaitingConsumer(){...
int getWaitingConsumerCount(){...

// methods to @Override from class PriorityBlockingQueue
public E take() throws InterruptedException{...
public E peek(){...
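
To get a feeling for what the TransferQueue contract promises before we override it, here is a minimal sketch that uses the JDK's own LinkedTransferQueue as a stand-in (it is not the MathPriorityTransferQueue from this post). The class and method names are made up for illustration. It shows that tryTransfer(E e) refuses the element when no consumer is waiting, and hands it over when one is already blocked in take().

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Illustrative sketch only: demonstrates the TransferQueue contract with the JDK implementation.
class TransferQueueSemantics {

    // With no consumer blocked in take(), tryTransfer() gives up immediately
    // and does not leave the element in the queue.
    static boolean rejectedWithoutConsumer() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean handedOver = queue.tryTransfer("event");
        return !handedOver && queue.isEmpty();
    }

    // With a consumer already blocked in take(), tryTransfer() hands the element over.
    // Returns what the consumer received, or null if the hand-over failed.
    static String receivedByWaitingConsumer() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            while (!queue.hasWaitingConsumer()) {
                Thread.sleep(1);   // wait until the consumer is parked in take()
            }
            boolean handedOver = queue.tryTransfer("event");
            consumer.join();       // join() makes received[0] visible to this thread
            return handedOver ? received[0] : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedWithoutConsumer());
        System.out.println(receivedByWaitingConsumer());
    }
}
```

The custom queue in this post has to provide the same behaviour on top of PriorityBlockingQueue, which is why it needs its own bookkeeping of waiting consumers.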
  Distributed systems, parallel computing, machine learning and artificial intelligence are always exciting topics. This post is only a small introduction to one small part that is worth knowing to make them possible, so let's start slowly with the example.

   The heart of the example is the implementation of MathPriorityTransferQueue, the buffer shared by MathProducers and MathConsumers. The buffer allows them to work together. The basic idea is that each of the NUMBER_PRODUCERS producers generates NUMBER_PRODUCER_EVENTS MathEvents, which NUMBER_CONSUMER consumers then process. The total number of MathEvents the consumers need to consume is therefore the number of producers multiplied by the number of MathEvents each of them generates.
Each MathEvent is assigned a random priority. The priority is the key by which the events are sorted inside MathPriorityTransferQueue.
The main class MathPrioProdConMain is responsible for creating the producers and the consumers. It is important to point out that consumers work only while something is available to consume inside the queue.
public class MathPrioProdConMain {
    ...
    private static final int NUMBER_CONSUMER = 2;
    private static final int NUMBER_PRODUCERS= 10;
    private static final int NUMBER_PRODUCER_EVENTS= 1000;

    public static void main(String... args){
        MathPriorityTransferQueue< MathEvent > buffer = new MathPriorityTransferQueue<>();
        ...
        Thread[] producerThread = new Thread[ NUMBER_PRODUCERS ];
        for(int i=0; i < NUMBER_PRODUCERS; i++){
            producerThread[i] = new Thread(new MathProducer(NUMBER_PRODUCER_EVENTS, buffer));
            producerThread[i].start();
        }
        ...
        Thread[] consumerThread = new Thread[ NUMBER_CONSUMER ];
        for(int i=0; i < NUMBER_CONSUMER; i++){
            consumerThread[i] = new Thread(new MathConsumer(buffer));
            consumerThread[i].start();
        }
        ...
    }
}
  To test the ordering abilities of the shared data structure, the main method also sends an extra, unexpected MathEvent with a defined priority into the buffer. The output shows that the ordering is implemented correctly.
...
MathEvent event = new MathEvent(" Transfer Math SuperEvent ", 22);
buffer.transfer( event );
...
  At the end we wait until all consumer threads are done (all threads simply die).

...
for(int i=0; i < NUMBER_CONSUMER ; i++)
                consumerThread[ i ].join();
...
  Keep in mind that the whole multi-threading process works like a state machine, with its locking, unlocking, notifying and synchronisation of operations. This helps to understand why the rest of the code works.


  Before we move into the code of MathPriorityTransferQueue, let's take a brief look at what an individual MathProducer is doing:
public class MathProducer implements Runnable{
    ...
    public MathProducer(int maxMathEvents, MathPriorityTransferQueue< MathEvent > buffer){
        this.maxMathEvents = maxMathEvents;
        this.buffer = buffer;
    }
    ...
    @Override
    public void run() {
        for(int i=0; i< maxMathEvents; i++){
            Random random=new Random();
            int priority= random.nextInt(100);
            MathEvent event = new MathEvent("MathProducer-" + Thread.currentThread().getName(), priority);
            buffer.put(event);
        }
    }

}
 MathProducer's responsibility is to generate MathEvents and give them a random priority.

   We do the same with the consumer. An individual MathConsumer consumes every new MathEvent that is available to it in the queue (the queue is responsible for prioritising the MathEvents).
public class MathConsumer implements Runnable {
    ...
    public MathConsumer(MathPriorityTransferQueue< MathEvent > buffer) {
        this.buffer = buffer;
    }
    ...
    @Override
    public void run() {
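        // Caveat: peek() and take() are two separate steps. With several consumers,
        // another thread may remove the last element in between, and take() then blocks.
        while(buffer.peek() != null){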
            try{
                MathEvent event = buffer.take();
                logger.debug("MathConsumer-" + Thread.currentThread().getName() +
                        " : " + event.getThread() + " priority: " + event.getPriority());
            } catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}
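
One thing to be aware of in the loop above: the check (peek) and the removal (take) are not one atomic step, so a consumer can see an element, lose it to another consumer and then block in take(). A possible alternative, sketched here on a plain PriorityBlockingQueue with made-up names (PollingConsumerSketch, drain, idleMillis), is to let each consumer stop after the queue has stayed empty for a while by using the timed poll():

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: consumers that terminate once the buffer stays empty.
class PollingConsumerSketch {

    // Drains `buffer` with `consumers` threads. Each thread stops after the queue
    // has been empty for `idleMillis`. Returns the number of consumed elements.
    static int drain(PriorityBlockingQueue<Integer> buffer, int consumers, long idleMillis) {
        AtomicInteger consumed = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // poll() checks and removes in a single step, so two consumers
                    // can never compete for an element one of them only "saw".
                    while (buffer.poll(idleMillis, TimeUnit.MILLISECONDS) != null) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    // Fills a buffer with 1000 elements first, then drains it with two consumers.
    static int demo() {
        PriorityBlockingQueue<Integer> buffer = new PriorityBlockingQueue<>();
        for (int i = 0; i < 1000; i++) {
            buffer.put(i);
        }
        return drain(buffer, 2, 100);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that this sketch bypasses the overridden take(), so it does not register the consumer as waiting in MathPriorityTransferQueue; it only illustrates the termination pattern.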
  Now all parts participating in the example are defined and we can move on to the implementation of its heart, MathPriorityTransferQueue. It is worth pointing out that the counter is implemented atomically to avoid a possible race condition, and that it shouldn't be implemented using volatile. Volatile guarantees only happens-before visibility (write-read), not atomicity.
public class MathPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> {
    ...
    private AtomicInteger counter;
    private LinkedBlockingQueue<E> transferList;
    private ReentrantLock lock;
    ...
    public MathPriorityTransferQueue(){
        counter = new AtomicInteger(0);
        lock = new ReentrantLock();
        transferList = new LinkedBlockingQueue<>();
    }
...
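
The remark about volatile versus an atomic counter can be checked with a small experiment. The following is a minimal sketch with made-up names (CounterSketch, run): several threads increment a volatile int and an AtomicInteger the same number of times. The atomic counter always ends at the exact total, while the volatile one may lose updates because ++ is a read-modify-write sequence.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: volatile gives visibility, AtomicInteger gives atomic updates.
class CounterSketch {
    private volatile int volatileCounter = 0;
    private final AtomicInteger atomicCounter = new AtomicInteger(0);

    // Returns {final volatile value, final atomic value}.
    int[] run(int threads, int incrementsPerThread) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    volatileCounter++;                // read, add, write: updates can be lost
                    atomicCounter.incrementAndGet();  // one indivisible step
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { volatileCounter, atomicCounter.get() };
    }

    public static void main(String[] args) {
        int[] result = new CounterSketch().run(4, 100_000);
        System.out.println("volatile: " + result[0] + ", atomic: " + result[1]);
    }
}
```

How many updates the volatile counter loses, if any, differs from run to run; only the atomic result is guaranteed.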
  The counter is used to control the important operations provided by our data structure:
1. take() operation gets a MathEvent from the data structure. It increments the counter on entry and decrements it when the operation is finished.
...
@Override
public E take() throws InterruptedException {
   lock.lock();
   counter.incrementAndGet();
   E result = transferList.poll();

   if(result == null){
     lock.unlock();
     result = super.take();
     lock.lock();
   } else {
      synchronized (result){
        result.notifyAll();
      }
   }
   counter.decrementAndGet();
   lock.unlock();
   return result;
}
...
2. peek() operation retrieves, but doesn't remove, the head of the queue; if the queue is empty it returns null. Here we also need to take care of transferList, the blocking queue that stores MathEvents transferred while no consumer was waiting.
@Override
public E peek() {
   lock.lock();
   E eventMain = super.peek();
   E eventTrans = transferList.peek();
   E result = eventMain != null ? eventMain : eventTrans;
   lock.unlock();
   return result;
}
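3. tryTransfer(E e) tries to hand the MathEvent over to a consumer if one is waiting; otherwise it returns false immediately without storing the event. A minimal version consistent with the other methods (see the example source code for the exact one):
...
@Override
public boolean tryTransfer(E event) {
   lock.lock();
   boolean transferred;
   if(counter.get() == 0){
      // no consumer is waiting inside take(), give up immediately
      transferred = false;
   } else {
      put(event);
      transferred = true;
   }
   lock.unlock();
   return transferred;
}
...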
4. tryTransfer(E event, long timeout, TimeUnit unit) throws InterruptedException tries to hand the MathEvent over to a waiting consumer. If no consumer is waiting, the event is stored in transferList and the producer waits for at most someTimeout, or until the thread is interrupted. If the event is still in transferList afterwards, it is removed and the method returns false.
...
@Override
public boolean tryTransfer(E event, long timeout, TimeUnit unit) throws InterruptedException {
   lock.lock();
   if(counter.get() != 0){
      put(event);
      lock.unlock();
      return true;
   } else {
       transferList.add(event);
       long someTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
       lock.unlock();
       // wait() must be called while holding the event's monitor,
       // otherwise it throws IllegalMonitorStateException
       synchronized (event){
           event.wait(someTimeout);
       }
       lock.lock();
       if(transferList.contains(event)){
           // nobody took the event in time, withdraw it
           transferList.remove(event);
           lock.unlock();
           return false;
       }else{
           lock.unlock();
           return true;
       }
    }
}
...
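
The timed wait is the trickiest part of this method, so here is a small, isolated sketch of the pattern. It is not the queue's code and all names in it (TimedHandoffSketch, markTaken, awaitTaken) are hypothetical. It shows two things: calling wait() without owning the monitor fails at run time, and a timed wait is more robust when it re-checks a flag in a loop, because a thread can wake up without having been notified.

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a timed hand-off built on wait()/notifyAll().
class TimedHandoffSketch {
    private boolean taken = false;   // guarded by this object's monitor

    // Consumer side: mark the element as picked up and wake the waiting producer.
    synchronized void markTaken() {
        taken = true;
        notifyAll();
    }

    // Producer side: wait until markTaken() was called or the timeout elapsed.
    synchronized boolean awaitTaken(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!taken) {                          // loop guards against spurious wake-ups
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;                     // timed out
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }

    // Calling wait() without holding the object's monitor throws IllegalMonitorStateException.
    static boolean waitOutsideMonitorThrows() {
        try {
            new Object().wait(10);
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs one hand-off; `signal` decides whether a consumer thread calls markTaken().
    static boolean demo(boolean signal) {
        TimedHandoffSketch handoff = new TimedHandoffSketch();
        try {
            if (signal) {
                new Thread(handoff::markTaken).start();
                return handoff.awaitTaken(5, TimeUnit.SECONDS);
            }
            return handoff.awaitTaken(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitOutsideMonitorThrows());
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

Because the flag is checked before waiting, a notification that arrives before the producer starts to wait is not lost, which a bare wait() cannot guarantee.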
5. transfer(E e) throws InterruptedException hands the MathEvent over directly when a consumer is waiting for it. If not, it puts the MathEvent into transferList and waits until the MathEvent is taken out.
...
@Override
public void transfer(E event) throws InterruptedException{
   lock.lock();
   if(counter.get() != 0){
       put(event);
       lock.unlock();
   } else {
       transferList.add(event);
       lock.unlock();
       synchronized (event){
          event.wait();
       }
   }
}
...
  
and some more, which you can find in the example source code, but they are not critical to make this example work (hasWaitingConsumer, getWaitingConsumerCount).
   With the data structure implemented, we still need to implement the very important MathEvent method compareTo(MathEvent event), as MathEvent implements the Comparable<MathEvent> interface.
 In this example the data structure orders elements by their priority, highest first.
public class MathEvent implements Comparable<MathEvent>{
    ...
    @Override
    public int compareTo(MathEvent event) {
        if (this.priority > event.getPriority()) {
            return -1;
        } else if (this.priority < event.getPriority()) {
            return 1;
        } else {
            return 0;
        }
    }
    ...
}
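
To see the effect of this inverted comparison in isolation, here is a minimal sketch. The Event class is a hypothetical stand-in for MathEvent that keeps only the priority, and PriorityOrderSketch and drainOrder are made-up names. Elements are put into a plain PriorityBlockingQueue in arbitrary order and come out with the highest priority first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Illustrative sketch only: descending priority order via an inverted compareTo.
class PriorityOrderSketch {

    // Hypothetical stand-in for MathEvent: only the priority matters here.
    static final class Event implements Comparable<Event> {
        final int priority;

        Event(int priority) {
            this.priority = priority;
        }

        @Override
        public int compareTo(Event other) {
            // Arguments are swapped on purpose: a higher priority sorts first.
            return Integer.compare(other.priority, this.priority);
        }
    }

    // Puts the given priorities into a queue and returns them in removal order.
    static List<Integer> drainOrder(int... priorities) {
        PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<>();
        for (int priority : priorities) {
            queue.put(new Event(priority));
        }
        List<Integer> order = new ArrayList<>();
        Event next;
        while ((next = queue.poll()) != null) {
            order.add(next.priority);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder(1, 71, 22, 70));   // prints [71, 70, 22, 1]
    }
}
```

Integer.compare with swapped arguments is equivalent to the if/else chain above and avoids writing the three branches by hand.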
   With all those steps done we can successfully run the MathPrioProdConMain class and get the following result (which of course depends on our setup of consumers, producers and MathEvents):
...
MathConsumer-Thread-6 : MathProducer-Thread-4 priority: 71
MathConsumer-Thread-6 : MathProducer-Thread-2 priority: 71
MathPrioProdConMain: Buffer waiting consumers = 0
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 71
MathConsumer-Thread-6 : Transfer Math Calculation Event  priority: 1
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 70
MathConsumer-Thread-6 : MathProducer-Thread-0 priority: 70
...

   More comments can be found inside the source code, which is available on my GitHub account (example source code)
Enjoy multi-threading!
