import org.asamk.signal.manager.api.Device;
import org.asamk.signal.manager.api.Group;
import org.asamk.signal.manager.api.Identity;
+import org.asamk.signal.manager.api.InvalidDeviceLinkException;
import org.asamk.signal.manager.api.Message;
+import org.asamk.signal.manager.api.Pair;
import org.asamk.signal.manager.api.RecipientIdentifier;
import org.asamk.signal.manager.api.SendGroupMessageResults;
import org.asamk.signal.manager.api.SendMessageResults;
import org.slf4j.LoggerFactory;
import org.whispersystems.libsignal.InvalidKeyException;
import org.whispersystems.libsignal.ecc.ECPublicKey;
-import org.whispersystems.libsignal.util.Pair;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalSessionLock;
import org.whispersystems.signalservice.api.groupsv2.GroupLinkNotActiveException;
private final Context context;
private boolean hasCaughtUpWithOldMessages = false;
+ private boolean ignoreAttachments = false;
+
+ private Thread receiveThread;
+ private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
+ private boolean isReceivingSynchronous;
ManagerImpl(
SignalAccount account,
account.getSignalProtocolStore(),
executor,
sessionLock);
- final var avatarStore = new AvatarStore(pathConfig.getAvatarsPath());
- final var attachmentStore = new AttachmentStore(pathConfig.getAttachmentsPath());
- final var stickerPackStore = new StickerPackStore(pathConfig.getStickerPacksPath());
+ final var avatarStore = new AvatarStore(pathConfig.avatarsPath());
+ final var attachmentStore = new AttachmentStore(pathConfig.attachmentsPath());
+ final var stickerPackStore = new StickerPackStore(pathConfig.stickerPacksPath());
this.attachmentHelper = new AttachmentHelper(dependencies, attachmentStore);
this.pinHelper = new PinHelper(dependencies.getKeyBackupService());
*/
@Override
public void setProfile(
- String givenName, final String familyName, String about, String aboutEmoji, Optional<File> avatar
+ String givenName, final String familyName, String about, String aboutEmoji, java.util.Optional<File> avatar
) throws IOException {
- profileHelper.setProfile(givenName, familyName, about, aboutEmoji, avatar);
+ profileHelper.setProfile(givenName,
+ familyName,
+ about,
+ aboutEmoji,
+ avatar == null ? null : Optional.fromNullable(avatar.orElse(null)));
syncHelper.sendSyncFetchProfileMessage();
}
}
@Override
- public void addDeviceLink(URI linkUri) throws IOException, InvalidKeyException {
+ public void addDeviceLink(URI linkUri) throws IOException, InvalidDeviceLinkException {
var info = DeviceLinkInfo.parseDeviceLinkUri(linkUri);
- addDevice(info.deviceIdentifier, info.deviceKey);
+ addDevice(info.deviceIdentifier(), info.deviceKey());
}
- private void addDevice(String deviceIdentifier, ECPublicKey deviceKey) throws IOException, InvalidKeyException {
+ private void addDevice(
+ String deviceIdentifier, ECPublicKey deviceKey
+ ) throws IOException, InvalidDeviceLinkException {
var identityKeyPair = account.getIdentityKeyPair();
var verificationCode = dependencies.getAccountManager().getNewDeviceVerificationCode();
- dependencies.getAccountManager()
- .addDevice(deviceIdentifier,
- deviceKey,
- identityKeyPair,
- Optional.of(account.getProfileKey().serialize()),
- verificationCode);
+ try {
+ dependencies.getAccountManager()
+ .addDevice(deviceIdentifier,
+ deviceKey,
+ identityKeyPair,
+ Optional.of(account.getProfileKey().serialize()),
+ verificationCode);
+ } catch (InvalidKeyException e) {
+ throw new InvalidDeviceLinkException("Invalid device link", e);
+ }
account.setMultiDevice(true);
}
@Override
- public void setRegistrationLockPin(Optional<String> pin) throws IOException, UnauthenticatedResponseException {
+ public void setRegistrationLockPin(java.util.Optional<String> pin) throws IOException, UnauthenticatedResponseException {
if (!account.isMasterDevice()) {
throw new RuntimeException("Only master device can set a PIN");
}
long timestamp = System.currentTimeMillis();
messageBuilder.withTimestamp(timestamp);
for (final var recipient : recipients) {
- if (recipient instanceof RecipientIdentifier.Single) {
- final var recipientId = resolveRecipient((RecipientIdentifier.Single) recipient);
+ if (recipient instanceof RecipientIdentifier.Single single) {
+ final var recipientId = resolveRecipient(single);
final var result = sendHelper.sendMessage(messageBuilder, recipientId);
results.put(recipient, List.of(result));
} else if (recipient instanceof RecipientIdentifier.NoteToSelf) {
final var result = sendHelper.sendSelfMessage(messageBuilder);
results.put(recipient, List.of(result));
- } else if (recipient instanceof RecipientIdentifier.Group) {
- final var groupId = ((RecipientIdentifier.Group) recipient).groupId;
- final var result = sendHelper.sendAsGroupMessage(messageBuilder, groupId);
+ } else if (recipient instanceof RecipientIdentifier.Group group) {
+ final var result = sendHelper.sendAsGroupMessage(messageBuilder, group.groupId);
results.put(recipient, result);
}
}
private void applyMessage(
final SignalServiceDataMessage.Builder messageBuilder, final Message message
) throws AttachmentInvalidException, IOException {
- messageBuilder.withBody(message.getMessageText());
- final var attachments = message.getAttachments();
+ messageBuilder.withBody(message.messageText());
+ final var attachments = message.attachments();
if (attachments != null) {
messageBuilder.withAttachments(attachmentHelper.uploadAttachments(attachments));
}
return registeredUsers;
}
- private void retryFailedReceivedMessages(ReceiveMessageHandler handler, boolean ignoreAttachments) {
+ private void retryFailedReceivedMessages(ReceiveMessageHandler handler) {
Set<HandleAction> queuedActions = new HashSet<>();
for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
- var actions = retryFailedReceivedMessage(handler, ignoreAttachments, cachedMessage);
+ var actions = retryFailedReceivedMessage(handler, cachedMessage);
if (actions != null) {
queuedActions.addAll(actions);
}
}
private List<HandleAction> retryFailedReceivedMessage(
- final ReceiveMessageHandler handler, final boolean ignoreAttachments, final CachedMessage cachedMessage
+ final ReceiveMessageHandler handler, final CachedMessage cachedMessage
) {
var envelope = cachedMessage.loadEnvelope();
if (envelope == null) {
}
@Override
- public void receiveMessages(
- long timeout,
- TimeUnit unit,
- boolean returnOnTimeout,
- boolean ignoreAttachments,
- ReceiveMessageHandler handler
+ public void addReceiveHandler(final ReceiveMessageHandler handler) {
+ if (isReceivingSynchronous) {
+ throw new IllegalStateException("Already receiving message synchronously.");
+ }
+ synchronized (messageHandlers) {
+ messageHandlers.add(handler);
+
+ startReceiveThreadIfRequired();
+ }
+ }
+
+ private void startReceiveThreadIfRequired() {
+ if (receiveThread != null) {
+ return;
+ }
+ receiveThread = new Thread(() -> {
+ while (!Thread.interrupted()) {
+ try {
+ receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, decryptedContent, e) -> {
+ synchronized (messageHandlers) {
+ for (ReceiveMessageHandler h : messageHandlers) {
+ try {
+ h.handleMessage(envelope, decryptedContent, e);
+ } catch (Exception ex) {
+ logger.warn("Message handler failed, ignoring", ex);
+ }
+ }
+ }
+ });
+ break;
+ } catch (IOException e) {
+ logger.warn("Receiving messages failed, retrying", e);
+ }
+ }
+ hasCaughtUpWithOldMessages = false;
+ synchronized (messageHandlers) {
+ receiveThread = null;
+
+ // Check if in the meantime another handler has been registered
+ if (!messageHandlers.isEmpty()) {
+ startReceiveThreadIfRequired();
+ }
+ }
+ });
+
+ receiveThread.start();
+ }
+
+ @Override
+ public void removeReceiveHandler(final ReceiveMessageHandler handler) {
+ final Thread thread;
+ synchronized (messageHandlers) {
+ thread = receiveThread;
+ receiveThread = null;
+ messageHandlers.remove(handler);
+ if (!messageHandlers.isEmpty() || isReceivingSynchronous) {
+ return;
+ }
+ }
+
+ stopReceiveThread(thread);
+ }
+
+ private void stopReceiveThread(final Thread thread) {
+ thread.interrupt();
+ try {
+ thread.join();
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ @Override
+ public boolean isReceiving() {
+ if (isReceivingSynchronous) {
+ return true;
+ }
+ synchronized (messageHandlers) {
+ return messageHandlers.size() > 0;
+ }
+ }
+
+ @Override
+ public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException {
+ receiveMessages(timeout, unit, true, handler);
+ }
+
+ @Override
+ public void receiveMessages(ReceiveMessageHandler handler) throws IOException {
+ receiveMessages(1L, TimeUnit.HOURS, false, handler);
+ }
+
+ private void receiveMessages(
+ long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
+ ) throws IOException {
+ if (isReceiving()) {
+ throw new IllegalStateException("Already receiving message.");
+ }
+ isReceivingSynchronous = true;
+ receiveThread = Thread.currentThread();
+ try {
+ receiveMessagesInternal(timeout, unit, returnOnTimeout, handler);
+ } finally {
+ receiveThread = null;
+ hasCaughtUpWithOldMessages = false;
+ isReceivingSynchronous = false;
+ }
+ }
+
+ private void receiveMessagesInternal(
+ long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
) throws IOException {
- retryFailedReceivedMessages(handler, ignoreAttachments);
+ retryFailedReceivedMessages(handler);
Set<HandleAction> queuedActions = new HashSet<>();
signalWebSocket.connect();
hasCaughtUpWithOldMessages = false;
+ var backOffCounter = 0;
+ final var MAX_BACKOFF_COUNTER = 9;
while (!Thread.interrupted()) {
SignalServiceEnvelope envelope;
// store message on disk, before acknowledging receipt to the server
cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
});
+ backOffCounter = 0;
+
if (result.isPresent()) {
envelope = result.get();
logger.debug("New message received from server");
} else {
throw e;
}
- } catch (WebSocketUnavailableException e) {
- logger.debug("Pipe unexpectedly unavailable, connecting");
- signalWebSocket.connect();
- continue;
+ } catch (IOException e) {
+ logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
+ if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
+ final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
+ backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
+ logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
+ try {
+ Thread.sleep(sleepMilliseconds);
+ } catch (InterruptedException interruptedException) {
+ return;
+ }
+ hasCaughtUpWithOldMessages = false;
+ signalWebSocket.connect();
+ continue;
+ }
+ throw e;
} catch (TimeoutException e) {
+ backOffCounter = 0;
if (returnOnTimeout) return;
continue;
}
queuedActions.clear();
}
+ @Override
+ public void setIgnoreAttachments(final boolean ignoreAttachments) {
+ this.ignoreAttachments = ignoreAttachments;
+ }
+
@Override
public boolean hasCaughtUpWithOldMessages() {
return hasCaughtUpWithOldMessages;
}
final var address = account.getRecipientStore().resolveRecipientAddress(identityInfo.getRecipientId());
+ final var scannableFingerprint = identityHelper.computeSafetyNumberForScanning(identityInfo.getRecipientId(),
+ identityInfo.getIdentityKey());
return new Identity(address,
identityInfo.getIdentityKey(),
identityHelper.computeSafetyNumber(identityInfo.getRecipientId(), identityInfo.getIdentityKey()),
- identityHelper.computeSafetyNumberForScanning(identityInfo.getRecipientId(),
- identityInfo.getIdentityKey()).getSerialized(),
+ scannableFingerprint == null ? null : scannableFingerprint.getSerialized(),
identityInfo.getTrustLevel(),
identityInfo.getDateAdded());
}
}
private void close(boolean closeAccount) throws IOException {
+ Thread thread;
+ synchronized (messageHandlers) {
+ messageHandlers.clear();
+ thread = receiveThread;
+ receiveThread = null;
+ }
+ if (thread != null) {
+ stopReceiveThread(thread);
+ }
executor.shutdown();
dependencies.getSignalWebSocket().disconnect();