*/
package org.asamk.signal.manager;
+import org.asamk.signal.manager.api.AlreadyReceivingException;
import org.asamk.signal.manager.api.AttachmentInvalidException;
import org.asamk.signal.manager.api.Configuration;
import org.asamk.signal.manager.api.Device;
import org.asamk.signal.manager.api.InvalidDeviceLinkException;
import org.asamk.signal.manager.api.InvalidStickerException;
import org.asamk.signal.manager.api.Message;
+import org.asamk.signal.manager.api.MessageEnvelope;
import org.asamk.signal.manager.api.NotPrimaryDeviceException;
import org.asamk.signal.manager.api.Pair;
import org.asamk.signal.manager.api.PendingAdminApprovalException;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@Override
public Pair<GroupId, SendGroupMessageResults> createGroup(
- String name, Set<RecipientIdentifier.Single> members, File avatarFile
+ String name, Set<RecipientIdentifier.Single> members, String avatarFile
) throws IOException, AttachmentInvalidException, UnregisteredRecipientException {
return context.getGroupHelper()
.createGroup(name,
final var textAttachment = AttachmentUtils.createAttachmentStream(new StreamDetails(new ByteArrayInputStream(
messageBytes), MimeUtils.LONG_TEXT, messageBytes.length), Optional.empty());
messageBuilder.withBody(message.messageText().substring(0, 2000));
- messageBuilder.withAttachment(textAttachment);
+ messageBuilder.withAttachment(context.getAttachmentHelper().uploadAttachment(textAttachment));
} else {
messageBuilder.withBody(message.messageText());
}
quote.message(),
List.of(),
resolveMentions(quote.mentions()),
- SignalServiceDataMessage.Quote.Type.NORMAL));
+ SignalServiceDataMessage.Quote.Type.NORMAL,
+ List.of()));
}
if (message.sticker().isPresent()) {
final var sticker = message.sticker().get();
byte[] receipt, String note, RecipientIdentifier.Single recipient
) throws IOException {
final var paymentNotification = new SignalServiceDataMessage.PaymentNotification(receipt, note);
- final var payment = new SignalServiceDataMessage.Payment(paymentNotification);
+ final var payment = new SignalServiceDataMessage.Payment(paymentNotification, null);
final var messageBuilder = SignalServiceDataMessage.newBuilder().withPayment(payment);
try {
return sendMessage(messageBuilder, Set.of(recipient));
@Override
public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
- if (isReceivingSynchronous) {
- throw new IllegalStateException("Already receiving message synchronously.");
- }
synchronized (messageHandlers) {
if (isWeakListener) {
weakHandlers.add(handler);
private static final AtomicInteger threadNumber = new AtomicInteger(0);
private void startReceiveThreadIfRequired() {
- if (receiveThread != null) {
+ if (receiveThread != null || isReceivingSynchronous) {
return;
}
receiveThread = new Thread(() -> {
logger.debug("Starting receiving messages");
- context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> {
- synchronized (messageHandlers) {
- final var handlers = Stream.concat(messageHandlers.stream(), weakHandlers.stream()).toList();
- handlers.forEach(h -> {
- try {
- h.handleMessage(envelope, e);
- } catch (Throwable ex) {
- logger.warn("Message handler failed, ignoring", ex);
- }
- });
- }
- });
+ context.getReceiveHelper().receiveMessagesContinuously(this::passReceivedMessageToHandlers);
logger.debug("Finished receiving messages");
synchronized (messageHandlers) {
receiveThread = null;
receiveThread.start();
}
+    /**
+     * Forwards a received envelope/throwable pair to every registered handler: first the
+     * handlers in {@code messageHandlers}, then those in {@code weakHandlers}.
+     * <p>
+     * A handler that throws is logged and ignored so the remaining handlers still run.
+     *
+     * @param envelope the envelope passed on to {@code handleMessage}
+     * @param e        the throwable passed on to {@code handleMessage}
+     */
+    private void passReceivedMessageToHandlers(MessageEnvelope envelope, Throwable e) {
+        synchronized (messageHandlers) {
+            // Dispatch from a snapshot rather than from the live collections. The monitor is
+            // reentrant, so a handler that adds or removes a handler from within its callback
+            // would otherwise modify the stream source during traversal, which can surface as
+            // a ConcurrentModificationException outside the per-handler try/catch below.
+            final var handlers = Stream.concat(messageHandlers.stream(), weakHandlers.stream()).toList();
+            for (final var handler : handlers) {
+                try {
+                    handler.handleMessage(envelope, e);
+                } catch (Throwable ex) {
+                    logger.warn("Message handler failed, ignoring", ex);
+                }
+            }
+        }
+    }
+
@Override
public void removeReceiveHandler(final ReceiveMessageHandler handler) {
final Thread thread;
}
+ /**
+ * Receives messages and hands each one to {@code handler}, delegating to the private overload.
+ *
+ * @param timeout     receive timeout; when empty, one minute is used and the
+ *                    {@code returnOnTimeout} flag passed on is {@code false}
+ * @param maxMessages message limit; when empty, {@code null} is passed on
+ * @param handler     callback invoked for each received message
+ * @throws IOException               propagated from the private overload
+ * @throws AlreadyReceivingException propagated from the private overload
+ */
@Override
- public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException {
- receiveMessages(timeout, true, handler);
- }
-
- @Override
- public void receiveMessages(ReceiveMessageHandler handler) throws IOException {
- receiveMessages(Duration.ofMinutes(1), false, handler);
+ public void receiveMessages(
+ Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
+ ) throws IOException, AlreadyReceivingException {
+ final Duration effectiveTimeout = timeout.orElse(Duration.ofMinutes(1));
+ // Only an explicitly supplied timeout sets the returnOnTimeout flag.
+ final boolean returnOnTimeout = timeout.isPresent();
+ final Integer messageLimit = maxMessages.orElse(null);
+ receiveMessages(effectiveTimeout, returnOnTimeout, messageLimit, handler);
}
+ /**
+ * Receives messages synchronously on the calling thread.
+ * <p>
+ * While holding the {@code messageHandlers} monitor, the method fails if {@code isReceiving()}
+ * is already true; otherwise it flags synchronous receiving and records the current thread as
+ * {@code receiveThread}. Each received message is passed to the registered handlers first and
+ * then to {@code handler}. On exit the state is cleared and, if {@code messageHandlers} is
+ * non-empty, {@code startReceiveThreadIfRequired()} is called.
+ *
+ * @param timeout         forwarded unchanged to the receive helper
+ * @param returnOnTimeout forwarded unchanged to the receive helper
+ * @param maxMessages     forwarded unchanged to the receive helper; may be {@code null}
+ * @param handler         callback invoked after the registered handlers for each message
+ * @throws IOException               declared for the receive helper call
+ * @throws AlreadyReceivingException if {@code isReceiving()} is true on entry
+ */
private void receiveMessages(
- Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
- ) throws IOException {
- if (isReceiving()) {
- throw new IllegalStateException("Already receiving message.");
+ Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler
+ ) throws IOException, AlreadyReceivingException {
+ // Check and claim the receiving state in one critical section.
+ synchronized (messageHandlers) {
+ if (isReceiving()) {
+ throw new AlreadyReceivingException("Already receiving message.");
+ }
+ isReceivingSynchronous = true;
+ receiveThread = Thread.currentThread();
}
- isReceivingSynchronous = true;
- receiveThread = Thread.currentThread();
try {
- context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler);
+ context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, (envelope, e) -> {
+ // Registered handlers see the message before the caller-supplied handler.
+ passReceivedMessageToHandlers(envelope, e);
+ handler.handleMessage(envelope, e);
+ });
} finally {
- receiveThread = null;
- isReceivingSynchronous = false;
+ synchronized (messageHandlers) {
+ // Reset both fields before the call below, which returns early while either is set.
+ receiveThread = null;
+ isReceivingSynchronous = false;
+ // Only messageHandlers is consulted here; weakHandlers alone do not trigger the call.
+ if (messageHandlers.size() > 0) {
+ startReceiveThreadIfRequired();
+ }
+ }
}
}
}
}
+    /**
+     * Looks up the attachment with the given id through the attachment helper and returns
+     * its content stream.
+     *
+     * @param id identifier passed to the attachment helper
+     * @return the stream obtained from the retrieved attachment
+     *         (NOTE(review): presumably the caller must close it; confirm)
+     * @throws IOException declared for the attachment lookup
+     */
+    @Override
+    public InputStream retrieveAttachment(final String id) throws IOException {
+        final var attachment = context.getAttachmentHelper().retrieveAttachment(id);
+        return attachment.getStream();
+    }
+
@Override
public void close() {
Thread thread;