1
0
mirror of synced 2025-12-25 02:09:19 -05:00

hide queue inside MemoryBoundedLinkedBlockingQueue (#26375)

Hiding the actual java Queue has an inner class to avoid the chance that someone tries to use native queue methods that we haven't overridden. Good thing that I did this too, because one of the changes we made during hack dayz wasn't reflected in our current feature branch. We need to override poll(time, unit) not just poll. This PR makes sure we won't make that mistake again!
This commit is contained in:
Charles
2023-05-22 18:04:05 -07:00
committed by GitHub
parent 1ed32e55bc
commit 5f3ed16408
2 changed files with 93 additions and 62 deletions

View File

@@ -12,83 +12,126 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
/**
* This class is meant to emulate the behavior of a LinkedBlockingQueue, but instead of being
* bounded on number of items in the queue, it is bounded by the memory it is allowed to use. The
* amount of memory it is allowed to use can be resized after it is instantiated.
* <p>
* This class intentaionally hides the underlying queue inside of it. For this class to work, it has
* to override each method on a queue that adds or removes records from the queue. The Queue
* interface has a lot of methods to override, and we don't want to spend the time overriding a lot
* of methods that won't be used. By hiding the queue, we avoid someone accidentally using a queue
* method that has not been modified. If you need access to another of the queue methods, pattern
* match adding the memory tracking as seen in {@link HiddenQueue}, and then delegate to that method
* from this top-level class.
*
* @param <E> type in the queue
*/
@Slf4j
class MemoryBoundedLinkedBlockingQueue<E> extends LinkedBlockingQueue<MemoryBoundedLinkedBlockingQueue.MemoryItem<E>> {
class MemoryBoundedLinkedBlockingQueue<E> {
private final AtomicLong currentMemoryUsage;
private final AtomicLong maxMemoryUsage;
private final AtomicReference<Instant> timeOfLastMessage;
private final HiddenQueue<E> hiddenQueue;
public MemoryBoundedLinkedBlockingQueue(final long maxMemoryUsage) {
currentMemoryUsage = new AtomicLong(0);
this.maxMemoryUsage = new AtomicLong(maxMemoryUsage);
timeOfLastMessage = new AtomicReference(null);
hiddenQueue = new HiddenQueue<>(maxMemoryUsage);
}
public long getCurrentMemoryUsage() {
return currentMemoryUsage.get();
}
public long getMaxMemoryUsage() {
return maxMemoryUsage.get();
return hiddenQueue.currentMemoryUsage.get();
}
public void addMaxMemory(final long maxMemoryUsage) {
this.maxMemoryUsage.addAndGet(maxMemoryUsage);
this.hiddenQueue.maxMemoryUsage.addAndGet(maxMemoryUsage);
}
public Optional<Instant> getTimeOfLastMessage() {
return Optional.ofNullable(timeOfLastMessage.get());
return Optional.ofNullable(hiddenQueue.timeOfLastMessage.get());
}
public int size() {
return hiddenQueue.size();
}
public boolean offer(final E e, final long itemSizeInBytes) {
final long newMemoryUsage = currentMemoryUsage.addAndGet(itemSizeInBytes);
if (newMemoryUsage <= maxMemoryUsage.get()) {
final boolean success = super.offer(new MemoryItem<>(e, itemSizeInBytes));
if (!success) {
currentMemoryUsage.addAndGet(-itemSizeInBytes);
} else {
// it succeeded!
timeOfLastMessage.set(Instant.now());
}
log.debug("offer status: {}", success);
return success;
} else {
currentMemoryUsage.addAndGet(-itemSizeInBytes);
log.debug("offer failed");
return false;
}
return hiddenQueue.offer(e, itemSizeInBytes);
}
@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> take() throws InterruptedException {
final MemoryItem<E> memoryItem = super.take();
if (memoryItem != null) {
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
return hiddenQueue.take();
}
@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll() {
final MemoryItem<E> memoryItem = super.poll();
if (memoryItem != null) {
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
return hiddenQueue.poll();
}
@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll(final long timeout, final TimeUnit unit) throws InterruptedException {
final MemoryItem<E> memoryItem = super.poll(timeout, unit);
if (memoryItem != null) {
return hiddenQueue.poll(timeout, unit);
}
/**
* Extends LinkedBlockingQueue so that we can get a LinkedBlockingQueue bounded by memory. Hidden as
* an inner class, so it doesn't get misused, see top-level javadoc comment.
*
* @param <E>
*/
private static class HiddenQueue<E> extends LinkedBlockingQueue<MemoryBoundedLinkedBlockingQueue.MemoryItem<E>> {
private final AtomicLong currentMemoryUsage;
private final AtomicLong maxMemoryUsage;
private final AtomicReference<Instant> timeOfLastMessage;
public HiddenQueue(final long maxMemoryUsage) {
currentMemoryUsage = new AtomicLong(0);
this.maxMemoryUsage = new AtomicLong(maxMemoryUsage);
timeOfLastMessage = new AtomicReference<>(null);
}
public boolean offer(final E e, final long itemSizeInBytes) {
final long newMemoryUsage = currentMemoryUsage.addAndGet(itemSizeInBytes);
if (newMemoryUsage <= maxMemoryUsage.get()) {
final boolean success = super.offer(new MemoryItem<>(e, itemSizeInBytes));
if (!success) {
currentMemoryUsage.addAndGet(-itemSizeInBytes);
} else {
// it succeeded!
timeOfLastMessage.set(Instant.now());
}
log.debug("offer status: {}", success);
return success;
} else {
currentMemoryUsage.addAndGet(-itemSizeInBytes);
log.debug("offer failed");
return false;
}
}
@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> take() throws InterruptedException {
final MemoryItem<E> memoryItem = super.take();
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll() {
final MemoryItem<E> memoryItem = super.poll();
if (memoryItem != null) {
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
}
@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll(final long timeout, final TimeUnit unit) throws InterruptedException {
final MemoryItem<E> memoryItem = super.poll(timeout, unit);
if (memoryItem != null) {
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
}
}
public record MemoryItem<E> (E item, long size) {}

View File

@@ -21,23 +21,11 @@ public class MemoryBoundedLinkedBlockingQueueTest {
queue.offer("abc", 6);
var item = queue.take();
final var item = queue.take();
assertEquals("abc", item.item());
}
@Test
void offerAndToStreamShouldReturn() throws InterruptedException {
final MemoryBoundedLinkedBlockingQueue<String> queue = new MemoryBoundedLinkedBlockingQueue<>(1024);
queue.offer("abc", 6);
queue.offer("DEF", 6);
System.out.println(queue.size());
queue.stream().forEach(stringMemoryItem -> System.out.println(stringMemoryItem.item()));
System.out.println(queue.size());
}
@Test
void test() throws InterruptedException {
final MemoryBoundedLinkedBlockingQueue<String> queue = new MemoryBoundedLinkedBlockingQueue<>(1024);