Only interrupt the receive thread if it is currently waiting for new
messages from the server; otherwise, just set a stop flag.
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CH\\E"
},
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CH\\E"
},
+ {
+ "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CI\\E"
+ },
+ {
+ "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CL\\E"
+ },
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CN\\E"
},
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CN\\E"
},
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_TH\\E"
},
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_TH\\E"
},
+ {
+ "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_TR\\E"
+ },
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_UA\\E"
},
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_UA\\E"
},
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_US\\E"
},
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_US\\E"
},
+ {
+ "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_VE\\E"
+ },
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_XK\\E"
},
{
"pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_XK\\E"
},
"name":"net.sourceforge.argparse4j.internal.ArgumentParserImpl",
"locales":[
"",
"name":"net.sourceforge.argparse4j.internal.ArgumentParserImpl",
"locales":[
"",
}
receiveThread = new Thread(() -> {
logger.debug("Starting receiving messages");
}
receiveThread = new Thread(() -> {
logger.debug("Starting receiving messages");
- while (!Thread.interrupted()) {
- try {
- context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> {
- synchronized (messageHandlers) {
- Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
- try {
- h.handleMessage(envelope, e);
- } catch (Exception ex) {
- logger.warn("Message handler failed, ignoring", ex);
- }
- });
+ context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> {
+ synchronized (messageHandlers) {
+ Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
+ try {
+ h.handleMessage(envelope, e);
+ } catch (Exception ex) {
+ logger.warn("Message handler failed, ignoring", ex);
- break;
- } catch (IOException e) {
- logger.warn("Receiving messages failed, retrying", e);
logger.debug("Finished receiving messages");
synchronized (messageHandlers) {
receiveThread = null;
logger.debug("Finished receiving messages");
synchronized (messageHandlers) {
receiveThread = null;
}
private void stopReceiveThread(final Thread thread) {
}
private void stopReceiveThread(final Thread thread) {
+ if (context.getReceiveHelper().requestStopReceiveMessages()) {
+ logger.debug("Receive stop requested, interrupting read from server.");
+ thread.interrupt();
+ }
try {
thread.join();
} catch (InterruptedException ignored) {
try {
thread.join();
} catch (InterruptedException ignored) {
dependencies.getSignalWebSocket().disconnect();
disposable.dispose();
dependencies.getSignalWebSocket().disconnect();
disposable.dispose();
+ if (account != null) {
+ account.close();
+ }
+
synchronized (closedListeners) {
closedListeners.forEach(Runnable::run);
closedListeners.clear();
}
synchronized (closedListeners) {
closedListeners.forEach(Runnable::run);
closedListeners.clear();
}
- if (account != null) {
- account.close();
- }
.storeProfileAvatar(address,
outputStream -> retrieveProfileAvatar(avatarPath, profileKey, outputStream));
} catch (Throwable e) {
.storeProfileAvatar(address,
outputStream -> retrieveProfileAvatar(avatarPath, profileKey, outputStream));
} catch (Throwable e) {
- if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
logger.warn("Failed to download profile avatar, ignoring: {}", e.getMessage());
}
}
logger.warn("Failed to download profile avatar, ignoring: {}", e.getMessage());
}
}
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.SignalDependencies;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.SignalDependencies;
-import org.asamk.signal.manager.api.UntrustedIdentityException;
import org.asamk.signal.manager.actions.HandleAction;
import org.asamk.signal.manager.actions.HandleAction;
+import org.asamk.signal.manager.api.UntrustedIdentityException;
import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
import org.slf4j.Logger;
import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
import org.slf4j.Logger;
private boolean ignoreAttachments = false;
private boolean needsToRetryFailedMessages = false;
private boolean hasCaughtUpWithOldMessages = false;
private boolean ignoreAttachments = false;
private boolean needsToRetryFailedMessages = false;
private boolean hasCaughtUpWithOldMessages = false;
+ private boolean isWaitingForMessage = false;
+ private boolean shouldStop = false;
private Callable authenticationFailureListener;
private Callable caughtUpWithOldMessagesListener;
private Callable authenticationFailureListener;
private Callable caughtUpWithOldMessagesListener;
this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
}
this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
}
+ public boolean requestStopReceiveMessages() { // Requests that the continuous receive loop stop; the return value tells the caller whether the receiver is currently flagged as waiting for a message.
+ this.shouldStop = true; // Checked by the while-condition in receiveMessagesContinuously. NOTE(review): plain (non-volatile) field that appears to be written and read by different threads - confirm visibility is guaranteed.
+ return isWaitingForMessage; // stopReceiveThread only interrupts the receive thread when this is true.
+ }
+
+ public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { // Calls receiveMessages repeatedly, retrying after an IOException, until it returns normally or a stop has been requested.
+ while (!shouldStop) { // shouldStop is set by requestStopReceiveMessages().
+ try {
+ receiveMessages(Duration.ofMinutes(1), false, handler); // one-minute timeout, returnOnTimeout = false
+ break; // receiveMessages returned without throwing, so leave the retry loop
+ } catch (IOException e) {
+ logger.warn("Receiving messages failed, retrying", e); // the loop condition is re-checked before the next attempt
+ }
+ }
+ }
+
public void receiveMessages(
Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
) throws IOException {
public void receiveMessages(
Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
) throws IOException {
queuedActions.clear();
dependencies.getSignalWebSocket().disconnect();
webSocketStateDisposable.dispose();
queuedActions.clear();
dependencies.getSignalWebSocket().disconnect();
webSocketStateDisposable.dispose();
final var signalWebSocket = dependencies.getSignalWebSocket();
var backOffCounter = 0;
final var signalWebSocket = dependencies.getSignalWebSocket();
var backOffCounter = 0;
+ isWaitingForMessage = false;
- while (!Thread.interrupted()) {
if (needsToRetryFailedMessages) {
retryFailedReceivedMessages(handler);
needsToRetryFailedMessages = false;
if (needsToRetryFailedMessages) {
retryFailedReceivedMessages(handler);
needsToRetryFailedMessages = false;
}
logger.debug("Checking for new message from server");
try {
}
logger.debug("Checking for new message from server");
try {
+ isWaitingForMessage = true;
var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
+ isWaitingForMessage = false;
final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
.resolveRecipient(envelope1.getSourceAddress()) : null;
logger.trace("Storing new message from {}", recipientId);
// store message on disk, before acknowledging receipt to the server
cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
});
final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
.resolveRecipient(envelope1.getSourceAddress()) : null;
logger.trace("Storing new message from {}", recipientId);
// store message on disk, before acknowledging receipt to the server
cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
});
+ isWaitingForMessage = false;
backOffCounter = 0;
if (result.isPresent()) {
backOffCounter = 0;
if (result.isPresent()) {
}
} catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) {
}
} catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) {
- Thread.currentThread().interrupt();
private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
logger.debug("Handling message actions");
private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
logger.debug("Handling message actions");
- var interrupted = false;
for (var action : queuedActions) {
logger.debug("Executing action {}", action.getClass().getSimpleName());
try {
action.execute(context);
} catch (Throwable e) {
for (var action : queuedActions) {
logger.debug("Executing action {}", action.getClass().getSimpleName());
try {
action.execute(context);
} catch (Throwable e) {
- if ((e instanceof AssertionError || e instanceof RuntimeException)
- && e.getCause() instanceof InterruptedException) {
- interrupted = true;
- continue;
- }
logger.warn("Message action failed.", e);
}
}
logger.warn("Message action failed.", e);
}
}
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
}
private void onWebSocketStateChange(final WebSocketConnectionState s) {
}
private void onWebSocketStateChange(final WebSocketConnectionState s) {
this.cleanupThread = new Thread(() -> {
try {
final var interval = Duration.ofHours(1).toMillis();
this.cleanupThread = new Thread(() -> {
try {
final var interval = Duration.ofHours(1).toMillis();
+ while (!Thread.interrupted()) {
try (final var connection = database.getConnection()) {
deleteOutdatedEntries(connection);
} catch (SQLException e) {
try (final var connection = database.getConnection()) {
deleteOutdatedEntries(connection);
} catch (SQLException e) {