From f696097301055577076b53f24ff8e60bab1ce3d7 Mon Sep 17 00:00:00 2001 From: AsamK Date: Thu, 25 Jan 2024 21:47:40 +0100 Subject: [PATCH] Improve JobExecutor --- .../signal/manager/internal/JobExecutor.java | 59 ++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java b/lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java index bbd36800..ee4c2222 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java @@ -5,6 +5,8 @@ import org.asamk.signal.manager.jobs.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -13,6 +15,8 @@ public class JobExecutor implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(JobExecutor.class); private final Context context; private final ExecutorService executorService; + private Job running; + private final Queue queue = new ArrayDeque<>(); public JobExecutor(final Context context) { this.context = context; @@ -20,13 +24,64 @@ public class JobExecutor implements AutoCloseable { } public void enqueueJob(Job job) { - logger.debug("Enqueuing {} job", job.getClass().getSimpleName()); + if (executorService.isShutdown()) { + logger.debug("Not enqueuing {} job, shutting down", job.getClass().getSimpleName()); + return; + } - executorService.execute(() -> job.run(context)); + synchronized (queue) { + logger.trace("Enqueuing {} job", job.getClass().getSimpleName()); + queue.add(job); + } + + runNextJob(); + } + + private void runNextJob() { + Job job; + synchronized (queue) { + if (running != null) { + return; + } + job = queue.poll(); + running = job; + } + + if (job == null) { + synchronized (this) { + this.notifyAll(); + } + return; + } + logger.debug("Running {} job", job.getClass().getSimpleName()); + executorService.execute(() -> { + try { + job.run(context); + } catch (Throwable e) { + logger.warn("Job {} failed", job.getClass().getSimpleName(), e); + } finally { + synchronized (queue) { + running = null; + } + runNextJob(); + } + }); } @Override public void close() { + synchronized (queue) { + if (queue.isEmpty()) { + executorService.close(); + return; + } + } + synchronized (this) { + try { + this.wait(); + } catch (InterruptedException ignored) { + } + } executorService.close(); } } -- 2.50.1