Improve JobExecutor

This commit is contained in:
AsamK 2024-01-25 21:47:40 +01:00
parent 7d3db03d4a
commit f696097301

View file

@ -5,6 +5,8 @@ import org.asamk.signal.manager.jobs.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -13,6 +15,8 @@ public class JobExecutor implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(JobExecutor.class);
private final Context context;
private final ExecutorService executorService;
private Job running;
private final Queue<Job> queue = new ArrayDeque<>();
public JobExecutor(final Context context) {
this.context = context;
@ -20,13 +24,64 @@ public class JobExecutor implements AutoCloseable {
}
public void enqueueJob(Job job) {
logger.debug("Enqueuing {} job", job.getClass().getSimpleName());
if (executorService.isShutdown()) {
logger.debug("Not enqueuing {} job, shutting down", job.getClass().getSimpleName());
return;
}
executorService.execute(() -> job.run(context));
synchronized (queue) {
logger.trace("Enqueuing {} job", job.getClass().getSimpleName());
queue.add(job);
}
runNextJob();
}
private void runNextJob() {
Job job;
synchronized (queue) {
if (running != null) {
return;
}
job = queue.poll();
running = job;
}
if (job == null) {
synchronized (this) {
this.notifyAll();
}
return;
}
logger.debug("Running {} job", job.getClass().getSimpleName());
executorService.execute(() -> {
try {
job.run(context);
} catch (Throwable e) {
logger.warn("Job {} failed", job.getClass().getSimpleName(), e);
} finally {
synchronized (queue) {
running = null;
}
runNextJob();
}
});
}
@Override
public void close() {
synchronized (queue) {
if (queue.isEmpty()) {
executorService.close();
return;
}
}
synchronized (this) {
try {
this.wait();
} catch (InterruptedException ignored) {
}
}
executorService.close();
}
}