Improve stop receive handling

Only interrupt the receive thread if it is currently waiting for new
messages from the server, otherwise just set a stop flag.
This commit is contained in:
AsamK 2022-02-12 12:26:42 +01:00
parent bb3b9692e3
commit cf0cc50e32
5 changed files with 55 additions and 35 deletions

View file

@ -765,24 +765,17 @@ class ManagerImpl implements Manager {
}
receiveThread = new Thread(() -> {
logger.debug("Starting receiving messages");
while (!Thread.interrupted()) {
try {
context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> {
synchronized (messageHandlers) {
Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
try {
h.handleMessage(envelope, e);
} catch (Exception ex) {
logger.warn("Message handler failed, ignoring", ex);
}
});
context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> {
synchronized (messageHandlers) {
Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
try {
h.handleMessage(envelope, e);
} catch (Exception ex) {
logger.warn("Message handler failed, ignoring", ex);
}
});
break;
} catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e);
}
}
});
logger.debug("Finished receiving messages");
synchronized (messageHandlers) {
receiveThread = null;
@ -816,7 +809,10 @@ class ManagerImpl implements Manager {
}
private void stopReceiveThread(final Thread thread) {
thread.interrupt();
if (context.getReceiveHelper().requestStopReceiveMessages()) {
logger.debug("Receive stop requested, interrupting read from server.");
thread.interrupt();
}
try {
thread.join();
} catch (InterruptedException ignored) {
@ -1030,14 +1026,15 @@ class ManagerImpl implements Manager {
dependencies.getSignalWebSocket().disconnect();
disposable.dispose();
if (account != null) {
account.close();
}
synchronized (closedListeners) {
closedListeners.forEach(Runnable::run);
closedListeners.clear();
}
if (account != null) {
account.close();
}
account = null;
}
}

View file

@ -347,9 +347,6 @@ public final class ProfileHelper {
.storeProfileAvatar(address,
outputStream -> retrieveProfileAvatar(avatarPath, profileKey, outputStream));
} catch (Throwable e) {
if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.warn("Failed to download profile avatar, ignoring: {}", e.getMessage());
}
}

View file

@ -2,8 +2,8 @@ package org.asamk.signal.manager.helper;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.SignalDependencies;
import org.asamk.signal.manager.api.UntrustedIdentityException;
import org.asamk.signal.manager.actions.HandleAction;
import org.asamk.signal.manager.api.UntrustedIdentityException;
import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
import org.slf4j.Logger;
@ -37,6 +37,8 @@ public class ReceiveHelper {
private boolean ignoreAttachments = false;
private boolean needsToRetryFailedMessages = false;
private boolean hasCaughtUpWithOldMessages = false;
private boolean isWaitingForMessage = false;
private boolean shouldStop = false;
private Callable authenticationFailureListener;
private Callable caughtUpWithOldMessagesListener;
@ -66,6 +68,22 @@ public class ReceiveHelper {
this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
}
public boolean requestStopReceiveMessages() {
this.shouldStop = true;
return isWaitingForMessage;
}
public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
while (!shouldStop) {
try {
receiveMessages(Duration.ofMinutes(1), false, handler);
break;
} catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e);
}
}
}
public void receiveMessages(
Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
) throws IOException {
@ -92,6 +110,7 @@ public class ReceiveHelper {
queuedActions.clear();
dependencies.getSignalWebSocket().disconnect();
webSocketStateDisposable.dispose();
shouldStop = false;
}
}
@ -104,8 +123,9 @@ public class ReceiveHelper {
final var signalWebSocket = dependencies.getSignalWebSocket();
var backOffCounter = 0;
isWaitingForMessage = false;
while (!Thread.interrupted()) {
while (!shouldStop) {
if (needsToRetryFailedMessages) {
retryFailedReceivedMessages(handler);
needsToRetryFailedMessages = false;
@ -118,13 +138,16 @@ public class ReceiveHelper {
}
logger.debug("Checking for new message from server");
try {
isWaitingForMessage = true;
var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
isWaitingForMessage = false;
final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
.resolveRecipient(envelope1.getSourceAddress()) : null;
logger.trace("Storing new message from {}", recipientId);
// store message on disk, before acknowledging receipt to the server
cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
});
isWaitingForMessage = false;
backOffCounter = 0;
if (result.isPresent()) {
@ -143,7 +166,6 @@ public class ReceiveHelper {
}
} catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
break;
} else {
throw e;
@ -255,23 +277,14 @@ public class ReceiveHelper {
private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
logger.debug("Handling message actions");
var interrupted = false;
for (var action : queuedActions) {
logger.debug("Executing action {}", action.getClass().getSimpleName());
try {
action.execute(context);
} catch (Throwable e) {
if ((e instanceof AssertionError || e instanceof RuntimeException)
&& e.getCause() instanceof InterruptedException) {
interrupted = true;
continue;
}
logger.warn("Message action failed.", e);
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
private void onWebSocketStateChange(final WebSocketConnectionState s) {

View file

@ -49,7 +49,7 @@ public class MessageSendLogStore implements AutoCloseable {
this.cleanupThread = new Thread(() -> {
try {
final var interval = Duration.ofHours(1).toMillis();
while (true) {
while (!Thread.interrupted()) {
try (final var connection = database.getConnection()) {
deleteOutdatedEntries(connection);
} catch (SQLException e) {