import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
}
}
// NOTE(review): every line of this overload carries a '-' marker, i.e. this hunk deletes it.
// What the removed code did: register the handler, block the calling thread on this object's
// monitor with a single wait(), then unregister the handler once the wait ended.
- @Override
- public void receiveMessages(final ReceiveMessageHandler handler) throws IOException {
- addReceiveHandler(handler);
- try {
- synchronized (this) {
// Single wait() with no condition loop; an interrupt also ends it and is ignored below.
- this.wait();
- }
- } catch (InterruptedException ignored) {
- }
- removeReceiveHandler(handler);
- }
-
// NOTE(review): this span appears to be a unified-diff fragment. Lines marked '-' are the old
// version (single Duration timeout), lines marked '+' are the new version, unmarked lines are
// shared context. The comments below describe the new ('+' plus context) version.
//
// Registers a wrapping handler, blocks the calling thread, then unregisters the wrapper.
//   timeout     - if present, the method returns once no message has been handled for longer
//                 than this duration (measured from the last handled message, or from entry).
//   maxMessages - if present and positive, the number of handled messages after which the
//                 calling thread is interrupted so it can stop waiting.
//   handler     - receives every (envelope, e) pair passed to the wrapper.
@Override
public void receiveMessages(
- final Duration timeout, final ReceiveMessageHandler handler
+ Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
) throws IOException {
// -1 is the "no limit" sentinel: the countdown below only runs for values > 0, so a counter
// starting at -1 never becomes 0.
+ final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1));
final var lastMessage = new AtomicLong(System.currentTimeMillis());
// Captured so the wrapper can interrupt the thread that called receiveMessages.
+ final var thread = Thread.currentThread();
final ReceiveMessageHandler receiveHandler = (envelope, e) -> {
// Refresh the idle timer before delegating to the caller's handler.
lastMessage.set(System.currentTimeMillis());
handler.handleMessage(envelope, e);
+ if (remainingMessages.get() > 0) {
+ if (remainingMessages.decrementAndGet() <= 0) {
// Clamp to exactly 0 (the loop below tests != 0) and wake the blocked caller.
+ remainingMessages.set(0);
+ thread.interrupt();
+ }
+ }
};
addReceiveHandler(receiveHandler);
- while (true) {
+ if (timeout.isPresent()) {
// NOTE(review): with maxMessages == 0 this loop is skipped and the method returns immediately,
// whereas the no-timeout branch below would still block in wait(). Confirm which is intended.
+ while (remainingMessages.get() != 0) {
+ try {
+ final var passedTime = System.currentTimeMillis() - lastMessage.get();
+ final var sleepTimeRemaining = timeout.get().toMillis() - passedTime;
// Idle for longer than the timeout: stop receiving.
+ if (sleepTimeRemaining < 0) {
+ break;
+ }
+ Thread.sleep(sleepTimeRemaining);
// An interrupt (e.g. from the wrapper above) only cuts the sleep short; the loop condition
// decides whether to continue. NOTE(review): the interrupt status is not restored, so
// interrupts from any other source are silently dropped here.
+ } catch (InterruptedException ignored) {
+ }
+ }
+ } else {
try {
- final var sleepTimeRemaining = timeout.toMillis() - (System.currentTimeMillis() - lastMessage.get());
- if (sleepTimeRemaining < 0) {
- break;
// NOTE(review): wait() is called once, without a guarding condition loop, so any wake-up
// (notify, interrupt from any source, or a spurious wake-up) ends the method.
+ synchronized (this) {
+ this.wait();
}
- Thread.sleep(sleepTimeRemaining);
} catch (InterruptedException ignored) {
}
}
+
// Reached after the loop or the wait ends normally or by interrupt; not inside a finally block.
removeReceiveHandler(receiveHandler);
}
}).toList();
}
// Stub added by this hunk ('+' lines): always throws UnsupportedOperationException and never
// reads the given id. The declared IOException is never thrown by this body.
+ @Override
+ public InputStream retrieveAttachment(final String id) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
@SuppressWarnings("unchecked")
private <T> T getValue(
final Map<String, Variant<?>> stringVariantMap, final String field