import org.asamk.signal.manager.storage.recipients.RecipientAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.push.ServiceId;
import org.whispersystems.signalservice.api.push.ServiceId.ACI;
+import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
import java.util.Set;
import java.util.concurrent.TimeoutException;
-import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class ReceiveHelper {
private final Context context;
private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
- private boolean needsToRetryFailedMessages = false;
private boolean hasCaughtUpWithOldMessages = false;
private boolean isWaitingForMessage = false;
private boolean shouldStop = false;
dependencies.setAllowStories(!receiveConfig.ignoreStories());
}
- public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
- this.needsToRetryFailedMessages = needsToRetryFailedMessages;
- }
-
public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
this.authenticationFailureListener = authenticationFailureListener;
}
}
public void receiveMessages(
- Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
+ Duration timeout,
+ boolean returnOnTimeout,
+ Integer maxMessages,
+ Manager.ReceiveMessageHandler handler
) throws IOException {
- needsToRetryFailedMessages = true;
+ account.setNeedsToRetryFailedMessages(true);
hasCaughtUpWithOldMessages = false;
// Use a Map here because java Set doesn't have a get method ...
Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
- final var signalWebSocket = dependencies.getSignalWebSocket();
- final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
- signalWebSocket.getWebSocketState())
+ final var signalWebSocket = dependencies.getAuthenticatedSignalWebSocket();
+ final var webSocketStateDisposable = signalWebSocket.getState()
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.distinctUntilChanged()
}
private void receiveMessagesInternal(
- final SignalWebSocket signalWebSocket,
+ final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
Duration timeout,
boolean returnOnTimeout,
Integer maxMessages,
isWaitingForMessage = false;
while (!shouldStop && remainingMessages != 0) {
- if (needsToRetryFailedMessages) {
+ if (account.getNeedsToRetryFailedMessages()) {
retryFailedReceivedMessages(handler);
- needsToRetryFailedMessages = false;
}
SignalServiceEnvelope envelope;
final CachedMessage[] cachedMessage = {null};
if (exception instanceof UntrustedIdentityException) {
logger.debug("Keeping message with untrusted identity in message cache");
final var address = ((UntrustedIdentityException) exception).getSender();
- if (envelope.getSourceServiceId().isEmpty() && address.uuid().isPresent()) {
+ if (envelope.getSourceServiceId().isEmpty() && address.aci().isPresent()) {
final var recipientId = account.getRecipientResolver()
- .resolveRecipient(ACI.from(address.uuid().get()));
+ .resolveRecipient(ACI.parseOrThrow(address.aci().get()));
try {
cachedMessage[0] = account.getMessageCache()
.replaceSender(cachedMessage[0], recipientId);
}
}
handleQueuedActions(queuedActions);
+ account.setNeedsToRetryFailedMessages(false);
}
private List<HandleAction> retryFailedReceivedMessage(
- final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
+ final Manager.ReceiveMessageHandler handler,
+ final CachedMessage cachedMessage
) {
var envelope = cachedMessage.loadEnvelope();
if (envelope == null) {
final var exception = result.second();
if (exception instanceof UntrustedIdentityException) {
- if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
- // Envelope is more than a month old, cleaning up.
+ if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 14) {
+ // Envelope is more than two weeks old, cleaning up.
cachedMessage.delete();
return null;
}