- addReceiveHandler(handler);
- try {
- Thread.sleep(unit.toMillis(timeout));
- } catch (InterruptedException ignored) {
+ final var lastMessage = new AtomicLong(System.currentTimeMillis());
+
+ final ReceiveMessageHandler receiveHandler = (envelope, e) -> {
+ lastMessage.set(System.currentTimeMillis());
+ handler.handleMessage(envelope, e);
+ };
+ addReceiveHandler(receiveHandler);
+ while (true) {
+ try {
+ final var sleepTimeRemaining = timeout.toMillis() - (System.currentTimeMillis() - lastMessage.get());
+ if (sleepTimeRemaining < 0) {
+ break;
+ }
+ Thread.sleep(sleepTimeRemaining);
+ } catch (InterruptedException ignored) {
+ }