import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
private boolean ignoreAttachments = false;
// NOTE(review): presumably the background thread created by startReceiveThreadIfRequired() — confirm.
// In the visible code it is only read/cleared while holding the messageHandlers monitor.
private Thread receiveThread;
// Handlers registered as "weak" listeners: they are invoked for received messages
// together with messageHandlers, but registering one does not call startReceiveThreadIfRequired().
+ private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
// Regular handlers; registering one calls startReceiveThreadIfRequired().
// This set is also the monitor object used to guard both handler sets.
private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
// While true, addReceiveHandler rejects new handlers with an IllegalStateException.
private boolean isReceivingSynchronous;
/**
 * Submits a rate-limit reCAPTCHA challenge response through the account manager.
 *
 * <p>A non-null {@code captcha} has every occurrence of the literal
 * {@code "signalcaptcha://"} removed before it is forwarded; a {@code null}
 * captcha is passed through unchanged.
 *
 * @param challenge challenge value, forwarded unchanged
 * @param captcha   captcha value, may be {@code null}; may include the
 *                  {@code signalcaptcha://} prefix, which is stripped here
 * @throws IOException declared by this method; presumably raised by the
 *                     account manager call (verify against its signature)
 */
@Override
public void submitRateLimitRecaptchaChallenge(String challenge, String captcha) throws IOException {
// Strip the URL scheme so callers can pass either the raw token or the full signalcaptcha:// link.
+ captcha = captcha == null ? null : captcha.replace("signalcaptcha://", "");
+
dependencies.getAccountManager().submitRateLimitRecaptchaChallenge(challenge, captcha);
}
}
/**
 * Registers a handler for received messages.
 *
 * <p>A regular handler ({@code isWeakListener == false}) is stored in
 * {@code messageHandlers} and triggers {@code startReceiveThreadIfRequired()}.
 * A weak handler ({@code isWeakListener == true}) is stored in
 * {@code weakHandlers} only; registering it does not trigger that call.
 *
 * @param handler        the handler to register
 * @param isWeakListener {@code true} to register the handler as a weak listener
 * @throws IllegalStateException if {@code isReceivingSynchronous} is set
 */
@Override
- public void addReceiveHandler(final ReceiveMessageHandler handler) {
+ public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
// NOTE(review): this flag is read before the messageHandlers monitor is acquired.
if (isReceivingSynchronous) {
throw new IllegalStateException("Already receiving message synchronously.");
}
// Both handler sets are modified while holding the messageHandlers monitor.
synchronized (messageHandlers) {
- messageHandlers.add(handler);
-
- startReceiveThreadIfRequired();
+ if (isWeakListener) {
// Weak listeners are recorded without requesting the receive thread.
+ weakHandlers.add(handler);
+ } else {
+ messageHandlers.add(handler);
+ startReceiveThreadIfRequired();
+ }
}
}
try {
receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> {
synchronized (messageHandlers) {
- for (ReceiveMessageHandler h : messageHandlers) {
+ Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
try {
h.handleMessage(envelope, e);
} catch (Exception ex) {
logger.warn("Message handler failed, ignoring", ex);
}
- }
+ });
}
});
break;
public void removeReceiveHandler(final ReceiveMessageHandler handler) {
final Thread thread;
synchronized (messageHandlers) {
+ weakHandlers.remove(handler);
messageHandlers.remove(handler);
- if (!messageHandlers.isEmpty() || isReceivingSynchronous) {
+ if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) {
return;
}
thread = receiveThread;
private void close(boolean closeAccount) throws IOException {
Thread thread;
synchronized (messageHandlers) {
+ weakHandlers.clear();
messageHandlers.clear();
thread = receiveThread;
receiveThread = null;