Only handle jsonRpc requests, after receive thread has caught up with old messages

This commit is contained in:
AsamK 2021-09-04 15:06:25 +02:00
parent ac18006abb
commit 5a2e37a6e2
2 changed files with 29 additions and 5 deletions

View file

@ -141,6 +141,7 @@ public class Manager implements Closeable {
private final IncomingMessageHandler incomingMessageHandler; private final IncomingMessageHandler incomingMessageHandler;
private final Context context; private final Context context;
private boolean hasCaughtUpWithOldMessages = false;
Manager( Manager(
SignalAccount account, SignalAccount account,
@ -865,7 +866,7 @@ public class Manager implements Closeable {
final var signalWebSocket = dependencies.getSignalWebSocket(); final var signalWebSocket = dependencies.getSignalWebSocket();
signalWebSocket.connect(); signalWebSocket.connect();
var hasCaughtUpWithOldMessages = false; hasCaughtUpWithOldMessages = false;
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
SignalServiceEnvelope envelope; SignalServiceEnvelope envelope;
@ -885,11 +886,14 @@ public class Manager implements Closeable {
envelope = result.get(); envelope = result.get();
} else { } else {
// Received indicator that server queue is empty // Received indicator that server queue is empty
hasCaughtUpWithOldMessages = true;
handleQueuedActions(queuedActions); handleQueuedActions(queuedActions);
queuedActions.clear(); queuedActions.clear();
hasCaughtUpWithOldMessages = true;
synchronized (this) {
this.notifyAll();
}
// Continue to wait another timeout for new messages // Continue to wait another timeout for new messages
continue; continue;
} }
@ -936,17 +940,27 @@ public class Manager implements Closeable {
handleQueuedActions(queuedActions); handleQueuedActions(queuedActions);
} }
public boolean hasCaughtUpWithOldMessages() {
return hasCaughtUpWithOldMessages;
}
private void handleQueuedActions(final Collection<HandleAction> queuedActions) { private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
var interrupted = false;
for (var action : queuedActions) { for (var action : queuedActions) {
try { try {
action.execute(context); action.execute(context);
} catch (Throwable e) { } catch (Throwable e) {
if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) { if ((e instanceof AssertionError || e instanceof RuntimeException)
Thread.currentThread().interrupt(); && e.getCause() instanceof InterruptedException) {
interrupted = true;
continue;
} }
logger.warn("Message action failed.", e); logger.warn("Message action failed.", e);
} }
} }
if (interrupted) {
Thread.currentThread().interrupt();
}
} }
public boolean isContactBlocked(final RecipientIdentifier.Single recipient) { public boolean isContactBlocked(final RecipientIdentifier.Single recipient) {

View file

@ -75,6 +75,16 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
objectMapper.valueToTree(s), objectMapper.valueToTree(s),
null)), m, ignoreAttachments); null)), m, ignoreAttachments);
// Maybe this should be handled inside the Manager
while (!m.hasCaughtUpWithOldMessages()) {
try {
synchronized (m) {
m.wait();
}
} catch (InterruptedException ignored) {
}
}
final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, () -> { final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, () -> {