remove receiveMessageAndReadStdin

This commit is contained in:
technillogue 2021-04-25 19:34:40 -04:00
parent a1f9597aa3
commit 87c0bfc728
2 changed files with 68 additions and 184 deletions

View file

@ -1698,114 +1698,6 @@ public class Manager implements Closeable {
cachedMessage.delete();
}
public void receiveMessagesAndReadStdin(
long timeout,
TimeUnit unit,
boolean returnOnTimeout,
boolean ignoreAttachments,
ReceiveMessageHandler handler
) throws IOException {
retryFailedReceivedMessages(handler, ignoreAttachments);
final SignalServiceMessageReceiver messageReceiver = getMessageReceiver();
Set<HandleAction> queuedActions = null;
if (messagePipe == null) {
messagePipe = messageReceiver.createMessagePipe();
}
boolean hasCaughtUpWithOldMessages = false;
while (true) {
SignalServiceEnvelope envelope;
SignalServiceContent content = null;
Exception exception = null;
final long now = new Date().getTime();
try {
Optional<SignalServiceEnvelope> result = messagePipe.readOrEmpty(timeout, unit, envelope1 -> {
// store message on disk, before acknowledging receipt to the server
try {
String source = envelope1.getSourceE164().isPresent() ? envelope1.getSourceE164().get() : "";
File cacheFile = getMessageCacheFile(source, now, envelope1.getTimestamp());
Utils.storeEnvelope(envelope1, cacheFile);
} catch (IOException e) {
System.err.println("Failed to store encrypted message in disk cache, ignoring: " + e.getMessage());
}
});
if (result.isPresent()) {
envelope = result.get();
} else {
// Received indicator that server queue is empty
hasCaughtUpWithOldMessages = true;
if (queuedActions != null) {
for (HandleAction action : queuedActions) {
try {
action.execute(this);
} catch (Throwable e) {
e.printStackTrace();
}
}
queuedActions.clear();
queuedActions = null;
}
// Continue to wait another timeout for new messages
continue;
}
} catch (TimeoutException e) {
if (returnOnTimeout)
return;
continue;
} catch (InvalidVersionException e) {
System.err.println("Ignoring error: " + e.getMessage());
continue;
}
if (envelope.hasSource()) {
// Store uuid if we don't have it already
SignalServiceAddress source = envelope.getSourceAddress();
resolveSignalServiceAddress(source);
}
if (!envelope.isReceipt()) {
try {
content = decryptMessage(envelope);
} catch (Exception e) {
exception = e;
}
List<HandleAction> actions = handleMessage(envelope, content, ignoreAttachments);
if (hasCaughtUpWithOldMessages) {
for (HandleAction action : actions) {
try {
action.execute(this);
} catch (Throwable e) {
e.printStackTrace();
}
}
} else {
if (queuedActions == null) {
queuedActions = new HashSet<>();
}
queuedActions.addAll(actions);
}
}
account.save();
if (!isMessageBlocked(envelope, content)) {
handler.handleMessage(envelope, content, exception);
}
if (!(exception instanceof org.whispersystems.libsignal.UntrustedIdentityException)) {
File cacheFile = null;
try {
String source = envelope.getSourceE164().isPresent() ? envelope.getSourceE164().get() : "";
cacheFile = getMessageCacheFile(source, now, envelope.getTimestamp());
Files.delete(cacheFile.toPath());
// Try to delete directory if empty
new File(getMessageCachePath()).delete();
} catch (IOException e) {
System.err.println("Failed to delete cached message file “" + cacheFile + "”: " + e.getMessage());
}
}
}
}
public void receiveMessages(
long timeout,
TimeUnit unit,

View file

@ -1,93 +1,88 @@
package org.asamk.signal.commands;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.asamk.signal.DbusReceiveMessageHandler;
import org.asamk.signal.JsonDbusReceiveMessageHandler;
import org.asamk.signal.ReceiveMessageHandler;
import org.asamk.signal.JsonReceiveMessageHandler;
import org.asamk.signal.dbus.DbusSignalImpl;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.ReceiveMessageHandler;
import org.asamk.signal.manager.AttachmentInvalidException;
import org.freedesktop.dbus.connections.impl.DBusConnection;
import org.freedesktop.dbus.exceptions.DBusException;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import static org.asamk.signal.DbusConfig.SIGNAL_BUSNAME;
import static org.asamk.signal.DbusConfig.SIGNAL_OBJECTPATH;
import static org.asamk.signal.util.ErrorUtils.handleAssertionError;
import org.asamk.signal.manager.Manager;
import org.whispersystems.signalservice.api.push.exceptions.EncapsulatedExceptions;
import org.whispersystems.signalservice.api.util.InvalidNumberException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.asamk.signal.util.ErrorUtils.handleAssertionError;
class JsonInterface {
public String commandName;
public String recipient;
public String content;
public JsonNode details;
}
class InputReader implements Runnable {
private volatile boolean alive = true;
private Manager m;
private final Manager m;
InputReader(final Manager m) {
this.m = m;
}
public void terminate() {
this.alive = false;
this.alive = false;
}
InputReader (final Manager m) {
this.m = m;
}
@Override
public void run() {
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
ObjectMapper jsonProcessor = new ObjectMapper();
while (alive) {
try {
String in = br.readLine();
if (in != null) {
while (alive) {
try {
String in = br.readLine();
if (in != null) {
JsonInterface command = jsonProcessor.readValue(in, JsonInterface.class);
if (command.commandName.equals("sendMessage")){
List<String> recipients = new ArrayList<String>();
recipients.add(command.recipient);
List<String> attachments = new ArrayList<>();
if (command.details != null && command.details.has("attachments") ) {
command.details.get("attachments").forEach(attachment -> {
if (attachment.isTextual()){
attachments.add(attachment.asText());
}
});
}
try {
// verbosity flag? better yet, json acknowledgement with timestamp or message id?
System.out.println("sentMessage '" + command.content + "' to " + command.recipient);
this.m.sendMessage(command.content, attachments, recipients);
} catch (AssertionError | EncapsulatedExceptions| AttachmentInvalidException | InvalidNumberException e) {
if (command.commandName.equals("sendMessage")) {
List<String> recipients = new ArrayList<String>();
recipients.add(command.recipient);
List<String> attachments = new ArrayList<>();
if (command.details != null && command.details.has("attachments")) {
command.details.get("attachments").forEach(attachment -> {
if (attachment.isTextual()) {
attachments.add(attachment.asText());
}
});
}
try {
// verbosity flag? better yet, json acknowledgement with timestamp or message id?
System.out.println("sentMessage '" + command.content + "' to " + command.recipient);
this.m.sendMessage(command.content, attachments, recipients);
} catch (AssertionError | EncapsulatedExceptions | AttachmentInvalidException | InvalidNumberException e) {
System.err.println("error in sending message");
e.printStackTrace(System.out);
}
} /* elif (command.commandName == "sendTyping") {
} /* elif (command.commandName == "sendTyping") {
getMessageSender().sendTyping(signalServiceAddress?, ....)
}*/
}
}
} catch (IOException e) {
System.err.println(e);
alive = false;
}
} catch (IOException e) {
System.err.println(e);
alive = false;
}
}
}
}
}
@ -105,30 +100,27 @@ public class StdioCommand implements LocalCommand {
@Override
public int handleCommand(final Namespace ns, final Manager m) {
if (!m.isRegistered()) {
System.err.println("User is not registered.");
boolean ignoreAttachments = ns.getBoolean("ignore_attachments");
InputReader reader = new InputReader(m);
Thread readerThread = new Thread(reader);
readerThread.start();
try {
m.receiveMessages(1,
TimeUnit.HOURS,
false,
ignoreAttachments,
ns.getBoolean("json") ? new JsonReceiveMessageHandler(m) : new ReceiveMessageHandler(m)
/*true*/);
return 0;
} catch (IOException e) {
System.err.println("Error while receiving messages: " + e.getMessage());
return 3;
} catch (AssertionError e) {
handleAssertionError(e);
return 1;
} finally {
reader.terminate();
}
boolean ignoreAttachments = ns.getBoolean("ignore_attachments");
InputReader reader = new InputReader(m);
Thread readerThread = new Thread(reader);
readerThread.start();
try {
m.receiveMessagesAndReadStdin(1, TimeUnit.HOURS, false, ignoreAttachments,
ns.getBoolean("json")
? new JsonReceiveMessageHandler(m)
: new ReceiveMessageHandler(m)
/*true*/);
return 0;
} catch (IOException e) {
System.err.println("Error while receiving messages: " + e.getMessage());
return 3;
} catch (AssertionError e) {
handleAssertionError(e);
return 1;
} finally {
reader.terminate();
}
}
}