+ final var handler = switch (outputWriter) {
+ case JsonWriter writer -> new JsonReceiveMessageHandler(m, writer);
+ case PlainTextWriter writer -> new ReceiveMessageHandler(m, writer);
+ };
+ final var duration = timeout < 0 ? null : Duration.ofMillis((long) (timeout * 1000));
+ final var maxMessages = maxMessagesRaw < 0 ? null : maxMessagesRaw;
+ Shutdown.registerShutdownListener(m::stopReceiveMessages);
+ m.receiveMessages(Optional.ofNullable(duration), Optional.ofNullable(maxMessages), handler);
+ } catch (IOException e) {
+ throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e);
+ } catch (AlreadyReceivingException e) {
+ throw new UserErrorException("Receive command cannot be used if messages are already being received.", e);
+ }
+ }
+
+ @Override
+ public TypeReference<ReceiveParams> getRequestType() {
+ return new TypeReference<>() {};
+ }
+
+ @Override
+ public void handleCommand(
+ final ReceiveParams request, final Manager m, final JsonWriter jsonWriter
+ ) throws CommandException {
+ final var timeout = request.timeout() == null ? 3.0 : request.timeout();
+ final var maxMessagesRaw = request.maxMessages() == null ? -1 : request.maxMessages();
+
+ try {
+ final var messages = new ArrayList<>();
+ final var handler = new JsonReceiveMessageHandler(m, messages::add);