public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
while (!shouldStop) {
try {
- receiveMessages(Duration.ofMinutes(1), false, handler);
+ receiveMessages(Duration.ofMinutes(1), false, null, handler);
break;
} catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e);
}
public void receiveMessages(
- Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
+ Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
) throws IOException {
needsToRetryFailedMessages = true;
hasCaughtUpWithOldMessages = false;
signalWebSocket.connect();
try {
- receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, handler, queuedActions);
+ receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
} finally {
hasCaughtUpWithOldMessages = false;
handleQueuedActions(queuedActions.keySet());
final SignalWebSocket signalWebSocket,
Duration timeout,
boolean returnOnTimeout,
+ Integer maxMessages,
Manager.ReceiveMessageHandler handler,
final Map<HandleAction, HandleAction> queuedActions
) throws IOException {
+ int remainingMessages = maxMessages == null ? -1 : maxMessages;
var backOffCounter = 0;
isWaitingForMessage = false;
- while (!shouldStop) {
+ while (!shouldStop && remainingMessages != 0) {
if (needsToRetryFailedMessages) {
retryFailedReceivedMessages(handler);
needsToRetryFailedMessages = false;
backOffCounter = 0;
if (result.isPresent()) {
+ if (remainingMessages > 0) {
+ remainingMessages -= 1;
+ }
envelope = result.get();
logger.debug("New message received from server");
} else {