Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
) throws IOException, AlreadyReceivingException;
+ void stopReceiveMessages();
+
void setReceiveConfig(ReceiveConfig receiveConfig);
boolean isContactBlocked(RecipientIdentifier.Single recipient);
receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler);
}
+ @Override
+ public void stopReceiveMessages() {
+ Thread thread = null;
+ synchronized (messageHandlers) {
+ if (isReceivingSynchronous) {
+ thread = receiveThread;
+ receiveThread = null;
+ }
+ }
+ if (thread != null) {
+ stopReceiveThread(thread);
+ }
+ }
+
private void receiveMessages(
Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler
) throws IOException, AlreadyReceivingException {
import org.asamk.signal.OutputType;
import org.asamk.signal.ReceiveMessageHandler;
+import org.asamk.signal.Shutdown;
import org.asamk.signal.commands.exceptions.CommandException;
import org.asamk.signal.commands.exceptions.IOErrorException;
import org.asamk.signal.commands.exceptions.UserErrorException;
public void handleCommand(
final Namespace ns, final Manager m, final OutputWriter outputWriter
) throws CommandException {
+ Shutdown.installHandler();
final var timeout = ns.getDouble("timeout");
final var maxMessagesRaw = ns.getInt("max-messages");
final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
};
final var duration = timeout < 0 ? null : Duration.ofMillis((long) (timeout * 1000));
final var maxMessages = maxMessagesRaw < 0 ? null : maxMessagesRaw;
+ Shutdown.registerShutdownListener(m::stopReceiveMessages);
m.receiveMessages(Optional.ofNullable(duration), Optional.ofNullable(maxMessages), handler);
} catch (IOException e) {
throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e);
import org.asamk.Signal;
import org.asamk.signal.DbusConfig;
import org.asamk.signal.manager.Manager;
+import org.asamk.signal.manager.api.AlreadyReceivingException;
import org.asamk.signal.manager.api.AttachmentInvalidException;
import org.asamk.signal.manager.api.CaptchaRequiredException;
import org.asamk.signal.manager.api.Configuration;
}
}
+ private Thread receiveThread;
+
@Override
public void receiveMessages(
Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
- ) throws IOException {
+ ) throws IOException, AlreadyReceivingException {
+ if (receiveThread != null) {
+ throw new AlreadyReceivingException("Already receiving message.");
+ }
+ receiveThread = Thread.currentThread();
+
final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1));
final var lastMessage = new AtomicLong(System.currentTimeMillis());
final var thread = Thread.currentThread();
}
Thread.sleep(sleepTimeRemaining);
} catch (InterruptedException ignored) {
+ break;
}
}
} else {
}
removeReceiveHandler(receiveHandler);
+ receiveThread = null;
+ }
+
+ @Override
+ public void stopReceiveMessages() {
+ if (receiveThread != null) {
+ receiveThread.interrupt();
+ }
}
@Override