Refactor DaemonCommand

This commit is contained in:
AsamK 2023-11-10 15:18:06 +01:00
parent c0aa338d7c
commit 7e9940be4a
5 changed files with 301 additions and 199 deletions

View file

@ -4,29 +4,21 @@ import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser; import net.sourceforge.argparse4j.inf.Subparser;
import org.asamk.signal.DbusConfig;
import org.asamk.signal.OutputType; import org.asamk.signal.OutputType;
import org.asamk.signal.ReceiveMessageHandler; import org.asamk.signal.ReceiveMessageHandler;
import org.asamk.signal.Shutdown; import org.asamk.signal.Shutdown;
import org.asamk.signal.commands.exceptions.CommandException; import org.asamk.signal.commands.exceptions.CommandException;
import org.asamk.signal.commands.exceptions.IOErrorException; import org.asamk.signal.commands.exceptions.IOErrorException;
import org.asamk.signal.commands.exceptions.UnexpectedErrorException; import org.asamk.signal.dbus.DbusHandler;
import org.asamk.signal.commands.exceptions.UserErrorException;
import org.asamk.signal.dbus.DbusSignalControlImpl;
import org.asamk.signal.dbus.DbusSignalImpl;
import org.asamk.signal.http.HttpServerHandler; import org.asamk.signal.http.HttpServerHandler;
import org.asamk.signal.json.JsonReceiveMessageHandler; import org.asamk.signal.json.JsonReceiveMessageHandler;
import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler; import org.asamk.signal.jsonrpc.SocketHandler;
import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.MultiAccountManager; import org.asamk.signal.manager.MultiAccountManager;
import org.asamk.signal.output.JsonWriter; import org.asamk.signal.output.JsonWriter;
import org.asamk.signal.output.JsonWriterImpl;
import org.asamk.signal.output.OutputWriter; import org.asamk.signal.output.OutputWriter;
import org.asamk.signal.output.PlainTextWriter; import org.asamk.signal.output.PlainTextWriter;
import org.asamk.signal.util.IOUtils; import org.asamk.signal.util.IOUtils;
import org.freedesktop.dbus.connections.impl.DBusConnection;
import org.freedesktop.dbus.connections.impl.DBusConnectionBuilder;
import org.freedesktop.dbus.exceptions.DBusException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -35,16 +27,9 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnixDomainSocketAddress; import java.net.UnixDomainSocketAddress;
import java.nio.channels.Channel; import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.asamk.signal.util.CommandUtil.getReceiveConfig; import static org.asamk.signal.util.CommandUtil.getReceiveConfig;
@ -60,7 +45,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
@Override @Override
public void attachToSubparser(final Subparser subparser) { public void attachToSubparser(final Subparser subparser) {
final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket"); final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket");
subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface."); subparser.help("Run in daemon mode and provide a JSON-RPC or an experimental dbus interface.");
subparser.addArgument("--dbus") subparser.addArgument("--dbus")
.action(Arguments.storeTrue()) .action(Arguments.storeTrue())
.help("Expose a DBus interface on the user bus (the default, if no other options are given)."); .help("Expose a DBus interface on the user bus (the default, if no other options are given).");
@ -161,10 +146,12 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
private static void setup(final Namespace ns, final DaemonHandler daemonHandler) throws CommandException { private static void setup(final Namespace ns, final DaemonHandler daemonHandler) throws CommandException {
final Channel inheritedChannel; final Channel inheritedChannel;
try { try {
inheritedChannel = System.inheritedChannel(); if (System.inheritedChannel() instanceof ServerSocketChannel serverChannel) {
if (inheritedChannel instanceof ServerSocketChannel serverChannel) { inheritedChannel = serverChannel;
logger.info("Using inherited socket: " + serverChannel.getLocalAddress()); logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
daemonHandler.runSocket(serverChannel); daemonHandler.runSocket(serverChannel);
} else {
inheritedChannel = null;
} }
} catch (IOException e) { } catch (IOException e) {
throw new IOErrorException("Failed to use inherited socket", e); throw new IOErrorException("Failed to use inherited socket", e);
@ -201,7 +188,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
&& socketFile == null && socketFile == null
&& tcpAddress == null && tcpAddress == null
&& httpAddress == null && httpAddress == null
&& !(inheritedChannel instanceof ServerSocketChannel) && inheritedChannel == null
)) { )) {
daemonHandler.runDbus(false); daemonHandler.runDbus(false);
} }
@ -221,9 +208,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
protected final ReceiveMode receiveMode; protected final ReceiveMode receiveMode;
protected final List<AutoCloseable> closeables = new ArrayList<>(); protected final List<AutoCloseable> closeables = new ArrayList<>();
private static final AtomicInteger threadNumber = new AtomicInteger(0); protected DaemonHandler(final ReceiveMode receiveMode) {
public DaemonHandler(final ReceiveMode receiveMode) {
this.receiveMode = receiveMode; this.receiveMode = receiveMode;
} }
@ -233,106 +218,37 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
public abstract void runHttp(InetSocketAddress address) throws CommandException; public abstract void runHttp(InetSocketAddress address) throws CommandException;
protected void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) { protected final void runSocket(final SocketHandler socketHandler) {
final List<AutoCloseable> channels = new ArrayList<>(); socketHandler.init();
final var thread = Thread.ofPlatform().name("daemon-listener").start(() -> { this.closeables.add(socketHandler);
try (final var executor = Executors.newCachedThreadPool()) {
while (true) {
final var connectionId = threadNumber.getAndIncrement();
final SocketChannel channel;
final String clientString;
try {
channel = serverChannel.accept();
clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
logger.info("Accepted new client connection {}: {}", connectionId, clientString);
} catch (ClosedChannelException ignored) {
logger.trace("Listening socket has been closed");
break;
} catch (IOException e) {
logger.error("Failed to accept new socket connection", e);
break;
}
channels.add(channel);
executor.submit(() -> {
try (final var c = channel) {
socketHandler.accept(c);
} catch (IOException e) {
logger.warn("Failed to close channel", e);
} catch (Throwable e) {
logger.warn("Connection handler failed, closing connection", e);
}
logger.info("Connection {} closed: {}", connectionId, clientString);
channels.remove(channel);
});
}
}
});
closeables.add(() -> {
serverChannel.close();
for (final var c : new ArrayList<>(channels)) {
c.close();
}
thread.join();
});
} }
protected SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) { protected final void runDbus(
final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8)); DbusHandler dbusHandler
final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
return new SignalJsonRpcDispatcherHandler(jsonOutputWriter,
lineSupplier,
receiveMode == ReceiveMode.MANUAL);
}
protected Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) {
final var signal = new DbusSignalImpl(m, conn, objectPath, receiveMode != ReceiveMode.ON_START);
closeables.add(signal);
return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
}
protected void runDbus(
final boolean isDbusSystem, MultiAccountDaemonHandler.DbusRunner dbusRunner
) throws CommandException { ) throws CommandException {
DBusConnection.DBusBusType busType; dbusHandler.init();
if (isDbusSystem) { this.closeables.add(dbusHandler);
busType = DBusConnection.DBusBusType.SYSTEM;
} else {
busType = DBusConnection.DBusBusType.SESSION;
}
DBusConnection conn;
try {
conn = DBusConnectionBuilder.forType(busType).build();
dbusRunner.run(conn, DbusConfig.getObjectPath());
} catch (DBusException e) {
throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e);
} catch (UnsupportedOperationException e) {
throw new UserErrorException("Failed to connect to Dbus: " + e.getMessage(), e);
} }
protected final void runHttp(final HttpServerHandler handler) throws CommandException {
try { try {
conn.requestBusName(DbusConfig.getBusname()); handler.init();
} catch (DBusException e) { } catch (IOException ex) {
throw new UnexpectedErrorException( throw new IOErrorException("Failed to initialize HTTP Server", ex);
"Dbus command failed, maybe signal-cli dbus daemon is already running: " + e.getMessage(),
e);
} }
closeables.add(conn); this.closeables.add(handler);
logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
} }
@Override @Override
public void close() { public void close() {
for (final var closeable : new ArrayList<>(closeables)) { for (final var closeable : new ArrayList<>(this.closeables)) {
try { try {
closeable.close(); closeable.close();
} catch (Exception e) { } catch (Exception e) {
logger.warn("Failed to close daemon handler", e); logger.warn("Failed to close daemon handler", e);
} }
} }
closeables.clear(); this.closeables.clear();
} }
} }
@ -340,38 +256,24 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
private final Manager m; private final Manager m;
private SingleAccountDaemonHandler(final Manager m, final ReceiveMode receiveMode) { public SingleAccountDaemonHandler(final Manager m, final ReceiveMode receiveMode) {
super(receiveMode); super(receiveMode);
this.m = m; this.m = m;
} }
@Override @Override
public void runSocket(final ServerSocketChannel serverChannel) { public void runSocket(final ServerSocketChannel serverChannel) {
runSocket(serverChannel, channel -> { runSocket(new SocketHandler(serverChannel, m, receiveMode == ReceiveMode.MANUAL));
final var handler = getSignalJsonRpcDispatcherHandler(channel);
handler.handleConnection(m);
});
} }
@Override @Override
public void runDbus(final boolean isDbusSystem) throws CommandException { public void runDbus(final boolean isDbusSystem) throws CommandException {
runDbus(isDbusSystem, (conn, objectPath) -> { runDbus(new DbusHandler(isDbusSystem, m, receiveMode != ReceiveMode.ON_START));
try {
exportDbusObject(conn, objectPath, m).join();
} catch (InterruptedException ignored) {
}
});
} }
@Override @Override
public void runHttp(InetSocketAddress address) throws CommandException { public void runHttp(InetSocketAddress address) throws CommandException {
final var handler = new HttpServerHandler(address, m); runHttp(new HttpServerHandler(address, m));
try {
handler.init();
} catch (IOException ex) {
throw new IOErrorException("Failed to initialize HTTP Server", ex);
}
this.closeables.add(handler);
} }
} }
@ -379,74 +281,24 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
private final MultiAccountManager c; private final MultiAccountManager c;
private MultiAccountDaemonHandler(final MultiAccountManager c, final ReceiveMode receiveMode) { public MultiAccountDaemonHandler(final MultiAccountManager c, final ReceiveMode receiveMode) {
super(receiveMode); super(receiveMode);
this.c = c; this.c = c;
} }
@Override
public void runSocket(final ServerSocketChannel serverChannel) { public void runSocket(final ServerSocketChannel serverChannel) {
runSocket(serverChannel, channel -> { runSocket(new SocketHandler(serverChannel, c, receiveMode == ReceiveMode.MANUAL));
final var handler = getSignalJsonRpcDispatcherHandler(channel);
handler.handleConnection(c);
});
} }
@Override
public void runDbus(final boolean isDbusSystem) throws CommandException { public void runDbus(final boolean isDbusSystem) throws CommandException {
runDbus(isDbusSystem, (connection, objectPath) -> { runDbus(new DbusHandler(isDbusSystem, c, receiveMode != ReceiveMode.ON_START));
final var signalControl = new DbusSignalControlImpl(c, objectPath);
connection.exportObject(signalControl);
c.addOnManagerAddedHandler(m -> {
final var thread = exportManager(connection, m);
try {
thread.join();
} catch (InterruptedException ignored) {
}
});
c.addOnManagerRemovedHandler(m -> {
final var path = DbusConfig.getObjectPath(m.getSelfNumber());
try {
final var object = connection.getExportedObject(null, path);
if (object instanceof DbusSignalImpl dbusSignal) {
dbusSignal.close();
closeables.remove(dbusSignal);
}
} catch (DBusException ignored) {
}
});
final var initThreads = c.getManagers().stream().map(m -> exportManager(connection, m)).toList();
for (var t : initThreads) {
try {
t.join();
} catch (InterruptedException ignored) {
}
}
});
} }
@Override @Override
public void runHttp(final InetSocketAddress address) throws CommandException { public void runHttp(final InetSocketAddress address) throws CommandException {
final var handler = new HttpServerHandler(address, c); runHttp(new HttpServerHandler(address, c));
try {
handler.init();
} catch (IOException ex) {
throw new IOErrorException("Failed to initialize HTTP Server", ex);
}
this.closeables.add(handler);
}
private Thread exportManager(
final DBusConnection conn, final Manager m
) {
final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber());
return exportDbusObject(conn, objectPath, m);
}
interface DbusRunner {
void run(DBusConnection connection, String objectPath) throws DBusException;
} }
} }
} }

View file

@ -0,0 +1,131 @@
package org.asamk.signal.dbus;
import org.asamk.signal.DbusConfig;
import org.asamk.signal.commands.exceptions.CommandException;
import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
import org.asamk.signal.commands.exceptions.UserErrorException;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.MultiAccountManager;
import org.freedesktop.dbus.connections.impl.DBusConnection;
import org.freedesktop.dbus.connections.impl.DBusConnectionBuilder;
import org.freedesktop.dbus.exceptions.DBusException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class DbusHandler implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(DbusHandler.class);
private final boolean isDbusSystem;
private DBusConnection dBusConnection;
private final List<AutoCloseable> closeables = new ArrayList<>();
private final DbusRunner dbusRunner;
private final boolean noReceiveOnStart;
public DbusHandler(final boolean isDbusSystem, final Manager m, final boolean noReceiveOnStart) {
this.isDbusSystem = isDbusSystem;
this.dbusRunner = (connection) -> {
try {
exportDbusObject(connection, DbusConfig.getObjectPath(), m).join();
} catch (InterruptedException ignored) {
}
};
this.noReceiveOnStart = noReceiveOnStart;
}
public DbusHandler(final boolean isDbusSystem, final MultiAccountManager c, final boolean noReceiveOnStart) {
this.isDbusSystem = isDbusSystem;
this.dbusRunner = (connection) -> {
final var signalControl = new DbusSignalControlImpl(c, DbusConfig.getObjectPath());
connection.exportObject(signalControl);
c.addOnManagerAddedHandler(m -> {
final var thread = exportManager(connection, m);
try {
thread.join();
} catch (InterruptedException ignored) {
}
});
c.addOnManagerRemovedHandler(m -> {
final var path = DbusConfig.getObjectPath(m.getSelfNumber());
try {
final var object = connection.getExportedObject(null, path);
if (object instanceof DbusSignalImpl dbusSignal) {
dbusSignal.close();
closeables.remove(dbusSignal);
}
} catch (DBusException ignored) {
}
});
final var initThreads = c.getManagers().stream().map(m -> exportManager(connection, m)).toList();
for (var t : initThreads) {
try {
t.join();
} catch (InterruptedException ignored) {
}
}
};
this.noReceiveOnStart = noReceiveOnStart;
}
public void init() throws CommandException {
if (dBusConnection != null) {
throw new AssertionError("DbusHandler already initialized");
}
final var busType = isDbusSystem ? DBusConnection.DBusBusType.SYSTEM : DBusConnection.DBusBusType.SESSION;
logger.debug("Starting DBus server on {} bus: {}", busType, DbusConfig.getBusname());
try {
dBusConnection = DBusConnectionBuilder.forType(busType).build();
dbusRunner.run(dBusConnection);
} catch (DBusException e) {
throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e);
} catch (UnsupportedOperationException e) {
throw new UserErrorException("Failed to connect to Dbus: " + e.getMessage(), e);
}
try {
dBusConnection.requestBusName(DbusConfig.getBusname());
} catch (DBusException e) {
throw new UnexpectedErrorException("Dbus command failed, maybe signal-cli dbus daemon is already running: "
+ e.getMessage(), e);
}
logger.info("Started DBus server on {} bus: {}", busType, DbusConfig.getBusname());
}
@Override
public void close() throws Exception {
if (dBusConnection == null) {
return;
}
dBusConnection.close();
for (final var c : new ArrayList<>(closeables)) {
c.close();
}
closeables.clear();
dBusConnection = null;
}
private Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) {
final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart);
closeables.add(signal);
return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
}
private Thread exportManager(final DBusConnection conn, final Manager m) {
final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber());
return exportDbusObject(conn, objectPath, m);
}
private interface DbusRunner {
void run(DBusConnection connection) throws DBusException;
}
}

View file

@ -56,7 +56,7 @@ public class HttpServerHandler implements AutoCloseable {
if (server != null) { if (server != null) {
throw new AssertionError("HttpServerHandler already initialized"); throw new AssertionError("HttpServerHandler already initialized");
} }
logger.info("Starting server on " + address.toString()); logger.debug("Starting HTTP server on {}", address);
server = HttpServer.create(address, 0); server = HttpServer.create(address, 0);
server.setExecutor(Executors.newCachedThreadPool()); server.setExecutor(Executors.newCachedThreadPool());
@ -66,6 +66,21 @@ public class HttpServerHandler implements AutoCloseable {
server.createContext("/api/v1/check", this::handleCheckEndpoint); server.createContext("/api/v1/check", this::handleCheckEndpoint);
server.start(); server.start();
logger.info("Started HTTP server on {}", address);
}
@Override
public void close() {
if (server != null) {
shutdown.set(true);
synchronized (this) {
this.notifyAll();
}
// Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed
server.stop(2);
server = null;
shutdown.set(false);
}
} }
private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException { private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException {
@ -221,7 +236,7 @@ public class HttpServerHandler implements AutoCloseable {
return List.of(manager); return List.of(manager);
} }
} }
return List.of(); throw new AssertionError("Unreachable state");
} }
private List<Pair<Manager, Manager.ReceiveMessageHandler>> subscribeReceiveHandlers( private List<Pair<Manager, Manager.ReceiveMessageHandler>> subscribeReceiveHandlers(
@ -246,20 +261,6 @@ public class HttpServerHandler implements AutoCloseable {
m.removeReceiveHandler(handler); m.removeReceiveHandler(handler);
} }
@Override
public void close() {
if (server != null) {
shutdown.set(true);
synchronized (this) {
this.notifyAll();
}
// Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed
server.stop(2);
server = null;
shutdown.set(false);
}
}
private interface Callable { private interface Callable {
void call(); void call();

View file

@ -0,0 +1,118 @@
package org.asamk.signal.jsonrpc;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.MultiAccountManager;
import org.asamk.signal.output.JsonWriterImpl;
import org.asamk.signal.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
public class SocketHandler implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(SocketHandler.class);
private static final AtomicInteger threadNumber = new AtomicInteger(0);
private final ServerSocketChannel serverChannel;
private Thread listenerThread;
private final List<AutoCloseable> channels = new ArrayList<>();
private final Consumer<SocketChannel> socketHandler;
private final boolean noReceiveOnStart;
public SocketHandler(final ServerSocketChannel serverChannel, final Manager m, final boolean noReceiveOnStart) {
this.serverChannel = serverChannel;
this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(m);
this.noReceiveOnStart = noReceiveOnStart;
}
public SocketHandler(
final ServerSocketChannel serverChannel, final MultiAccountManager c, final boolean noReceiveOnStart
) {
this.serverChannel = serverChannel;
this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(c);
this.noReceiveOnStart = noReceiveOnStart;
}
public void init() {
if (listenerThread != null) {
throw new AssertionError("SocketHandler already initialized");
}
SocketAddress socketAddress;
try {
socketAddress = serverChannel.getLocalAddress();
} catch (IOException e) {
logger.debug("Failed to get socket address: {}", e.getMessage());
socketAddress = null;
}
final var address = socketAddress == null ? "<Unknown socket address>" : socketAddress;
logger.debug("Starting JSON-RPC server on {}", address);
listenerThread = Thread.ofPlatform().name("daemon-listener").start(() -> {
try (final var executor = Executors.newCachedThreadPool()) {
logger.info("Started JSON-RPC server on {}", address);
while (true) {
final var connectionId = threadNumber.getAndIncrement();
final SocketChannel channel;
final String clientString;
try {
channel = serverChannel.accept();
clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
logger.info("Accepted new client connection {}: {}", connectionId, clientString);
} catch (ClosedChannelException ignored) {
logger.trace("Listening socket has been closed");
break;
} catch (IOException e) {
logger.error("Failed to accept new socket connection", e);
break;
}
channels.add(channel);
executor.submit(() -> {
try (final var c = channel) {
socketHandler.accept(c);
} catch (IOException e) {
logger.warn("Failed to close channel", e);
} catch (Throwable e) {
logger.warn("Connection handler failed, closing connection", e);
}
logger.info("Connection {} closed: {}", connectionId, clientString);
channels.remove(channel);
});
}
}
});
}
@Override
public void close() throws Exception {
if (listenerThread == null) {
return;
}
serverChannel.close();
for (final var c : new ArrayList<>(channels)) {
c.close();
}
listenerThread.join();
channels.clear();
listenerThread = null;
}
private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart);
}
}

View file

@ -142,7 +142,7 @@ public class IOUtils {
? ServerSocketChannel.open(StandardProtocolFamily.UNIX) ? ServerSocketChannel.open(StandardProtocolFamily.UNIX)
: ServerSocketChannel.open(); : ServerSocketChannel.open();
serverChannel.bind(address); serverChannel.bind(address);
logger.info("Listening on socket: " + address); logger.debug("Listening on socket: " + address);
postBind(address); postBind(address);
} catch (IOException e) { } catch (IOException e) {
throw new IOErrorException("Failed to bind socket " + address + ": " + e.getMessage(), e); throw new IOErrorException("Failed to bind socket " + address + ": " + e.getMessage(), e);