]> nmode's Git Repositories - signal-cli/commitdiff
Improve JobExecutor
authorAsamK <asamk@gmx.de>
Thu, 25 Jan 2024 20:47:40 +0000 (21:47 +0100)
committerAsamK <asamk@gmx.de>
Thu, 25 Jan 2024 20:47:40 +0000 (21:47 +0100)
lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java

index bbd36800751af18f7b5a978900e03156c1f72916..ee4c222244e0a4ed226f3740fb6648d861cfc8eb 100644 (file)
@@ -5,6 +5,8 @@ import org.asamk.signal.manager.jobs.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayDeque;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -13,6 +15,8 @@ public class JobExecutor implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(JobExecutor.class);
     private final Context context;
     private final ExecutorService executorService;
+    private Job running;
+    private final Queue<Job> queue = new ArrayDeque<>();
 
     public JobExecutor(final Context context) {
         this.context = context;
@@ -20,13 +24,64 @@ public class JobExecutor implements AutoCloseable {
     }
 
     public void enqueueJob(Job job) {
-        logger.debug("Enqueuing {} job", job.getClass().getSimpleName());
+        if (executorService.isShutdown()) {
+            logger.debug("Not enqueuing {} job, shutting down", job.getClass().getSimpleName());
+            return;
+        }
 
-        executorService.execute(() -> job.run(context));
+        synchronized (queue) {
+            logger.trace("Enqueuing {} job", job.getClass().getSimpleName());
+            queue.add(job);
+        }
+
+        runNextJob();
+    }
+
+    private void runNextJob() {
+        Job job;
+        synchronized (queue) {
+            if (running != null) {
+                return;
+            }
+            job = queue.poll();
+            running = job;
+        }
+
+        if (job == null) {
+            synchronized (this) {
+                this.notifyAll();
+            }
+            return;
+        }
+        logger.debug("Running {} job", job.getClass().getSimpleName());
+        executorService.execute(() -> {
+            try {
+                job.run(context);
+            } catch (Throwable e) {
+                logger.warn("Job {} failed", job.getClass().getSimpleName(), e);
+            } finally {
+                synchronized (queue) {
+                    running = null;
+                }
+                runNextJob();
+            }
+        });
     }
 
     @Override
     public void close() {
+        synchronized (queue) {
+            if (queue.isEmpty()) {
+                executorService.close();
+                return;
+            }
+        }
+        synchronized (this) {
+            try {
+                this.wait();
+            } catch (InterruptedException ignored) {
+            }
+        }
         executorService.close();
     }
 }