Handle queued actions also when thread is interrupted

This commit is contained in:
AsamK 2021-08-23 14:39:40 +02:00
parent 9a9dd3b217
commit 6dd1a21606
4 changed files with 19 additions and 27 deletions

View file

@ -1790,16 +1790,7 @@ public class Manager implements Closeable {
queuedActions.addAll(actions); queuedActions.addAll(actions);
} }
} }
for (var action : queuedActions) { handleQueuedActions(queuedActions);
try {
action.execute(this);
} catch (Throwable e) {
if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.warn("Message action failed.", e);
}
}
} }
private List<HandleAction> retryFailedReceivedMessage( private List<HandleAction> retryFailedReceivedMessage(
@ -1843,7 +1834,7 @@ public class Manager implements Closeable {
boolean returnOnTimeout, boolean returnOnTimeout,
boolean ignoreAttachments, boolean ignoreAttachments,
ReceiveMessageHandler handler ReceiveMessageHandler handler
) throws IOException, InterruptedException { ) throws IOException {
retryFailedReceivedMessages(handler, ignoreAttachments); retryFailedReceivedMessages(handler, ignoreAttachments);
Set<HandleAction> queuedActions = new HashSet<>(); Set<HandleAction> queuedActions = new HashSet<>();
@ -1875,16 +1866,7 @@ public class Manager implements Closeable {
// Received indicator that server queue is empty // Received indicator that server queue is empty
hasCaughtUpWithOldMessages = true; hasCaughtUpWithOldMessages = true;
for (var action : queuedActions) { handleQueuedActions(queuedActions);
try {
action.execute(this);
} catch (Throwable e) {
if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.warn("Message action failed.", e);
}
}
queuedActions.clear(); queuedActions.clear();
// Continue to wait another timeout for new messages // Continue to wait another timeout for new messages
@ -1892,7 +1874,8 @@ public class Manager implements Closeable {
} }
} catch (AssertionError e) { } catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) { if (e.getCause() instanceof InterruptedException) {
throw (InterruptedException) e.getCause(); Thread.currentThread().interrupt();
break;
} else { } else {
throw e; throw e;
} }
@ -1970,6 +1953,20 @@ public class Manager implements Closeable {
} }
} }
} }
handleQueuedActions(queuedActions);
}
private void handleQueuedActions(final Set<HandleAction> queuedActions) {
for (var action : queuedActions) {
try {
action.execute(this);
} catch (Throwable e) {
if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.warn("Message action failed.", e);
}
}
} }
private boolean isMessageBlocked( private boolean isMessageBlocked(

View file

@ -134,8 +134,6 @@ public class DaemonCommand implements MultiLocalCommand {
break; break;
} catch (IOException e) { } catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e); logger.warn("Receiving messages failed, retrying", e);
} catch (InterruptedException ignored) {
break;
} }
} }
}); });

View file

@ -168,8 +168,6 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
break; break;
} catch (IOException e) { } catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e); logger.warn("Receiving messages failed, retrying", e);
} catch (InterruptedException e) {
break;
} }
} }
}); });

View file

@ -158,7 +158,6 @@ public class ReceiveCommand implements ExtendedDbusCommand, LocalCommand {
handler); handler);
} catch (IOException e) { } catch (IOException e) {
throw new IOErrorException("Error while receiving messages: " + e.getMessage()); throw new IOErrorException("Error while receiving messages: " + e.getMessage());
} catch (InterruptedException ignored) {
} }
} }
} }