diff --git a/src/main/java/org/asamk/signal/commands/Commands.java b/src/main/java/org/asamk/signal/commands/Commands.java index 6f262fdf..229e0aa8 100644 --- a/src/main/java/org/asamk/signal/commands/Commands.java +++ b/src/main/java/org/asamk/signal/commands/Commands.java @@ -26,6 +26,9 @@ public class Commands { addCommand("updateAccount", new UpdateAccountCommand()); addCommand("updateGroup", new UpdateGroupCommand()); addCommand("verify", new VerifyCommand()); + + // mqtt commands + addCommand("mqtt", new MqttCommand()); } public static Map getCommands() { diff --git a/src/main/java/org/asamk/signal/commands/MqttCommand.java b/src/main/java/org/asamk/signal/commands/MqttCommand.java new file mode 100644 index 00000000..dec4ad9c --- /dev/null +++ b/src/main/java/org/asamk/signal/commands/MqttCommand.java @@ -0,0 +1,86 @@ +package org.asamk.signal.commands; + +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.asamk.signal.manager.Manager; +import org.asamk.signal.mqtt.MqttReceiveMessageHandler; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.asamk.signal.util.ErrorUtils.handleAssertionError; + +public class MqttCommand implements LocalCommand { + + private static String DEFAULT_MQTT_BROKER = "tcp://127.0.0.1:1883"; + + @Override + public void attachToSubparser(final Subparser subparser) { + //subparser.addArgument("--json") + // .help("Output received messages in json format, one json object per line.") + // .action(Arguments.storeTrue()); + subparser.addArgument("-b", "--broker") + .help("Output received messages in json format, one json object per line.") + .action(Arguments.store()); + } + + @Override + public int handleCommand(final Namespace ns, final Manager m) { + if (!m.isRegistered()) { + System.err.println("User is not registered."); + return 1; + } + // TODO: start new thread to also send messages + String brokerInput = ns.getString("broker"); + + String broker = brokerInput != null ? brokerInput : DEFAULT_MQTT_BROKER; + String clientId = "signal-cli"; + + MqttClient mqttClient = null; + try { + // connect to mqtt + mqttClient = new MqttClient(broker, clientId); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + System.out.println("Connecting to broker: " + broker); + mqttClient.connect(connOpts); + System.out.println("Connected"); + boolean ignoreAttachments = false; + try { + m.receiveMessages(1, + TimeUnit.HOURS, + false, + ignoreAttachments, + new MqttReceiveMessageHandler(m, mqttClient)); + return 0; + } catch (IOException e) { + System.err.println("Error while receiving messages: " + e.getMessage()); + return 3; + } catch (AssertionError e) { + handleAssertionError(e); + return 1; + } + } catch(MqttException me) { + System.err.println("Error while handling mqtt: " + me.getMessage()); + me.printStackTrace(); + return 1; + } finally { + if(mqttClient != null) { + try { + mqttClient.disconnect(); + return 0; + } + catch (MqttException me) + { + System.err.println("Error while closing mqtt connection: " + me.getMessage()); + return 1; + } + } + } + } +} diff --git a/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java b/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java new file mode 100644 index 00000000..a9e23cb0 --- /dev/null +++ b/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java @@ -0,0 +1,92 @@ +package org.asamk.signal.mqtt; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.asamk.signal.JsonError; +import org.asamk.signal.JsonMessageEnvelope; +import org.asamk.signal.manager.Manager; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.whispersystems.signalservice.api.messages.SignalServiceContent; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; + +/** + * Handler class that passes incoming Signal messages to an mqtt broker. + */ +public class MqttReceiveMessageHandler implements Manager.ReceiveMessageHandler { + + public static String DEFAULT_TOPIC = "signal-cli/messages/incoming"; + + private static int DEFAULT_QUALITY_OF_SERVICE = 2; + + private final MqttClient mqttClient; + private final ObjectMapper jsonProcessor; + + /** + * Creates a new instance that passes all incoming messages to the provided mqttClient. + * @param mqttClient the broker to pass all the incoming messages to + */ + public MqttReceiveMessageHandler(MqttClient mqttClient) + { + this.mqttClient = mqttClient; + this.jsonProcessor = new ObjectMapper(); + jsonProcessor.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // disable autodetect + jsonProcessor.enable(SerializationFeature.WRITE_NULL_MAP_VALUES); + jsonProcessor.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + jsonProcessor.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + } + + /** + * Builds a Json Message from an incoming signal message. + * @param envelope the signal service envelope of the message + * @param content the content of the message + * @param exception an exception that might have occurred on the way of processing + * @return the json encoded message as a string + */ + private String buildJsonMessage(SignalServiceEnvelope envelope, SignalServiceContent content, Throwable exception) { + ObjectNode result = jsonProcessor.createObjectNode(); + if (exception != null) { + result.putPOJO("error", new JsonError(exception)); + } + if (envelope != null) { + result.putPOJO("envelope", new JsonMessageEnvelope(envelope, content)); + } + try { + return jsonProcessor.writeValueAsString(result); + } catch (JsonProcessingException e) { + ObjectNode errorMsg = jsonProcessor.createObjectNode(); + result.putPOJO("error", new JsonError(e)); + try { + return jsonProcessor.writeValueAsString(errorMsg); + } catch (JsonProcessingException jsonEx) { + // this should never happen, but well just to be safe + throw new AssertionError(jsonEx); + } + } + } + + @Override + public void handleMessage(final SignalServiceEnvelope envelope, final SignalServiceContent decryptedContent, final Throwable e) { + System.out.println("Sender: " + decryptedContent.getSender()); + System.out.println("Message Received"); + String content = buildJsonMessage(envelope, decryptedContent, e); + System.out.println("Publishing message: " + content); + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(DEFAULT_QUALITY_OF_SERVICE); + + try { + mqttClient.publish(DEFAULT_TOPIC, message); + } catch (MqttException e1) { + e1.printStackTrace(); + // TODO: not sure how to handle that here + } + System.out.println("Message published"); + } +}