X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/e22cc457ae91e22deb54373078bef7e8cd22edcb..4a1af0786c938f887a109a17dcc879da21704a8b:/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 5ee2e7d3..69ae9269 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -108,6 +108,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.asamk.signal.manager.config.ServiceConfig.capabilities; @@ -139,6 +140,7 @@ public class ManagerImpl implements Manager { private boolean ignoreAttachments = false; private Thread receiveThread; + private final Set weakHandlers = new HashSet<>(); private final Set messageHandlers = new HashSet<>(); private boolean isReceivingSynchronous; @@ -401,6 +403,8 @@ public class ManagerImpl implements Manager { @Override public void submitRateLimitRecaptchaChallenge(String challenge, String captcha) throws IOException { + captcha = captcha == null ? null : captcha.replace("signalcaptcha://", ""); + dependencies.getAccountManager().submitRateLimitRecaptchaChallenge(challenge, captcha); } @@ -904,14 +908,17 @@ public class ManagerImpl implements Manager { } @Override - public void addReceiveHandler(final ReceiveMessageHandler handler) { + public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { if (isReceivingSynchronous) { throw new IllegalStateException("Already receiving message synchronously."); } synchronized (messageHandlers) { - messageHandlers.add(handler); - - startReceiveThreadIfRequired(); + if (isWeakListener) { + weakHandlers.add(handler); + } else { + messageHandlers.add(handler); + startReceiveThreadIfRequired(); + } } } @@ -920,17 +927,18 @@ public class ManagerImpl implements Manager { return; } receiveThread = new Thread(() -> { + logger.debug("Starting receiving messages"); while (!Thread.interrupted()) { try { receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> { synchronized (messageHandlers) { - for (ReceiveMessageHandler h : messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { try { h.handleMessage(envelope, e); } catch (Exception ex) { logger.warn("Message handler failed, ignoring", ex); } - } + }); } }); break; @@ -938,12 +946,14 @@ public class ManagerImpl implements Manager { logger.warn("Receiving messages failed, retrying", e); } } + logger.debug("Finished receiving messages"); hasCaughtUpWithOldMessages = false; synchronized (messageHandlers) { receiveThread = null; // Check if in the meantime another handler has been registered if (!messageHandlers.isEmpty()) { + logger.debug("Another handler has been registered, starting receive thread again"); startReceiveThreadIfRequired(); } } @@ -956,12 +966,13 @@ public class ManagerImpl implements Manager { public void removeReceiveHandler(final ReceiveMessageHandler handler) { final Thread thread; synchronized (messageHandlers) { - thread = receiveThread; - receiveThread = null; + weakHandlers.remove(handler); messageHandlers.remove(handler); - if (!messageHandlers.isEmpty() || isReceivingSynchronous) { + if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) { return; } + thread = receiveThread; + receiveThread = null; } stopReceiveThread(thread); @@ -1114,6 +1125,7 @@ public class ManagerImpl implements Manager { } handleQueuedActions(queuedActions); queuedActions.clear(); + dependencies.getSignalWebSocket().disconnect(); } @Override @@ -1376,6 +1388,7 @@ public class ManagerImpl implements Manager { private void close(boolean closeAccount) throws IOException { Thread thread; synchronized (messageHandlers) { + weakHandlers.clear(); messageHandlers.clear(); thread = receiveThread; receiveThread = null;