Use new threads API

This commit is contained in:
AsamK 2023-10-17 21:56:10 +02:00
parent 8d55dfb66b
commit ed8ac5b84c
7 changed files with 40 additions and 75 deletions

View file

@ -1019,7 +1019,7 @@ public class ManagerImpl implements Manager {
if (receiveThread != null || isReceivingSynchronous) { if (receiveThread != null || isReceivingSynchronous) {
return; return;
} }
receiveThread = new Thread(() -> { receiveThread = Thread.ofPlatform().name("receive-" + threadNumber.getAndIncrement()).start(() -> {
logger.debug("Starting receiving messages"); logger.debug("Starting receiving messages");
context.getReceiveHelper().receiveMessagesContinuously(this::passReceivedMessageToHandlers); context.getReceiveHelper().receiveMessagesContinuously(this::passReceivedMessageToHandlers);
logger.debug("Finished receiving messages"); logger.debug("Finished receiving messages");
@ -1033,9 +1033,6 @@ public class ManagerImpl implements Manager {
} }
} }
}); });
receiveThread.setName("receive-" + threadNumber.getAndIncrement());
receiveThread.start();
} }
private void passReceivedMessageToHandlers(MessageEnvelope envelope, Throwable e) { private void passReceivedMessageToHandlers(MessageEnvelope envelope, Throwable e) {
@ -1310,7 +1307,7 @@ public class ManagerImpl implements Manager {
if (thread != null) { if (thread != null) {
stopReceiveThread(thread); stopReceiveThread(thread);
} }
executor.shutdown(); executor.close();
dependencies.getSignalWebSocket().disconnect(); dependencies.getSignalWebSocket().disconnect();
disposable.dispose(); disposable.dispose();

View file

@ -38,7 +38,7 @@ public class MessageSendLogStore implements AutoCloseable {
public MessageSendLogStore(final Database database, final boolean disableMessageSendLog) { public MessageSendLogStore(final Database database, final boolean disableMessageSendLog) {
this.database = database; this.database = database;
this.sendLogDisabled = disableMessageSendLog; this.sendLogDisabled = disableMessageSendLog;
this.cleanupThread = new Thread(() -> { this.cleanupThread = Thread.ofPlatform().name("msl-cleanup").daemon().start(() -> {
try { try {
final var interval = Duration.ofHours(1).toMillis(); final var interval = Duration.ofHours(1).toMillis();
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
@ -55,9 +55,6 @@ public class MessageSendLogStore implements AutoCloseable {
logger.debug("Stopping msl cleanup thread"); logger.debug("Stopping msl cleanup thread");
} }
}); });
cleanupThread.setName("msl-cleanup");
cleanupThread.setDaemon(true);
cleanupThread.start();
} }
public static void createSql(Connection connection) throws SQLException { public static void createSql(Connection connection) throws SQLException {

View file

@ -39,6 +39,7 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -257,19 +258,19 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
m.addReceiveHandler(handler, isWeakListener); m.addReceiveHandler(handler, isWeakListener);
} }
private void runSocketSingleAccount( private Thread runSocketSingleAccount(
final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
) { ) {
runSocket(serverChannel, channel -> { return runSocket(serverChannel, channel -> {
final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart); final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
handler.handleConnection(m); handler.handleConnection(m);
}); });
} }
private void runSocketMultiAccount( private Thread runSocketMultiAccount(
final MultiAccountManager c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart final MultiAccountManager c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
) { ) {
runSocket(serverChannel, channel -> { return runSocket(serverChannel, channel -> {
final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart); final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
handler.handleConnection(c); handler.handleConnection(c);
}); });
@ -277,8 +278,9 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
private static final AtomicInteger threadNumber = new AtomicInteger(0); private static final AtomicInteger threadNumber = new AtomicInteger(0);
private void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) { private Thread runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
final var thread = new Thread(() -> { return Thread.ofPlatform().name("daemon-listener").start(() -> {
try (final var executor = Executors.newCachedThreadPool()) {
while (true) { while (true) {
final var connectionId = threadNumber.getAndIncrement(); final var connectionId = threadNumber.getAndIncrement();
final SocketChannel channel; final SocketChannel channel;
@ -289,12 +291,9 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
logger.info("Accepted new client connection {}: {}", connectionId, clientString); logger.info("Accepted new client connection {}: {}", connectionId, clientString);
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to accept new socket connection", e); logger.error("Failed to accept new socket connection", e);
synchronized (this) {
notifyAll();
}
break; break;
} }
final var connectionThread = new Thread(() -> { executor.submit(() -> {
try (final var c = channel) { try (final var c = channel) {
socketHandler.accept(c); socketHandler.accept(c);
} catch (IOException e) { } catch (IOException e) {
@ -304,12 +303,12 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
} }
logger.info("Connection {} closed: {}", connectionId, clientString); logger.info("Connection {} closed: {}", connectionId, clientString);
}); });
connectionThread.setName("daemon-connection-" + connectionId); }
connectionThread.start(); }
synchronized (this) {
notifyAll();
} }
}); });
thread.setName("daemon-listener");
thread.start();
} }
private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler( private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(
@ -411,11 +410,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart
) { ) {
final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart); final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart);
final var initThread = new Thread(signal::initObjects);
initThread.setName("dbus-init");
initThread.start();
return initThread; return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
} }
interface DbusRunner { interface DbusRunner {

View file

@ -103,7 +103,7 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl {
public String link(final String newDeviceName) throws Error.Failure { public String link(final String newDeviceName) throws Error.Failure {
try { try {
final URI deviceLinkUri = c.getNewProvisioningDeviceLinkUri(); final URI deviceLinkUri = c.getNewProvisioningDeviceLinkUri();
final var thread = new Thread(() -> { Thread.ofPlatform().name("dbus-link").start(() -> {
final ProvisioningManager provisioningManager = c.getProvisioningManagerFor(deviceLinkUri); final ProvisioningManager provisioningManager = c.getProvisioningManagerFor(deviceLinkUri);
try { try {
provisioningManager.finishDeviceLink(newDeviceName); provisioningManager.finishDeviceLink(newDeviceName);
@ -111,8 +111,6 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl {
e.printStackTrace(); e.printStackTrace();
} }
}); });
thread.setName("dbus-link");
thread.start();
return deviceLinkUri.toString(); return deviceLinkUri.toString();
} catch (TimeoutException | IOException e) { } catch (TimeoutException | IOException e) {
throw new SignalControl.Error.Failure(e.getClass().getSimpleName() + " " + e.getMessage()); throw new SignalControl.Error.Failure(e.getClass().getSimpleName() + " " + e.getMessage());

View file

@ -54,7 +54,7 @@ public class HttpServerHandler {
logger.info("Starting server on " + address.toString()); logger.info("Starting server on " + address.toString());
final var server = HttpServer.create(address, 0); final var server = HttpServer.create(address, 0);
server.setExecutor(Executors.newFixedThreadPool(10)); server.setExecutor(Executors.newCachedThreadPool());
server.createContext("/api/v1/rpc", this::handleRpcEndpoint); server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
server.createContext("/api/v1/events", this::handleEventsEndpoint); server.createContext("/api/v1/events", this::handleEventsEndpoint);

View file

@ -55,8 +55,7 @@ public class JsonRpcReader {
return; return;
} }
final var executor = Executors.newFixedThreadPool(10); try (final var executor = Executors.newCachedThreadPool()) {
try {
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
final var input = lineSupplier.get(); final var input = lineSupplier.get();
if (input == null) { if (input == null) {
@ -72,8 +71,6 @@ public class JsonRpcReader {
executor.submit(() -> handleMessage(message, requestHandler, responseHandler)); executor.submit(() -> handleMessage(message, requestHandler, responseHandler));
} }
} finally {
Util.closeExecutorService(executor);
} }
} }
@ -94,8 +91,7 @@ public class JsonRpcReader {
case JsonRpcBatchMessage jsonRpcBatchMessage -> { case JsonRpcBatchMessage jsonRpcBatchMessage -> {
final var messages = jsonRpcBatchMessage.getMessages(); final var messages = jsonRpcBatchMessage.getMessages();
final var responseList = new ArrayList<JsonRpcResponse>(messages.size()); final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
final var executor = Executors.newFixedThreadPool(10); try (final var executor = Executors.newCachedThreadPool()) {
try {
final var lock = new ReentrantLock(); final var lock = new ReentrantLock();
messages.forEach(jsonNode -> { messages.forEach(jsonNode -> {
final JsonRpcRequest request; final JsonRpcRequest request;
@ -124,8 +120,6 @@ public class JsonRpcReader {
} }
}); });
}); });
} finally {
Util.closeExecutorService(executor);
} }
if (!responseList.isEmpty()) { if (!responseList.isEmpty()) {

View file

@ -16,8 +16,6 @@ import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class Util { public class Util {
@ -86,19 +84,4 @@ public class Util {
} }
return map; return map;
} }
public static void closeExecutorService(ExecutorService executor) {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
executor.shutdownNow();
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
logger.warn("Failed to shutdown executor service");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
} }