}
receiveThread = new Thread(() -> {
logger.debug("Starting receiving messages");
- while (!Thread.interrupted()) {
- try {
- context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> {
- synchronized (messageHandlers) {
- Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
- try {
- h.handleMessage(envelope, e);
- } catch (Exception ex) {
- logger.warn("Message handler failed, ignoring", ex);
- }
- });
+ context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> {
+ synchronized (messageHandlers) {
+ Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
+ try {
+ h.handleMessage(envelope, e);
+ } catch (Throwable ex) {
+ logger.warn("Message handler failed, ignoring", ex);
}
});
- break;
- } catch (IOException e) {
- logger.warn("Receiving messages failed, retrying", e);
}
- }
+ });
logger.debug("Finished receiving messages");
synchronized (messageHandlers) {
receiveThread = null;
}
private void stopReceiveThread(final Thread thread) {
- thread.interrupt();
+ if (context.getReceiveHelper().requestStopReceiveMessages()) {
+ logger.debug("Receive stop requested, interrupting read from server.");
+ thread.interrupt();
+ }
try {
thread.join();
} catch (InterruptedException ignored) {
dependencies.getSignalWebSocket().disconnect();
disposable.dispose();
+ if (account != null) {
+ account.close();
+ }
+
synchronized (closedListeners) {
closedListeners.forEach(Runnable::run);
closedListeners.clear();
}
- if (account != null) {
- account.close();
- }
account = null;
}
}