private final IncomingMessageHandler incomingMessageHandler;
private final Context context;
+ // Set by the receive loop and read through hasCaughtUpWithOldMessages() without holding a lock,
+ // so the field is volatile to guarantee that readers on other threads observe the latest value.
+ private volatile boolean hasCaughtUpWithOldMessages = false;
Manager(
SignalAccount account,
final var signalWebSocket = dependencies.getSignalWebSocket();
signalWebSocket.connect();
- var hasCaughtUpWithOldMessages = false;
+ hasCaughtUpWithOldMessages = false;
while (!Thread.interrupted()) {
SignalServiceEnvelope envelope;
envelope = result.get();
} else {
// Received indicator that server queue is empty
- hasCaughtUpWithOldMessages = true;
-
handleQueuedActions(queuedActions);
queuedActions.clear();
+ hasCaughtUpWithOldMessages = true;
+ synchronized (this) {
+ this.notifyAll();
+ }
+
// Continue to wait another timeout for new messages
continue;
}
handleQueuedActions(queuedActions);
}
+ public boolean hasCaughtUpWithOldMessages() { // true once the receive loop has recorded that the server queue was empty; reset to false when receiving starts
+ return hasCaughtUpWithOldMessages;
+ }
+
/**
 * Executes every queued action against the shared context.
 * A throwable raised by one action is caught inside the loop, so the remaining actions are still attempted.
 *
 * @param queuedActions the actions to execute, in iteration order
 */
private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
+ var interrupted = false; // remembers an interruption so the thread's interrupt status can be restored after the loop
for (var action : queuedActions) {
try {
action.execute(context);
} catch (Throwable e) {
- if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ if ((e instanceof AssertionError || e instanceof RuntimeException)
+ && e.getCause() instanceof InterruptedException) {
+ interrupted = true; // the interruption arrived wrapped as the cause of an unchecked throwable
+ continue; // skip the warning below and move on to the next action
}
logger.warn("Message action failed.", e);
}
}
+ if (interrupted) {
+ Thread.currentThread().interrupt(); // restore the interrupt status only after all actions were attempted
+ }
}
public boolean isContactBlocked(final RecipientIdentifier.Single recipient) {
objectMapper.valueToTree(s),
null)), m, ignoreAttachments);
+ // Maybe this should be handled inside the Manager
+ while (!m.hasCaughtUpWithOldMessages()) {
+ try {
+ synchronized (m) {
+ m.wait();
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+
final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, () -> {