import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
private boolean ignoreAttachments = false;
// Thread created by the receive-thread start logic; reset to null when that thread
// finishes, when the last regular handler is removed, and when the manager is closed.
private Thread receiveThread;
// Handlers registered with isWeakListener == true. They are invoked for every received
// message while the receive thread runs, but registering one does not start the thread
// and only messageHandlers is consulted when deciding whether to stop it.
+ private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
// Regular handlers. This set also serves as the monitor object guarding weakHandlers
// and receiveThread in the synchronized blocks of this class.
private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
// While true, addReceiveHandler rejects new handlers with an IllegalStateException and
// removeReceiveHandler leaves the receive thread untouched.
private boolean isReceivingSynchronous;
/**
 * Forwards a rate-limit recaptcha challenge response to the account manager.
 *
 * <p>Every occurrence of {@code "signalcaptcha://"} is removed from a non-null
 * {@code captcha} before it is forwarded; a {@code null} captcha is passed on as
 * {@code null}.
 *
 * @param challenge challenge value, passed through unchanged
 * @param captcha   captcha value, with or without the {@code signalcaptcha://} marker; may be {@code null}
 * @throws IOException declared by this method; presumably raised by the account manager call
 */
@Override
public void submitRateLimitRecaptchaChallenge(String challenge, String captcha) throws IOException {
// Guard against null before calling replace so a missing captcha does not cause an NPE here.
+ captcha = captcha == null ? null : captcha.replace("signalcaptcha://", "");
+
dependencies.getAccountManager().submitRateLimitRecaptchaChallenge(challenge, captcha);
}
}
/**
 * Registers a handler that is invoked for received messages.
 *
 * <p>In the added ({@code +}) version, a weak listener is only stored in
 * {@code weakHandlers}; a regular listener is stored in {@code messageHandlers} and
 * {@code startReceiveThreadIfRequired()} is called afterwards. The removed ({@code -})
 * lines show the previous single-argument version, which always did the latter.
 *
 * @param handler        handler to register
 * @param isWeakListener {@code true} to register without triggering the receive thread start
 * @throws IllegalStateException if {@code isReceivingSynchronous} is set
 */
@Override
- public void addReceiveHandler(final ReceiveMessageHandler handler) {
+ public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
// NOTE(review): this flag is read outside the synchronized block below.
if (isReceivingSynchronous) {
throw new IllegalStateException("Already receiving message synchronously.");
}
// Both handler sets are modified while holding the messageHandlers monitor.
synchronized (messageHandlers) {
- messageHandlers.add(handler);
-
- startReceiveThreadIfRequired();
+ if (isWeakListener) {
+ weakHandlers.add(handler);
+ } else {
+ messageHandlers.add(handler);
+ startReceiveThreadIfRequired();
+ }
}
}
return;
}
receiveThread = new Thread(() -> {
+ logger.debug("Starting receiving messages");
while (!Thread.interrupted()) {
try {
receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> {
synchronized (messageHandlers) {
- for (ReceiveMessageHandler h : messageHandlers) {
+ Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
try {
h.handleMessage(envelope, e);
} catch (Exception ex) {
logger.warn("Message handler failed, ignoring", ex);
}
- }
+ });
}
});
break;
logger.warn("Receiving messages failed, retrying", e);
}
}
+ logger.debug("Finished receiving messages");
hasCaughtUpWithOldMessages = false;
synchronized (messageHandlers) {
receiveThread = null;
// Check if in the meantime another handler has been registered
if (!messageHandlers.isEmpty()) {
+ logger.debug("Another handler has been registered, starting receive thread again");
startReceiveThreadIfRequired();
}
}
/**
 * Unregisters a handler and stops the receive thread when no regular handler is left.
 *
 * <p>The handler is removed from both {@code weakHandlers} and {@code messageHandlers}.
 * In the added ({@code +}) version nothing further happens if regular handlers remain,
 * if {@code receiveThread} is {@code null}, or if {@code isReceivingSynchronous} is set.
 * Remaining weak handlers alone do not prevent the thread from being stopped.
 *
 * @param handler handler to unregister
 */
public void removeReceiveHandler(final ReceiveMessageHandler handler) {
final Thread thread;
synchronized (messageHandlers) {
// Removed lines: the previous version took over and cleared receiveThread before the
// early-return check, so the field was nulled even when the thread was not stopped.
- thread = receiveThread;
- receiveThread = null;
+ weakHandlers.remove(handler);
messageHandlers.remove(handler);
- if (!messageHandlers.isEmpty() || isReceivingSynchronous) {
+ if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) {
return;
}
// Take over the thread reference and clear the field while still holding the monitor.
+ thread = receiveThread;
+ receiveThread = null;
}
// Called after leaving the synchronized block, i.e. without holding the messageHandlers monitor.
stopReceiveThread(thread);
}
handleQueuedActions(queuedActions);
queuedActions.clear();
+ dependencies.getSignalWebSocket().disconnect();
}
@Override
private void close(boolean closeAccount) throws IOException {
Thread thread;
synchronized (messageHandlers) {
+ weakHandlers.clear();
messageHandlers.clear();
thread = receiveThread;
receiveThread = null;