refactor(queue): use test message wuth key prefix

This commit is contained in:
Ludovic DEHON
2025-11-23 23:12:25 +01:00
parent ac591956d6
commit 297c4aec4b
4 changed files with 75 additions and 42 deletions

View File

@@ -17,7 +17,7 @@ import java.util.stream.IntStream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractBroadcastQueueTest {
public abstract class AbstractBroadcastQueueTest extends AbstractQueueTest {
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
@Inject
@@ -35,9 +35,10 @@ public abstract class AbstractBroadcastQueueTest {
countDownLatch.countDown();
});
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 1));
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 2));
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 3));
String prefix = this.keyPrefix();
for (int i = 1; i <= 3; i++) {
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), i));
}
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
@@ -47,6 +48,12 @@ public abstract class AbstractBroadcastQueueTest {
assertThat(list).contains(1, 2, 3);
}
@Test
void closingConsumer() throws QueueException, InterruptedException {
singleConsumer();
singleConsumer();
}
@Test
void multipleConsumer() throws QueueException, InterruptedException {
int rand = ThreadLocalRandom.current().nextInt(10, 50);;
@@ -64,12 +71,14 @@ public abstract class AbstractBroadcastQueueTest {
})
)));
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 1));
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 2));
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 3));
String prefix = this.keyPrefix();
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 1));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 2));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 3));
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscribers.forEach(QueueSubscriber::close);
// rebalancing can take some time, we multiply timeout by 5
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
subscribers.parallelStream().forEach(QueueSubscriber::close);
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
@@ -98,8 +107,9 @@ public abstract class AbstractBroadcastQueueTest {
}
});
String prefix = this.keyPrefix();
// first round
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 1));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 1));
boolean await1 = countDownLatchFirst.await(DEFAULT_TIMEOUT_SECONDS + 10, TimeUnit.SECONDS);
subscriber.pause();
@@ -109,8 +119,8 @@ public abstract class AbstractBroadcastQueueTest {
Instant resumeTime = Instant.now();
subscriber.resume();
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 2));
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 3));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 2));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 3));
boolean await2 = countDownLatchSecond.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.pause();
@@ -120,8 +130,8 @@ public abstract class AbstractBroadcastQueueTest {
Instant resumeTime2 = Instant.now();
subscriber.resume();
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 4));
broadcastQueue.emit(new TestBroadcast(IdUtils.create(), 5));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 4));
broadcastQueue.emit(new TestBroadcast(prefix + "_" + IdUtils.create(), 5));
boolean await3 = countDownLatchOthers.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();

View File

@@ -22,18 +22,12 @@ import java.util.stream.IntStream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractDispatchQueueTest {
public abstract class AbstractDispatchQueueTest extends AbstractQueueTest {
private static final int DEFAULT_TIMEOUT_SECONDS = 5;
@Inject
private DispatchQueueInterface<TestDispatch> dispatchQueue;
@Test
void closingConsumer() throws QueueException, InterruptedException, IOException {
singleConsumer();
singleConsumer();
}
@Test
void singleConsumer() throws QueueException, InterruptedException, IOException {
CountDownLatch countDownLatch = new CountDownLatch(2);
@@ -46,8 +40,9 @@ public abstract class AbstractDispatchQueueTest {
countDownLatch.countDown();
});
dispatchQueue.emit(new TestDispatch(IdUtils.create(), 1));
dispatchQueue.emit(new TestDispatch(IdUtils.create(), 2));
String prefix = this.keyPrefix();
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 1));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 2));
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
@@ -57,6 +52,12 @@ public abstract class AbstractDispatchQueueTest {
assertThat(list).containsExactlyInAnyOrder(1, 2);
}
@Test
void closingConsumer() throws QueueException, InterruptedException, IOException {
singleConsumer();
singleConsumer();
}
@Test
void multipleConsumer() throws QueueException, InterruptedException {
int rand = ThreadLocalRandom.current().nextInt(10, 50);;
@@ -75,12 +76,14 @@ public abstract class AbstractDispatchQueueTest {
})
)));
String prefix = this.keyPrefix();
for (int i = 0; i < rand; i++) {
dispatchQueue.emit(new TestDispatch(IdUtils.create(), i));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), i));
}
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscribers.forEach(QueueSubscriber::close);
// rebalancing can take some time, we multiply timeout by 5
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
subscribers.parallelStream().forEach(QueueSubscriber::close);
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
@@ -92,9 +95,15 @@ public abstract class AbstractDispatchQueueTest {
@Test
void errorProcessing() throws QueueException, InterruptedException {
// @TODO: failed on rabbitmq, the published message seems to be not durable
dispatchQueue.emit(List.of(new TestDispatch(IdUtils.create(), 1), new TestDispatch(IdUtils.create(), 2), new TestDispatch(IdUtils.create(), 3)));
String prefix = this.keyPrefix();
CountDownLatch countDownLatch = new CountDownLatch(4);
dispatchQueue.emit(IntStream.range(1, 15)
.boxed()
.map(i -> new TestDispatch(prefix + "_" + IdUtils.create(), i))
.toList()
);
CountDownLatch countDownLatch = new CountDownLatch(15);
Collection<Integer> list = Collections.synchronizedCollection(new ArrayList<>());
var crashed = new AtomicBoolean(false);
@@ -116,7 +125,7 @@ public abstract class AbstractDispatchQueueTest {
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
assertThat(list).containsExactlyInAnyOrder(1, 2, 3);
assertThat(list).containsExactlyInAnyOrder(IntStream.range(1, 15).boxed().toArray(Integer[]::new));
}
@Test
@@ -140,7 +149,8 @@ public abstract class AbstractDispatchQueueTest {
});
// first round
dispatchQueue.emit(new TestDispatch(IdUtils.create(), 1));
String prefix = this.keyPrefix();
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 1));
boolean await1 = countDownLatchFirst.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.pause();
@@ -150,8 +160,8 @@ public abstract class AbstractDispatchQueueTest {
Instant resumeTime = Instant.now();
subscriber.resume();
dispatchQueue.emit(new TestDispatch(IdUtils.create(), 2));
dispatchQueue.emit(new TestDispatch(IdUtils.create(), 3));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 2));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 3));
boolean await2 = countDownLatchSecond.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.pause();
@@ -161,8 +171,8 @@ public abstract class AbstractDispatchQueueTest {
Instant resumeTime2 = Instant.now();
subscriber.resume();
dispatchQueue.emit(new TestDispatch(IdUtils.create(), 4));
dispatchQueue.emit(new TestDispatch(IdUtils.create(), 5));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 4));
dispatchQueue.emit(new TestDispatch(prefix + "_" + IdUtils.create(), 5));
boolean await3 = countDownLatchOthers.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();

View File

@@ -15,7 +15,7 @@ import java.util.stream.IntStream;
import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.assertj.core.api.Assertions.assertThat;
public abstract class AbstractKeyedDispatchQueueTest {
public abstract class AbstractKeyedDispatchQueueTest extends AbstractQueueTest {
private static final int DEFAULT_TIMEOUT_SECONDS = 10;
@Inject
@@ -35,8 +35,9 @@ public abstract class AbstractKeyedDispatchQueueTest {
countDownLatch.countDown();
});
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(IdUtils.create(), 1));
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(IdUtils.create(), 2));
String prefix = this.keyPrefix();
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 1));
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 2));
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscriber.close();
@@ -65,12 +66,14 @@ public abstract class AbstractKeyedDispatchQueueTest {
})
)));
String prefix = this.keyPrefix();
for (int i = 0; i < rand; i++) {
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(IdUtils.create(), i));
keyDispatchQueue.emit(groupKey, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), i));
}
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
subscribers.forEach(QueueSubscriber::close);
// rebalancing can take some time, we multiply timeout by 5
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS * 5, TimeUnit.SECONDS);
subscribers.parallelStream().forEach(QueueSubscriber::close);
assertThat(await).isEqualTo(true);
assertThat(countDownLatch.getCount()).isEqualTo(0L);
@@ -98,9 +101,10 @@ public abstract class AbstractKeyedDispatchQueueTest {
}));
}));
String prefix = this.keyPrefix();
for (int i = 0; i < 3; i++) {
keyDispatchQueue.emit("group-" + i, new TestKeyedDispatch(IdUtils.create(), 1));
keyDispatchQueue.emit("group-" + i, new TestKeyedDispatch(IdUtils.create(), 2));
keyDispatchQueue.emit("group-" + i, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 1));
keyDispatchQueue.emit("group-" + i, new TestKeyedDispatch(prefix + "_" + IdUtils.create(), 2));
}
boolean await = countDownLatch.await(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);

View File

@@ -0,0 +1,9 @@
package io.kestra.queue;
public class AbstractQueueTest {
protected String keyPrefix() {
StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace();
StackTraceElement e = stacktrace[2];
return e.getMethodName();
}
}