Prevent a stale jsonrpc connection from interfering with message receiving

Fixes #893
This commit is contained in:
AsamK 2022-02-19 18:46:45 +01:00
parent 832604e763
commit 3f582e9c2e
2 changed files with 10 additions and 11 deletions

View file

@ -770,7 +770,7 @@ class ManagerImpl implements Manager {
Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
try { try {
h.handleMessage(envelope, e); h.handleMessage(envelope, e);
} catch (Exception ex) { } catch (Throwable ex) {
logger.warn("Message handler failed, ignoring", ex); logger.warn("Message handler failed, ignoring", ex);
} }
}); });

View file

@ -35,7 +35,6 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -240,12 +239,13 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
private void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) { private void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
final var thread = new Thread(() -> { final var thread = new Thread(() -> {
while (true) { while (true) {
final var connectionId = threadNumber.getAndIncrement();
final SocketChannel channel; final SocketChannel channel;
final String clientString; final String clientString;
try { try {
channel = serverChannel.accept(); channel = serverChannel.accept();
clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel); clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
logger.info("Accepted new client: " + clientString); logger.info("Accepted new client connection {}: {}", connectionId, clientString);
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to accept new socket connection", e); logger.error("Failed to accept new socket connection", e);
synchronized (this) { synchronized (this) {
@ -256,12 +256,14 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
final var connectionThread = new Thread(() -> { final var connectionThread = new Thread(() -> {
try (final var c = channel) { try (final var c = channel) {
socketHandler.accept(c); socketHandler.accept(c);
logger.info("Connection closed: " + clientString);
} catch (IOException e) { } catch (IOException e) {
logger.warn("Failed to close channel", e); logger.warn("Failed to close channel", e);
} catch (Throwable e) {
logger.warn("Connection handler failed, closing connection", e);
} }
logger.info("Connection {} closed: {}", connectionId, clientString);
}); });
connectionThread.setName("daemon-connection-" + threadNumber.getAndIncrement()); connectionThread.setName("daemon-connection-" + connectionId);
connectionThread.start(); connectionThread.start();
} }
}); });
@ -298,11 +300,9 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
c.addOnManagerAddedHandler(m -> { c.addOnManagerAddedHandler(m -> {
final var thread = exportMultiAccountManager(connection, m, noReceiveOnStart); final var thread = exportMultiAccountManager(connection, m, noReceiveOnStart);
if (thread != null) { try {
try { thread.join();
thread.join(); } catch (InterruptedException ignored) {
} catch (InterruptedException ignored) {
}
} }
}); });
c.addOnManagerRemovedHandler(m -> { c.addOnManagerRemovedHandler(m -> {
@ -319,7 +319,6 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
final var initThreads = c.getManagers() final var initThreads = c.getManagers()
.stream() .stream()
.map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart)) .map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart))
.filter(Objects::nonNull)
.toList(); .toList();
for (var t : initThreads) { for (var t : initThreads) {