Use improved shutdown for receive command

This commit is contained in:
AsamK 2023-11-09 19:22:58 +01:00
parent 1058e33f12
commit 5d33f71d4d
4 changed files with 37 additions and 1 deletions

View file

@ -255,6 +255,8 @@ public interface Manager extends Closeable {
Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
) throws IOException, AlreadyReceivingException; ) throws IOException, AlreadyReceivingException;
void stopReceiveMessages();
void setReceiveConfig(ReceiveConfig receiveConfig); void setReceiveConfig(ReceiveConfig receiveConfig);
boolean isContactBlocked(RecipientIdentifier.Single recipient); boolean isContactBlocked(RecipientIdentifier.Single recipient);

View file

@ -1091,6 +1091,20 @@ public class ManagerImpl implements Manager {
receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler); receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler);
} }
@Override
public void stopReceiveMessages() {
Thread thread = null;
synchronized (messageHandlers) {
if (isReceivingSynchronous) {
thread = receiveThread;
receiveThread = null;
}
}
if (thread != null) {
stopReceiveThread(thread);
}
}
private void receiveMessages( private void receiveMessages(
Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler
) throws IOException, AlreadyReceivingException { ) throws IOException, AlreadyReceivingException {

View file

@ -8,6 +8,7 @@ import net.sourceforge.argparse4j.inf.Subparser;
import org.asamk.signal.OutputType; import org.asamk.signal.OutputType;
import org.asamk.signal.ReceiveMessageHandler; import org.asamk.signal.ReceiveMessageHandler;
import org.asamk.signal.Shutdown;
import org.asamk.signal.commands.exceptions.CommandException; import org.asamk.signal.commands.exceptions.CommandException;
import org.asamk.signal.commands.exceptions.IOErrorException; import org.asamk.signal.commands.exceptions.IOErrorException;
import org.asamk.signal.commands.exceptions.UserErrorException; import org.asamk.signal.commands.exceptions.UserErrorException;
@ -67,6 +68,7 @@ public class ReceiveCommand implements LocalCommand, JsonRpcSingleCommand<Receiv
public void handleCommand( public void handleCommand(
final Namespace ns, final Manager m, final OutputWriter outputWriter final Namespace ns, final Manager m, final OutputWriter outputWriter
) throws CommandException { ) throws CommandException {
Shutdown.installHandler();
final var timeout = ns.getDouble("timeout"); final var timeout = ns.getDouble("timeout");
final var maxMessagesRaw = ns.getInt("max-messages"); final var maxMessagesRaw = ns.getInt("max-messages");
final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
@ -80,6 +82,7 @@ public class ReceiveCommand implements LocalCommand, JsonRpcSingleCommand<Receiv
}; };
final var duration = timeout < 0 ? null : Duration.ofMillis((long) (timeout * 1000)); final var duration = timeout < 0 ? null : Duration.ofMillis((long) (timeout * 1000));
final var maxMessages = maxMessagesRaw < 0 ? null : maxMessagesRaw; final var maxMessages = maxMessagesRaw < 0 ? null : maxMessagesRaw;
Shutdown.registerShutdownListener(m::stopReceiveMessages);
m.receiveMessages(Optional.ofNullable(duration), Optional.ofNullable(maxMessages), handler); m.receiveMessages(Optional.ofNullable(duration), Optional.ofNullable(maxMessages), handler);
} catch (IOException e) { } catch (IOException e) {
throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e); throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e);

View file

@ -3,6 +3,7 @@ package org.asamk.signal.dbus;
import org.asamk.Signal; import org.asamk.Signal;
import org.asamk.signal.DbusConfig; import org.asamk.signal.DbusConfig;
import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.api.AlreadyReceivingException;
import org.asamk.signal.manager.api.AttachmentInvalidException; import org.asamk.signal.manager.api.AttachmentInvalidException;
import org.asamk.signal.manager.api.CaptchaRequiredException; import org.asamk.signal.manager.api.CaptchaRequiredException;
import org.asamk.signal.manager.api.Configuration; import org.asamk.signal.manager.api.Configuration;
@ -548,10 +549,17 @@ public class DbusManagerImpl implements Manager {
} }
} }
private Thread receiveThread;
@Override @Override
public void receiveMessages( public void receiveMessages(
Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
) throws IOException { ) throws IOException, AlreadyReceivingException {
if (receiveThread != null) {
throw new AlreadyReceivingException("Already receiving message.");
}
receiveThread = Thread.currentThread();
final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1)); final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1));
final var lastMessage = new AtomicLong(System.currentTimeMillis()); final var lastMessage = new AtomicLong(System.currentTimeMillis());
final var thread = Thread.currentThread(); final var thread = Thread.currentThread();
@ -577,6 +585,7 @@ public class DbusManagerImpl implements Manager {
} }
Thread.sleep(sleepTimeRemaining); Thread.sleep(sleepTimeRemaining);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
break;
} }
} }
} else { } else {
@ -589,6 +598,14 @@ public class DbusManagerImpl implements Manager {
} }
removeReceiveHandler(receiveHandler); removeReceiveHandler(receiveHandler);
receiveThread = null;
}
@Override
public void stopReceiveMessages() {
if (receiveThread != null) {
receiveThread.interrupt();
}
} }
@Override @Override