From ebdd973851c8b6a53415dd350dcccdda27cdb9f1 Mon Sep 17 00:00:00 2001 From: peteh Date: Mon, 24 Dec 2018 14:52:26 +0100 Subject: [PATCH] ADD: message handler that publishes incoming messages to mqtt --- .../asamk/signal/commands/MqttCommand.java | 83 ++++++++++++ .../asamk/signal/mqtt/MqttJsonMessage.java | 123 ++++++++++++++++++ .../mqtt/MqttReceiveMessageHandler.java | 68 ++++++++++ 3 files changed, 274 insertions(+) create mode 100644 src/main/java/org/asamk/signal/commands/MqttCommand.java create mode 100644 src/main/java/org/asamk/signal/mqtt/MqttJsonMessage.java create mode 100644 src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java diff --git a/src/main/java/org/asamk/signal/commands/MqttCommand.java b/src/main/java/org/asamk/signal/commands/MqttCommand.java new file mode 100644 index 00000000..ca7cdfa8 --- /dev/null +++ b/src/main/java/org/asamk/signal/commands/MqttCommand.java @@ -0,0 +1,83 @@ +package org.asamk.signal.commands; + +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.asamk.signal.manager.Manager; +import org.asamk.signal.mqtt.MqttReceiveMessageHandler; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.asamk.signal.util.ErrorUtils.handleAssertionError; + +public class MqttCommand implements LocalCommand { + + private static String DEFAULT_MQTT_BROKER = "tcp://127.0.0.1:1883"; + + @Override + public void attachToSubparser(final Subparser subparser) { + subparser.addArgument("-b", "--broker") + .help("The broker to connect to, default: " + DEFAULT_MQTT_BROKER) + .action(Arguments.store()); + } + + @Override + public int handleCommand(final Namespace ns, final Manager m) { + if (!m.isRegistered()) { + System.err.println("User is not registered."); + return 1; + } + // TODO: start new thread to also send messages + String brokerInput = ns.getString("broker"); + + String broker = brokerInput != null ? brokerInput : DEFAULT_MQTT_BROKER; + String clientId = "signal-cli"; + + MqttClient mqttClient = null; + try { + // connect to mqtt + mqttClient = new MqttClient(broker, clientId); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + System.out.println("Connecting to broker: " + broker); + mqttClient.connect(connOpts); + System.out.println("Connected"); + + boolean ignoreAttachments = false; + try { + m.receiveMessages(1, + TimeUnit.HOURS, + false, + ignoreAttachments, + new MqttReceiveMessageHandler(m, mqttClient)); + return 0; + } catch (IOException e) { + System.err.println("Error while receiving messages: " + e.getMessage()); + return 3; + } catch (AssertionError e) { + handleAssertionError(e); + return 1; + } + } catch(MqttException me) { + System.err.println("Error while handling mqtt: " + me.getMessage()); + me.printStackTrace(); + return 1; + } finally { + if(mqttClient != null) { + try { + mqttClient.disconnect(); + return 0; + } + catch (MqttException me) + { + System.err.println("Error while closing mqtt connection: " + me.getMessage()); + return 1; + } + } + } + } +} diff --git a/src/main/java/org/asamk/signal/mqtt/MqttJsonMessage.java b/src/main/java/org/asamk/signal/mqtt/MqttJsonMessage.java new file mode 100644 index 00000000..8d22aedd --- /dev/null +++ b/src/main/java/org/asamk/signal/mqtt/MqttJsonMessage.java @@ -0,0 +1,123 @@ +package org.asamk.signal.mqtt; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.asamk.signal.JsonError; +import org.asamk.signal.JsonMessageEnvelope; +import org.whispersystems.signalservice.api.messages.SignalServiceContent; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; + +/** + * Class that encapsulates an incoming message that can be published on an mqtt broker. + */ +public class MqttJsonMessage { + + public static final String SUBTOPIC_DATA = "data"; + public static final String SUBTOPIC_SYNC = "sync"; + public static final String SUBTOPIC_CALL = "call"; + public static final String SUBTOPIC_TYPING_INFO = "typinginfo"; + public static final String SUBTOPIC_RECEIPT = "receipt"; + public static final String SUBTOPIC_OTHER = "other"; + + private String subTopic; + private String content; + + private static String SUBTOPIC_ERROR = "error"; + + private MqttJsonMessage() { + // hide public constructor + } + + private void setSubTopic(String subTopic) { + this.subTopic = subTopic; + } + + public String getSubTopic() { + return subTopic; + } + + private void setContent(String content) { + this.content = content; + } + + /** + * Returns the json encoded message. + * @return json encoded message + */ + public String getJsonContent() { + return content; + } + + /** + * Builds a Json Message from an incoming signal message and determines the corresponding sub topic + * for the mqtt broker. + * + * @param envelope the signal service envelope of the message + * @param content the content of the message + * @param exception an exception that might have occurred on the way of processing + * @return the mqtt json message with assigned sub topic + */ + public static MqttJsonMessage build(SignalServiceEnvelope envelope, SignalServiceContent content, Throwable exception) { + MqttJsonMessage message = new MqttJsonMessage(); + + ObjectMapper jsonProcessor = new ObjectMapper(); + jsonProcessor.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // disable autodetect + jsonProcessor.enable(SerializationFeature.WRITE_NULL_MAP_VALUES); + jsonProcessor.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + jsonProcessor.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + + ObjectNode result = jsonProcessor.createObjectNode(); + if (exception != null) { + result.putPOJO("error", new JsonError(exception)); + message.setSubTopic(SUBTOPIC_ERROR); + } + + if (envelope != null) { + result.putPOJO("envelope", new JsonMessageEnvelope(envelope, content)); + message.setSubTopic(findSubTopic(content)); + } + try { + message.setContent(jsonProcessor.writeValueAsString(result)); + } catch (JsonProcessingException e) { + ObjectNode errorMsg = jsonProcessor.createObjectNode(); + result.putPOJO("error", new JsonError(e)); + try { + message.setSubTopic(SUBTOPIC_ERROR); + message.setContent(jsonProcessor.writeValueAsString(errorMsg)); + } catch (JsonProcessingException jsonEx) { + // this should never happen, but well just to be safe + throw new AssertionError(jsonEx); + } + } + return message; + } + + /** + * Finds the designated type of the message and defines the subtopic for the mqtt broker. + * + * Possible subtopics: data, synq, call, typinginfo, receipt, other + * @param content + * @return + */ + private static String findSubTopic(final SignalServiceContent content) { + if (content.getDataMessage().isPresent()) { + return SUBTOPIC_DATA; + } else if (content.getSyncMessage().isPresent()) { + return SUBTOPIC_SYNC; + } else if (content.getCallMessage().isPresent()) { + return SUBTOPIC_CALL; + } else if (content.getTypingMessage().isPresent()) { + return SUBTOPIC_TYPING_INFO; + } else if (content.getReceiptMessage().isPresent()) { + return SUBTOPIC_RECEIPT; + } else { + return SUBTOPIC_OTHER; + } + } +} diff --git a/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java b/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java new file mode 100644 index 00000000..721bf842 --- /dev/null +++ b/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java @@ -0,0 +1,68 @@ +package org.asamk.signal.mqtt; + +import org.asamk.signal.manager.Manager; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.whispersystems.signalservice.api.messages.SignalServiceContent; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; + +/** + * Handler class that passes incoming Signal messages to an mqtt broker. + */ +public class MqttReceiveMessageHandler implements Manager.ReceiveMessageHandler { + + public static String DEFAULT_TOPIC = "signal-cli/messages/incoming/"; + + private static int DEFAULT_QUALITY_OF_SERVICE = 2; + + private final MqttClient mqttClient; + private final Manager manager; + + + /** + * Creates a new instance that passes all incoming messages to the provided mqttClient. + * @param mqttClient the broker to pass all the incoming messages to + */ + public MqttReceiveMessageHandler(Manager manager, MqttClient mqttClient) + { + this.manager = manager; + this.mqttClient = mqttClient; + } + + /** + * Removes spaces and wildcard signs (*, +) from a given string. + * @param topic the topic to clean + * @return the cleaned topic + */ + private String stripIllegalTopicCharacters(String topic) + { + return topic.replace("+", "") + .replace(" ", ""); + } + + private void publishMessage(String topic, String content) + { + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(DEFAULT_QUALITY_OF_SERVICE); + try { + System.out.println("Topic: " + topic); + System.out.println("Publishing message: " + content); + mqttClient.publish(topic, message); + } catch (MqttException ex) { + throw new AssertionError(ex); + } + System.out.println("Message published"); + } + + @Override + public void handleMessage(final SignalServiceEnvelope envelope, final SignalServiceContent decryptedContent, final Throwable e) { + System.out.println("Message Received from " + decryptedContent.getSender()); + + MqttJsonMessage msg = MqttJsonMessage.build(envelope, decryptedContent, e); + String topic = DEFAULT_TOPIC + stripIllegalTopicCharacters(manager.getUsername() + "/" + msg.getSubTopic()); + + publishMessage(topic, msg.getJsonContent()); + + } +}