Move receive thread handling to manager

This commit is contained in:
AsamK 2021-10-21 22:59:52 +02:00
parent 5c389c875d
commit fc0a9b4102
5 changed files with 156 additions and 49 deletions

View file

@ -193,6 +193,20 @@ public interface Manager extends Closeable {
void requestAllSyncData() throws IOException; void requestAllSyncData() throws IOException;
/**
* Add a handler to receive new messages.
* Will start receiving messages from server, if not already started.
*/
void addReceiveHandler(ReceiveMessageHandler handler);
/**
* Remove a handler to receive new messages.
* Will stop receiving messages from server, if this was the last registered receiver.
*/
void removeReceiveHandler(ReceiveMessageHandler handler);
boolean isReceiving();
/** /**
* Receive new messages from server, returns if no new message arrive in a timespan of timeout. * Receive new messages from server, returns if no new message arrive in a timespan of timeout.
*/ */

View file

@ -137,6 +137,10 @@ public class ManagerImpl implements Manager {
private boolean hasCaughtUpWithOldMessages = false; private boolean hasCaughtUpWithOldMessages = false;
private boolean ignoreAttachments = false; private boolean ignoreAttachments = false;
private Thread receiveThread;
private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
private boolean isReceivingSynchronous;
ManagerImpl( ManagerImpl(
SignalAccount account, SignalAccount account,
PathConfig pathConfig, PathConfig pathConfig,
@ -872,6 +876,88 @@ public class ManagerImpl implements Manager {
return actions; return actions;
} }
@Override
public void addReceiveHandler(final ReceiveMessageHandler handler) {
if (isReceivingSynchronous) {
throw new IllegalStateException("Already receiving message synchronously.");
}
synchronized (messageHandlers) {
messageHandlers.add(handler);
startReceiveThreadIfRequired();
}
}
private void startReceiveThreadIfRequired() {
if (receiveThread != null) {
return;
}
receiveThread = new Thread(() -> {
while (!Thread.interrupted()) {
try {
receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, decryptedContent, e) -> {
synchronized (messageHandlers) {
for (ReceiveMessageHandler h : messageHandlers) {
try {
h.handleMessage(envelope, decryptedContent, e);
} catch (Exception ex) {
logger.warn("Message handler failed, ignoring", ex);
}
}
}
});
break;
} catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e);
}
}
hasCaughtUpWithOldMessages = false;
synchronized (messageHandlers) {
receiveThread = null;
// Check if in the meantime another handler has been registered
if (!messageHandlers.isEmpty()) {
startReceiveThreadIfRequired();
}
}
});
receiveThread.start();
}
@Override
public void removeReceiveHandler(final ReceiveMessageHandler handler) {
final Thread thread;
synchronized (messageHandlers) {
thread = receiveThread;
receiveThread = null;
messageHandlers.remove(handler);
if (!messageHandlers.isEmpty() || isReceivingSynchronous) {
return;
}
}
stopReceiveThread(thread);
}
private void stopReceiveThread(final Thread thread) {
thread.interrupt();
try {
thread.join();
} catch (InterruptedException ignored) {
}
}
@Override
public boolean isReceiving() {
if (isReceivingSynchronous) {
return true;
}
synchronized (messageHandlers) {
return messageHandlers.size() > 0;
}
}
@Override @Override
public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException { public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException {
receiveMessages(timeout, unit, true, handler); receiveMessages(timeout, unit, true, handler);
@ -884,6 +970,23 @@ public class ManagerImpl implements Manager {
private void receiveMessages( private void receiveMessages(
long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
) throws IOException {
if (isReceiving()) {
throw new IllegalStateException("Already receiving message.");
}
isReceivingSynchronous = true;
receiveThread = Thread.currentThread();
try {
receiveMessagesInternal(timeout, unit, returnOnTimeout, handler);
} finally {
receiveThread = null;
hasCaughtUpWithOldMessages = false;
isReceivingSynchronous = false;
}
}
private void receiveMessagesInternal(
long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
) throws IOException { ) throws IOException {
retryFailedReceivedMessages(handler); retryFailedReceivedMessages(handler);
@ -1249,6 +1352,15 @@ public class ManagerImpl implements Manager {
} }
private void close(boolean closeAccount) throws IOException { private void close(boolean closeAccount) throws IOException {
Thread thread;
synchronized (messageHandlers) {
messageHandlers.clear();
thread = receiveThread;
receiveThread = null;
}
if (thread != null) {
stopReceiveThread(thread);
}
executor.shutdown(); executor.shutdown();
dependencies.getSignalWebSocket().disconnect(); dependencies.getSignalWebSocket().disconnect();

View file

@ -71,6 +71,9 @@ public class DaemonCommand implements MultiLocalCommand {
try { try {
t.join(); t.join();
synchronized (this) {
wait();
}
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
} catch (DBusException | IOException e) { } catch (DBusException | IOException e) {
@ -128,27 +131,11 @@ public class DaemonCommand implements MultiLocalCommand {
logger.info("Exported dbus object: " + objectPath); logger.info("Exported dbus object: " + objectPath);
final var thread = new Thread(() -> { final var receiveMessageHandler = outputWriter instanceof JsonWriter ? new JsonDbusReceiveMessageHandler(m,
while (!Thread.interrupted()) { (JsonWriter) outputWriter,
try { conn,
final var receiveMessageHandler = outputWriter instanceof JsonWriter objectPath) : new DbusReceiveMessageHandler(m, (PlainTextWriter) outputWriter, conn, objectPath);
? new JsonDbusReceiveMessageHandler(m, (JsonWriter) outputWriter, conn, objectPath) m.addReceiveHandler(receiveMessageHandler);
: new DbusReceiveMessageHandler(m, (PlainTextWriter) outputWriter, conn, objectPath); return initThread;
m.receiveMessages(receiveMessageHandler);
break;
} catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e);
}
}
try {
initThread.join();
} catch (InterruptedException ignored) {
}
signal.close();
});
thread.start();
return thread;
} }
} }

View file

@ -70,10 +70,11 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
final var objectMapper = Util.createJsonObjectMapper(); final var objectMapper = Util.createJsonObjectMapper();
final var jsonRpcSender = new JsonRpcSender((JsonWriter) outputWriter); final var jsonRpcSender = new JsonRpcSender((JsonWriter) outputWriter);
final var receiveThread = receiveMessages(s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification( final var receiveMessageHandler = new JsonReceiveMessageHandler(m,
"receive", s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive",
objectMapper.valueToTree(s), objectMapper.valueToTree(s),
null)), m); null)));
m.addReceiveHandler(receiveMessageHandler);
// Maybe this should be handled inside the Manager // Maybe this should be handled inside the Manager
while (!m.hasCaughtUpWithOldMessages()) { while (!m.hasCaughtUpWithOldMessages()) {
@ -97,11 +98,7 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
jsonRpcReader.readRequests((method, params) -> handleRequest(m, objectMapper, method, params), jsonRpcReader.readRequests((method, params) -> handleRequest(m, objectMapper, method, params),
response -> logger.debug("Received unexpected response for id {}", response.getId())); response -> logger.debug("Received unexpected response for id {}", response.getId()));
receiveThread.interrupt(); m.removeReceiveHandler(receiveMessageHandler);
try {
receiveThread.join();
} catch (InterruptedException ignored) {
}
} }
private JsonNode handleRequest( private JsonNode handleRequest(
@ -166,22 +163,4 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
} }
command.handleCommand(requestParams, m, outputWriter); command.handleCommand(requestParams, m, outputWriter);
} }
private Thread receiveMessages(JsonWriter jsonWriter, Manager m) {
final var thread = new Thread(() -> {
while (!Thread.interrupted()) {
try {
final var receiveMessageHandler = new JsonReceiveMessageHandler(m, jsonWriter);
m.receiveMessages(receiveMessageHandler);
break;
} catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e);
}
}
});
thread.start();
return thread;
}
} }

View file

@ -423,6 +423,21 @@ public class DbusManagerImpl implements Manager {
signal.sendSyncRequest(); signal.sendSyncRequest();
} }
@Override
public void addReceiveHandler(final ReceiveMessageHandler handler) {
throw new UnsupportedOperationException();
}
@Override
public void removeReceiveHandler(final ReceiveMessageHandler handler) {
throw new UnsupportedOperationException();
}
@Override
public boolean isReceiving() {
throw new UnsupportedOperationException();
}
@Override @Override
public void receiveMessages(final ReceiveMessageHandler handler) throws IOException { public void receiveMessages(final ReceiveMessageHandler handler) throws IOException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();