From 471d2c0b5d671d0caf98e84a88673de7b5778801 Mon Sep 17 00:00:00 2001 From: Narek Karapetian Date: Thu, 6 Oct 2022 12:51:10 +0400 Subject: [PATCH] Fix CircularBuffer and add unit tests (#3411) --- .../buffers/CircularBuffer.java | 147 +++++------------- .../buffers/CircularBufferTest.java | 127 +++++++++++++++ 2 files changed, 165 insertions(+), 109 deletions(-) create mode 100644 src/test/java/com/thealgorithms/datastructures/buffers/CircularBufferTest.java diff --git a/src/main/java/com/thealgorithms/datastructures/buffers/CircularBuffer.java b/src/main/java/com/thealgorithms/datastructures/buffers/CircularBuffer.java index 6a6dcd8fe..96c72fc04 100644 --- a/src/main/java/com/thealgorithms/datastructures/buffers/CircularBuffer.java +++ b/src/main/java/com/thealgorithms/datastructures/buffers/CircularBuffer.java @@ -1,132 +1,61 @@ package com.thealgorithms.datastructures.buffers; -import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -public class CircularBuffer { +public class CircularBuffer { + private final Item[] buffer; + private final CircularPointer putPointer; + private final CircularPointer getPointer; + private final AtomicInteger size = new AtomicInteger(0); - private char[] _buffer; - public final int _buffer_size; - private int _write_index = 0; - private int _read_index = 0; - private AtomicInteger _readable_data = new AtomicInteger(0); - - public CircularBuffer(int buffer_size) { - if (!IsPowerOfTwo(buffer_size)) { - throw new IllegalArgumentException(); - } - this._buffer_size = buffer_size; - _buffer = new char[buffer_size]; + public CircularBuffer(int size) { + //noinspection unchecked + this.buffer = (Item[]) new Object[size]; + this.putPointer = new CircularPointer(0, size); + this.getPointer = new CircularPointer(0, size); } - private boolean IsPowerOfTwo(int i) { - return (i & (i - 1)) == 0; + public boolean isEmpty() { + return size.get() == 0; } - private int getTrueIndex(int i) { - return i % _buffer_size; + public boolean isFull() { + return size.get() == buffer.length; } - public Character readOutChar() { - Character result = null; + public Item get() { + if (isEmpty()) + return null; - // if we have data to read - if (_readable_data.get() > 0) { - result = Character.valueOf(_buffer[getTrueIndex(_read_index)]); - _readable_data.decrementAndGet(); - _read_index++; - } - - return result; + Item item = buffer[getPointer.getAndIncrement()]; + size.decrementAndGet(); + return item; } - public boolean writeToCharBuffer(char c) { - boolean result = false; + public boolean put(Item item) { + if (isFull()) + return false; - // if we can write to the buffer - if (_readable_data.get() < _buffer_size) { - // write to buffer - _buffer[getTrueIndex(_write_index)] = c; - _readable_data.incrementAndGet(); - _write_index++; - result = true; - } - - return result; + buffer[putPointer.getAndIncrement()] = item; + size.incrementAndGet(); + return true; } - private static class TestWriteWorker implements Runnable { + private static class CircularPointer { + private int pointer; + private final int max; - String _alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"; - Random _random = new Random(); - CircularBuffer _buffer; - - public TestWriteWorker(CircularBuffer cb) { - this._buffer = cb; + public CircularPointer(int pointer, int max) { + this.pointer = pointer; + this.max = max; } - private char getRandomChar() { - return _alphabet.charAt(_random.nextInt(_alphabet.length())); + public int getAndIncrement() { + if (pointer == max) + pointer = 0; + int tmp = pointer; + pointer++; + return tmp; } - - public void run() { - while (!Thread.interrupted()) { - if (!_buffer.writeToCharBuffer(getRandomChar())) { - Thread.yield(); - try { - Thread.sleep(10); - } catch (InterruptedException e) { - return; - } - } - } - } - } - - private static class TestReadWorker implements Runnable { - - CircularBuffer _buffer; - - public TestReadWorker(CircularBuffer cb) { - this._buffer = cb; - } - - @Override - public void run() { - System.out.println("Printing Buffer:"); - while (!Thread.interrupted()) { - Character c = _buffer.readOutChar(); - if (c != null) { - System.out.print(c.charValue()); - } else { - Thread.yield(); - try { - Thread.sleep(10); - } catch (InterruptedException e) { - System.out.println(); - return; - } - } - } - } - } - - public static void main(String[] args) throws InterruptedException { - int buffer_size = 1024; - // create circular buffer - CircularBuffer cb = new CircularBuffer(buffer_size); - - // create threads that read and write the buffer. - Thread write_thread = new Thread(new TestWriteWorker(cb)); - Thread read_thread = new Thread(new TestReadWorker(cb)); - read_thread.start(); - write_thread.start(); - - // wait some amount of time - Thread.sleep(10000); - - // interrupt threads and exit - write_thread.interrupt(); - read_thread.interrupt(); } } diff --git a/src/test/java/com/thealgorithms/datastructures/buffers/CircularBufferTest.java b/src/test/java/com/thealgorithms/datastructures/buffers/CircularBufferTest.java new file mode 100644 index 000000000..70fb6608e --- /dev/null +++ b/src/test/java/com/thealgorithms/datastructures/buffers/CircularBufferTest.java @@ -0,0 +1,127 @@ +package com.thealgorithms.datastructures.buffers; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicIntegerArray; + +import static org.junit.jupiter.api.Assertions.*; + +class CircularBufferTest { + private static final int BUFFER_SIZE = 10; + private CircularBuffer buffer; + + @BeforeEach + void setUp() { + buffer = new CircularBuffer<>(BUFFER_SIZE); + } + + @Test + void isEmpty() { + assertTrue(buffer.isEmpty()); + buffer.put(generateInt()); + assertFalse(buffer.isEmpty()); + } + + @Test + void isFull() { + assertFalse(buffer.isFull()); + buffer.put(generateInt()); + assertFalse(buffer.isFull()); + + for (int i = 1; i < BUFFER_SIZE; i++) + buffer.put(generateInt()); + assertTrue(buffer.isFull()); + } + + @Test + void get() { + assertNull(buffer.get()); + for (int i = 0; i < 100; i++) + buffer.put(i); + for (int i = 0; i < BUFFER_SIZE; i++) + assertEquals(i, buffer.get()); + assertNull(buffer.get()); + } + + @Test + void put() { + for (int i = 0; i < BUFFER_SIZE; i++) + assertTrue(buffer.put(generateInt())); + assertFalse(buffer.put(generateInt())); + } + + @RepeatedTest(1000) + void concurrentTest() throws InterruptedException { + final int numberOfThreadsForProducers = 3; + final int numberOfThreadsForConsumers = 2; + final int numberOfItems = 300; + final CountDownLatch producerCountDownLatch = new CountDownLatch(numberOfItems); + final CountDownLatch consumerCountDownLatch = new CountDownLatch(numberOfItems); + final AtomicIntegerArray resultAtomicArray = new AtomicIntegerArray(numberOfItems); + + // We are running 2 ExecutorService simultaneously 1 - producer, 2 - consumer + // Run producer threads to populate buffer. + ExecutorService putExecutors = Executors.newFixedThreadPool(numberOfThreadsForProducers); + putExecutors.execute(() -> { + while (producerCountDownLatch.getCount() > 0) { + int count = (int) producerCountDownLatch.getCount(); + boolean put = buffer.put(count); + while (!put) put = buffer.put(count); + producerCountDownLatch.countDown(); + } + }); + + // Run consumer threads to retrieve the data from buffer. + ExecutorService getExecutors = Executors.newFixedThreadPool(numberOfThreadsForConsumers); + getExecutors.execute(() -> { + while (consumerCountDownLatch.getCount() > 0) { + int count = (int) consumerCountDownLatch.getCount(); + Integer item = buffer.get(); + while (item == null) item = buffer.get(); + resultAtomicArray.set(count - 1, item); + consumerCountDownLatch.countDown(); + } + }); + + producerCountDownLatch.await(); + consumerCountDownLatch.await(); + putExecutors.shutdown(); + getExecutors.shutdown(); + shutDownExecutorSafely(putExecutors); + shutDownExecutorSafely(getExecutors); + + List resultArray = getSortedListFrom(resultAtomicArray); + for (int i = 0; i < numberOfItems; i++) { + int expectedItem = i + 1; + assertEquals(expectedItem, resultArray.get(i)); + } + } + + private int generateInt() { + return ThreadLocalRandom.current().nextInt(0, 100); + } + + private void shutDownExecutorSafely(ExecutorService executorService) { + try { + if (!executorService.awaitTermination(1_000, TimeUnit.MILLISECONDS)) + executorService.shutdownNow(); + } catch (InterruptedException e) { + executorService.shutdownNow(); + } + } + + public List getSortedListFrom(AtomicIntegerArray atomicArray) { + int length = atomicArray.length(); + ArrayList result = new ArrayList<>(length); + for (int i = 0; i < length; i++) + result.add(atomicArray.get(i)); + result.sort(Comparator.comparingInt(o -> o)); + return result; + } +}