Extract ReceiveHelper

This commit is contained in:
AsamK 2021-12-30 22:44:38 +01:00
parent c7a7d00da5
commit f5e5dd551d
3 changed files with 315 additions and 232 deletions

View file

@ -16,7 +16,6 @@
*/ */
package org.asamk.signal.manager; package org.asamk.signal.manager;
import org.asamk.signal.manager.actions.HandleAction;
import org.asamk.signal.manager.api.Configuration; import org.asamk.signal.manager.api.Configuration;
import org.asamk.signal.manager.api.Device; import org.asamk.signal.manager.api.Device;
import org.asamk.signal.manager.api.Group; import org.asamk.signal.manager.api.Group;
@ -43,7 +42,6 @@ import org.asamk.signal.manager.helper.Context;
import org.asamk.signal.manager.storage.SignalAccount; import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.groups.GroupInfo; import org.asamk.signal.manager.storage.groups.GroupInfo;
import org.asamk.signal.manager.storage.identities.IdentityInfo; import org.asamk.signal.manager.storage.identities.IdentityInfo;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
import org.asamk.signal.manager.storage.recipients.Contact; import org.asamk.signal.manager.storage.recipients.Contact;
import org.asamk.signal.manager.storage.recipients.Profile; import org.asamk.signal.manager.storage.recipients.Profile;
import org.asamk.signal.manager.storage.recipients.RecipientAddress; import org.asamk.signal.manager.storage.recipients.RecipientAddress;
@ -59,7 +57,6 @@ import org.whispersystems.libsignal.ecc.ECPublicKey;
import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalSessionLock; import org.whispersystems.signalservice.api.SignalSessionLock;
import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage; import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage; import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage;
import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage; import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage;
import org.whispersystems.signalservice.api.push.ACI; import org.whispersystems.signalservice.api.push.ACI;
@ -67,8 +64,6 @@ import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedE
import org.whispersystems.signalservice.api.util.DeviceNameUtil; import org.whispersystems.signalservice.api.util.DeviceNameUtil;
import org.whispersystems.signalservice.api.util.InvalidNumberException; import org.whispersystems.signalservice.api.util.InvalidNumberException;
import org.whispersystems.signalservice.api.util.PhoneNumberFormatter; import org.whispersystems.signalservice.api.util.PhoneNumberFormatter;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
import org.whispersystems.signalservice.internal.util.DynamicCredentialsProvider; import org.whispersystems.signalservice.internal.util.DynamicCredentialsProvider;
import org.whispersystems.signalservice.internal.util.Hex; import org.whispersystems.signalservice.internal.util.Hex;
import org.whispersystems.signalservice.internal.util.Util; import org.whispersystems.signalservice.internal.util.Util;
@ -81,7 +76,6 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -91,14 +85,10 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import static org.asamk.signal.manager.config.ServiceConfig.capabilities; import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
public class ManagerImpl implements Manager { public class ManagerImpl implements Manager {
@ -113,15 +103,11 @@ public class ManagerImpl implements Manager {
private final Context context; private final Context context;
private boolean hasCaughtUpWithOldMessages = false;
private boolean ignoreAttachments = false;
private Thread receiveThread; private Thread receiveThread;
private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>(); private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>(); private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
private final List<Runnable> closedListeners = new ArrayList<>(); private final List<Runnable> closedListeners = new ArrayList<>();
private boolean isReceivingSynchronous; private boolean isReceivingSynchronous;
private boolean needsToRetryFailedMessages = false;
ManagerImpl( ManagerImpl(
SignalAccount account, SignalAccount account,
@ -155,6 +141,18 @@ public class ManagerImpl implements Manager {
final var stickerPackStore = new StickerPackStore(pathConfig.stickerPacksPath()); final var stickerPackStore = new StickerPackStore(pathConfig.stickerPacksPath());
this.context = new Context(account, dependencies, avatarStore, attachmentStore, stickerPackStore); this.context = new Context(account, dependencies, avatarStore, attachmentStore, stickerPackStore);
this.context.getReceiveHelper().setAuthenticationFailureListener(() -> {
try {
close();
} catch (IOException e) {
logger.warn("Failed to close account after authentication failure", e);
}
});
this.context.getReceiveHelper().setCaughtUpWithOldMessagesListener(() -> {
synchronized (this) {
this.notifyAll();
}
});
} }
@Override @Override
@ -257,7 +255,7 @@ public class ManagerImpl implements Manager {
@Override @Override
public void updateConfiguration( public void updateConfiguration(
Configuration configuration Configuration configuration
) throws IOException, NotMasterDeviceException { ) throws NotMasterDeviceException {
if (!account.isMasterDevice()) { if (!account.isMasterDevice()) {
throw new NotMasterDeviceException(); throw new NotMasterDeviceException();
} }
@ -762,7 +760,7 @@ public class ManagerImpl implements Manager {
@Override @Override
public void setGroupBlocked( public void setGroupBlocked(
final GroupId groupId, final boolean blocked final GroupId groupId, final boolean blocked
) throws GroupNotFoundException, IOException, NotMasterDeviceException { ) throws GroupNotFoundException, NotMasterDeviceException {
if (!account.isMasterDevice()) { if (!account.isMasterDevice()) {
throw new NotMasterDeviceException(); throw new NotMasterDeviceException();
} }
@ -832,54 +830,6 @@ public class ManagerImpl implements Manager {
} }
} }
private void retryFailedReceivedMessages(ReceiveMessageHandler handler) {
Set<HandleAction> queuedActions = new HashSet<>();
for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
var actions = retryFailedReceivedMessage(handler, cachedMessage);
if (actions != null) {
queuedActions.addAll(actions);
}
}
handleQueuedActions(queuedActions);
}
private List<HandleAction> retryFailedReceivedMessage(
final ReceiveMessageHandler handler, final CachedMessage cachedMessage
) {
var envelope = cachedMessage.loadEnvelope();
if (envelope == null) {
cachedMessage.delete();
return null;
}
final var result = context.getIncomingMessageHandler()
.handleRetryEnvelope(envelope, ignoreAttachments, handler);
final var actions = result.first();
final var exception = result.second();
if (exception instanceof UntrustedIdentityException) {
if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
// Envelope is more than a month old, cleaning up.
cachedMessage.delete();
return null;
}
if (!envelope.hasSourceUuid()) {
final var identifier = ((UntrustedIdentityException) exception).getSender();
final var recipientId = account.getRecipientStore().resolveRecipient(identifier);
try {
account.getMessageCache().replaceSender(cachedMessage, recipientId);
} catch (IOException ioException) {
logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
}
}
return null;
}
// If successful and for all other errors that are not recoverable, delete the cached message
cachedMessage.delete();
return actions;
}
@Override @Override
public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
if (isReceivingSynchronous) { if (isReceivingSynchronous) {
@ -903,7 +853,7 @@ public class ManagerImpl implements Manager {
logger.debug("Starting receiving messages"); logger.debug("Starting receiving messages");
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
try { try {
receiveMessagesInternal(Duration.ofMinutes(1), false, (envelope, e) -> { context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> {
synchronized (messageHandlers) { synchronized (messageHandlers) {
Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
try { try {
@ -920,7 +870,6 @@ public class ManagerImpl implements Manager {
} }
} }
logger.debug("Finished receiving messages"); logger.debug("Finished receiving messages");
hasCaughtUpWithOldMessages = false;
synchronized (messageHandlers) { synchronized (messageHandlers) {
receiveThread = null; receiveThread = null;
@ -988,180 +937,21 @@ public class ManagerImpl implements Manager {
isReceivingSynchronous = true; isReceivingSynchronous = true;
receiveThread = Thread.currentThread(); receiveThread = Thread.currentThread();
try { try {
receiveMessagesInternal(timeout, returnOnTimeout, handler); context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler);
} finally { } finally {
receiveThread = null; receiveThread = null;
hasCaughtUpWithOldMessages = false;
isReceivingSynchronous = false; isReceivingSynchronous = false;
} }
} }
private void receiveMessagesInternal(
Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
) throws IOException {
needsToRetryFailedMessages = true;
// Use a Map here because java Set doesn't have a get method ...
Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
final var signalWebSocket = dependencies.getSignalWebSocket();
final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
signalWebSocket.getWebSocketState())
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.distinctUntilChanged()
.subscribe(this::onWebSocketStateChange);
signalWebSocket.connect();
hasCaughtUpWithOldMessages = false;
var backOffCounter = 0;
final var MAX_BACKOFF_COUNTER = 9;
while (!Thread.interrupted()) {
if (needsToRetryFailedMessages) {
retryFailedReceivedMessages(handler);
needsToRetryFailedMessages = false;
}
SignalServiceEnvelope envelope;
final CachedMessage[] cachedMessage = {null};
final var nowMillis = System.currentTimeMillis();
if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
account.setLastReceiveTimestamp(nowMillis);
}
logger.debug("Checking for new message from server");
try {
var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
.resolveRecipient(envelope1.getSourceAddress()) : null;
// store message on disk, before acknowledging receipt to the server
cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
});
backOffCounter = 0;
if (result.isPresent()) {
envelope = result.get();
logger.debug("New message received from server");
} else {
logger.debug("Received indicator that server queue is empty");
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
hasCaughtUpWithOldMessages = true;
synchronized (this) {
this.notifyAll();
}
// Continue to wait another timeout for new messages
continue;
}
} catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
break;
} else {
throw e;
}
} catch (IOException e) {
logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
try {
Thread.sleep(sleepMilliseconds);
} catch (InterruptedException interruptedException) {
return;
}
hasCaughtUpWithOldMessages = false;
signalWebSocket.connect();
continue;
}
throw e;
} catch (TimeoutException e) {
backOffCounter = 0;
if (returnOnTimeout) return;
continue;
}
final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
for (final var h : result.first()) {
final var existingAction = queuedActions.get(h);
if (existingAction == null) {
queuedActions.put(h, h);
} else {
existingAction.mergeOther(h);
}
}
final var exception = result.second();
if (hasCaughtUpWithOldMessages) {
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
}
if (cachedMessage[0] != null) {
if (exception instanceof UntrustedIdentityException) {
logger.debug("Keeping message with untrusted identity in message cache");
final var address = ((UntrustedIdentityException) exception).getSender();
final var recipientId = account.getRecipientStore().resolveRecipient(address);
if (!envelope.hasSourceUuid()) {
try {
cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
} catch (IOException ioException) {
logger.warn("Failed to move cached message to recipient folder: {}",
ioException.getMessage());
}
}
} else {
cachedMessage[0].delete();
}
}
}
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
dependencies.getSignalWebSocket().disconnect();
webSocketStateDisposable.dispose();
}
private void onWebSocketStateChange(final WebSocketConnectionState s) {
if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
account.setRegistered(false);
try {
close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override @Override
public void setIgnoreAttachments(final boolean ignoreAttachments) { public void setIgnoreAttachments(final boolean ignoreAttachments) {
this.ignoreAttachments = ignoreAttachments; context.getReceiveHelper().setIgnoreAttachments(ignoreAttachments);
} }
@Override @Override
public boolean hasCaughtUpWithOldMessages() { public boolean hasCaughtUpWithOldMessages() {
return hasCaughtUpWithOldMessages; return context.getReceiveHelper().hasCaughtUpWithOldMessages();
}
private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
logger.debug("Handling message actions");
var interrupted = false;
for (var action : queuedActions) {
logger.debug("Executing action {}", action.getClass().getSimpleName());
try {
action.execute(context);
} catch (Throwable e) {
if ((e instanceof AssertionError || e instanceof RuntimeException)
&& e.getCause() instanceof InterruptedException) {
interrupted = true;
continue;
}
logger.warn("Message action failed.", e);
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} }
@Override @Override
@ -1268,7 +1058,7 @@ public class ManagerImpl implements Manager {
} }
final var updated = context.getIdentityHelper().trustIdentityVerified(recipientId, fingerprint); final var updated = context.getIdentityHelper().trustIdentityVerified(recipientId, fingerprint);
if (updated && this.isReceiving()) { if (updated && this.isReceiving()) {
needsToRetryFailedMessages = true; context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
} }
return updated; return updated;
} }
@ -1291,7 +1081,7 @@ public class ManagerImpl implements Manager {
} }
final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber); final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
if (updated && this.isReceiving()) { if (updated && this.isReceiving()) {
needsToRetryFailedMessages = true; context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
} }
return updated; return updated;
} }
@ -1314,7 +1104,7 @@ public class ManagerImpl implements Manager {
} }
final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber); final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
if (updated && this.isReceiving()) { if (updated && this.isReceiving()) {
needsToRetryFailedMessages = true; context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
} }
return updated; return updated;
} }
@ -1334,7 +1124,7 @@ public class ManagerImpl implements Manager {
} }
final var updated = context.getIdentityHelper().trustIdentityAllKeys(recipientId); final var updated = context.getIdentityHelper().trustIdentityAllKeys(recipientId);
if (updated && this.isReceiving()) { if (updated && this.isReceiving()) {
needsToRetryFailedMessages = true; context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
} }
return updated; return updated;
} }

View file

@ -29,6 +29,7 @@ public class Context {
private PinHelper pinHelper; private PinHelper pinHelper;
private PreKeyHelper preKeyHelper; private PreKeyHelper preKeyHelper;
private ProfileHelper profileHelper; private ProfileHelper profileHelper;
private ReceiveHelper receiveHelper;
private RecipientHelper recipientHelper; private RecipientHelper recipientHelper;
private SendHelper sendHelper; private SendHelper sendHelper;
private StorageHelper storageHelper; private StorageHelper storageHelper;
@ -111,6 +112,10 @@ public class Context {
return getOrCreate(() -> profileHelper, () -> profileHelper = new ProfileHelper(this)); return getOrCreate(() -> profileHelper, () -> profileHelper = new ProfileHelper(this));
} }
public ReceiveHelper getReceiveHelper() {
return getOrCreate(() -> receiveHelper, () -> receiveHelper = new ReceiveHelper(this));
}
public RecipientHelper getRecipientHelper() { public RecipientHelper getRecipientHelper() {
return getOrCreate(() -> recipientHelper, () -> recipientHelper = new RecipientHelper(this)); return getOrCreate(() -> recipientHelper, () -> recipientHelper = new RecipientHelper(this));
} }

View file

@ -0,0 +1,288 @@
package org.asamk.signal.manager.helper;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.SignalDependencies;
import org.asamk.signal.manager.UntrustedIdentityException;
import org.asamk.signal.manager.actions.HandleAction;
import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class ReceiveHelper {
private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
private final static int MAX_BACKOFF_COUNTER = 9;
private final SignalAccount account;
private final SignalDependencies dependencies;
private final Context context;
private boolean ignoreAttachments = false;
private boolean needsToRetryFailedMessages = false;
private boolean hasCaughtUpWithOldMessages = false;
private Callable authenticationFailureListener;
private Callable caughtUpWithOldMessagesListener;
public ReceiveHelper(final Context context) {
this.account = context.getAccount();
this.dependencies = context.getDependencies();
this.context = context;
}
public void setIgnoreAttachments(final boolean ignoreAttachments) {
this.ignoreAttachments = ignoreAttachments;
}
public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
this.needsToRetryFailedMessages = needsToRetryFailedMessages;
}
public boolean hasCaughtUpWithOldMessages() {
return hasCaughtUpWithOldMessages;
}
public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
this.authenticationFailureListener = authenticationFailureListener;
}
public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
}
public void receiveMessages(
Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
) throws IOException {
needsToRetryFailedMessages = true;
hasCaughtUpWithOldMessages = false;
// Use a Map here because java Set doesn't have a get method ...
Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
final var signalWebSocket = dependencies.getSignalWebSocket();
final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
signalWebSocket.getWebSocketState())
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.distinctUntilChanged()
.subscribe(this::onWebSocketStateChange);
signalWebSocket.connect();
try {
receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions);
} finally {
hasCaughtUpWithOldMessages = false;
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
dependencies.getSignalWebSocket().disconnect();
webSocketStateDisposable.dispose();
}
}
private void receiveMessagesInternal(
Duration timeout,
boolean returnOnTimeout,
Manager.ReceiveMessageHandler handler,
final Map<HandleAction, HandleAction> queuedActions
) throws IOException {
final var signalWebSocket = dependencies.getSignalWebSocket();
var backOffCounter = 0;
while (!Thread.interrupted()) {
if (needsToRetryFailedMessages) {
retryFailedReceivedMessages(handler);
needsToRetryFailedMessages = false;
}
SignalServiceEnvelope envelope;
final CachedMessage[] cachedMessage = {null};
final var nowMillis = System.currentTimeMillis();
if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
account.setLastReceiveTimestamp(nowMillis);
}
logger.debug("Checking for new message from server");
try {
var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
.resolveRecipient(envelope1.getSourceAddress()) : null;
logger.trace("Storing new message from {}", recipientId);
// store message on disk, before acknowledging receipt to the server
cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
});
backOffCounter = 0;
if (result.isPresent()) {
envelope = result.get();
logger.debug("New message received from server");
} else {
logger.debug("Received indicator that server queue is empty");
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
hasCaughtUpWithOldMessages = true;
caughtUpWithOldMessagesListener.call();
// Continue to wait another timeout for new messages
continue;
}
} catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
break;
} else {
throw e;
}
} catch (IOException e) {
logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
try {
Thread.sleep(sleepMilliseconds);
} catch (InterruptedException interruptedException) {
return;
}
hasCaughtUpWithOldMessages = false;
signalWebSocket.connect();
continue;
}
throw e;
} catch (TimeoutException e) {
backOffCounter = 0;
if (returnOnTimeout) return;
continue;
}
final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
for (final var h : result.first()) {
final var existingAction = queuedActions.get(h);
if (existingAction == null) {
queuedActions.put(h, h);
} else {
existingAction.mergeOther(h);
}
}
final var exception = result.second();
if (hasCaughtUpWithOldMessages) {
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
}
if (cachedMessage[0] != null) {
if (exception instanceof UntrustedIdentityException) {
logger.debug("Keeping message with untrusted identity in message cache");
final var address = ((UntrustedIdentityException) exception).getSender();
final var recipientId = account.getRecipientStore().resolveRecipient(address);
if (!envelope.hasSourceUuid()) {
try {
cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
} catch (IOException ioException) {
logger.warn("Failed to move cached message to recipient folder: {}",
ioException.getMessage());
}
}
} else {
cachedMessage[0].delete();
}
}
}
}
private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
Set<HandleAction> queuedActions = new HashSet<>();
for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
var actions = retryFailedReceivedMessage(handler, cachedMessage);
if (actions != null) {
queuedActions.addAll(actions);
}
}
handleQueuedActions(queuedActions);
}
private List<HandleAction> retryFailedReceivedMessage(
final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
) {
var envelope = cachedMessage.loadEnvelope();
if (envelope == null) {
cachedMessage.delete();
return null;
}
final var result = context.getIncomingMessageHandler()
.handleRetryEnvelope(envelope, ignoreAttachments, handler);
final var actions = result.first();
final var exception = result.second();
if (exception instanceof UntrustedIdentityException) {
if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
// Envelope is more than a month old, cleaning up.
cachedMessage.delete();
return null;
}
if (!envelope.hasSourceUuid()) {
final var identifier = ((UntrustedIdentityException) exception).getSender();
final var recipientId = account.getRecipientStore().resolveRecipient(identifier);
try {
account.getMessageCache().replaceSender(cachedMessage, recipientId);
} catch (IOException ioException) {
logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
}
}
return null;
}
// If successful and for all other errors that are not recoverable, delete the cached message
cachedMessage.delete();
return actions;
}
private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
logger.debug("Handling message actions");
var interrupted = false;
for (var action : queuedActions) {
logger.debug("Executing action {}", action.getClass().getSimpleName());
try {
action.execute(context);
} catch (Throwable e) {
if ((e instanceof AssertionError || e instanceof RuntimeException)
&& e.getCause() instanceof InterruptedException) {
interrupted = true;
continue;
}
logger.warn("Message action failed.", e);
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
private void onWebSocketStateChange(final WebSocketConnectionState s) {
if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
account.setRegistered(false);
authenticationFailureListener.call();
}
}
public interface Callable {
void call();
}
}