queuedActions.addAll(actions);
}
}
- for (var action : queuedActions) {
- try {
- action.execute(this);
- } catch (Throwable e) {
- if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- logger.warn("Message action failed.", e);
- }
- }
+ handleQueuedActions(queuedActions);
}
private List<HandleAction> retryFailedReceivedMessage(
boolean returnOnTimeout,
boolean ignoreAttachments,
ReceiveMessageHandler handler
- ) throws IOException, InterruptedException {
+ ) throws IOException {
retryFailedReceivedMessages(handler, ignoreAttachments);
Set<HandleAction> queuedActions = new HashSet<>();
// Received indicator that server queue is empty
hasCaughtUpWithOldMessages = true;
- for (var action : queuedActions) {
- try {
- action.execute(this);
- } catch (Throwable e) {
- if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- logger.warn("Message action failed.", e);
- }
- }
+ handleQueuedActions(queuedActions);
queuedActions.clear();
// Continue to wait another timeout for new messages
}
} catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) {
- throw (InterruptedException) e.getCause();
+ Thread.currentThread().interrupt();
+ break;
} else {
throw e;
}
}
}
}
+ handleQueuedActions(queuedActions);
+ }
+
+ private void handleQueuedActions(final Set<HandleAction> queuedActions) {
+ for (var action : queuedActions) {
+ try {
+ action.execute(this);
+ } catch (Throwable e) {
+ if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ logger.warn("Message action failed.", e);
+ }
+ }
}
private boolean isMessageBlocked(