diff --git a/src/main/java/org/asamk/signal/commands/MqttCommand.java b/src/main/java/org/asamk/signal/commands/MqttCommand.java index 36021a79..ca7cdfa8 100644 --- a/src/main/java/org/asamk/signal/commands/MqttCommand.java +++ b/src/main/java/org/asamk/signal/commands/MqttCommand.java @@ -53,7 +53,7 @@ public class MqttCommand implements LocalCommand { TimeUnit.HOURS, false, ignoreAttachments, - new MqttReceiveMessageHandler(mqttClient)); + new MqttReceiveMessageHandler(m, mqttClient)); return 0; } catch (IOException e) { System.err.println("Error while receiving messages: " + e.getMessage()); diff --git a/src/main/java/org/asamk/signal/mqtt/MqttJsonMessage.java b/src/main/java/org/asamk/signal/mqtt/MqttJsonMessage.java new file mode 100644 index 00000000..8d22aedd --- /dev/null +++ b/src/main/java/org/asamk/signal/mqtt/MqttJsonMessage.java @@ -0,0 +1,123 @@ +package org.asamk.signal.mqtt; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.asamk.signal.JsonError; +import org.asamk.signal.JsonMessageEnvelope; +import org.whispersystems.signalservice.api.messages.SignalServiceContent; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; + +/** + * Class that encapsulates an incoming message that can be published on an mqtt broker. + */ +public class MqttJsonMessage { + + public static final String SUBTOPIC_DATA = "data"; + public static final String SUBTOPIC_SYNC = "sync"; + public static final String SUBTOPIC_CALL = "call"; + public static final String SUBTOPIC_TYPING_INFO = "typinginfo"; + public static final String SUBTOPIC_RECEIPT = "receipt"; + public static final String SUBTOPIC_OTHER = "other"; + + private String subTopic; + private String content; + + private static String SUBTOPIC_ERROR = "error"; + + private MqttJsonMessage() { + // hide public constructor + } + + private void setSubTopic(String subTopic) { + this.subTopic = subTopic; + } + + public String getSubTopic() { + return subTopic; + } + + private void setContent(String content) { + this.content = content; + } + + /** + * Returns the json encoded message. + * @return json encoded message + */ + public String getJsonContent() { + return content; + } + + /** + * Builds a Json Message from an incoming signal message and determines the corresponding sub topic + * for the mqtt broker. + * + * @param envelope the signal service envelope of the message + * @param content the content of the message + * @param exception an exception that might have occurred on the way of processing + * @return the mqtt json message with assigned sub topic + */ + public static MqttJsonMessage build(SignalServiceEnvelope envelope, SignalServiceContent content, Throwable exception) { + MqttJsonMessage message = new MqttJsonMessage(); + + ObjectMapper jsonProcessor = new ObjectMapper(); + jsonProcessor.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // disable autodetect + jsonProcessor.enable(SerializationFeature.WRITE_NULL_MAP_VALUES); + jsonProcessor.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + jsonProcessor.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + + ObjectNode result = jsonProcessor.createObjectNode(); + if (exception != null) { + result.putPOJO("error", new JsonError(exception)); + message.setSubTopic(SUBTOPIC_ERROR); + } + + if (envelope != null) { + result.putPOJO("envelope", new JsonMessageEnvelope(envelope, content)); + message.setSubTopic(findSubTopic(content)); + } + try { + message.setContent(jsonProcessor.writeValueAsString(result)); + } catch (JsonProcessingException e) { + ObjectNode errorMsg = jsonProcessor.createObjectNode(); + result.putPOJO("error", new JsonError(e)); + try { + message.setSubTopic(SUBTOPIC_ERROR); + message.setContent(jsonProcessor.writeValueAsString(errorMsg)); + } catch (JsonProcessingException jsonEx) { + // this should never happen, but well just to be safe + throw new AssertionError(jsonEx); + } + } + return message; + } + + /** + * Finds the designated type of the message and defines the subtopic for the mqtt broker. + * + * Possible subtopics: data, synq, call, typinginfo, receipt, other + * @param content + * @return + */ + private static String findSubTopic(final SignalServiceContent content) { + if (content.getDataMessage().isPresent()) { + return SUBTOPIC_DATA; + } else if (content.getSyncMessage().isPresent()) { + return SUBTOPIC_SYNC; + } else if (content.getCallMessage().isPresent()) { + return SUBTOPIC_CALL; + } else if (content.getTypingMessage().isPresent()) { + return SUBTOPIC_TYPING_INFO; + } else if (content.getReceiptMessage().isPresent()) { + return SUBTOPIC_RECEIPT; + } else { + return SUBTOPIC_OTHER; + } + } +} diff --git a/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java b/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java index a9e23cb0..721bf842 100644 --- a/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java +++ b/src/main/java/org/asamk/signal/mqtt/MqttReceiveMessageHandler.java @@ -1,15 +1,5 @@ package org.asamk.signal.mqtt; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.asamk.signal.JsonError; -import org.asamk.signal.JsonMessageEnvelope; import org.asamk.signal.manager.Manager; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; @@ -22,71 +12,57 @@ import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; */ public class MqttReceiveMessageHandler implements Manager.ReceiveMessageHandler { - public static String DEFAULT_TOPIC = "signal-cli/messages/incoming"; + public static String DEFAULT_TOPIC = "signal-cli/messages/incoming/"; private static int DEFAULT_QUALITY_OF_SERVICE = 2; private final MqttClient mqttClient; - private final ObjectMapper jsonProcessor; + private final Manager manager; + /** * Creates a new instance that passes all incoming messages to the provided mqttClient. * @param mqttClient the broker to pass all the incoming messages to */ - public MqttReceiveMessageHandler(MqttClient mqttClient) + public MqttReceiveMessageHandler(Manager manager, MqttClient mqttClient) { + this.manager = manager; this.mqttClient = mqttClient; - this.jsonProcessor = new ObjectMapper(); - jsonProcessor.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // disable autodetect - jsonProcessor.enable(SerializationFeature.WRITE_NULL_MAP_VALUES); - jsonProcessor.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - jsonProcessor.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); } /** - * Builds a Json Message from an incoming signal message. - * @param envelope the signal service envelope of the message - * @param content the content of the message - * @param exception an exception that might have occurred on the way of processing - * @return the json encoded message as a string + * Removes spaces and wildcard signs (*, +) from a given string. + * @param topic the topic to clean + * @return the cleaned topic */ - private String buildJsonMessage(SignalServiceEnvelope envelope, SignalServiceContent content, Throwable exception) { - ObjectNode result = jsonProcessor.createObjectNode(); - if (exception != null) { - result.putPOJO("error", new JsonError(exception)); - } - if (envelope != null) { - result.putPOJO("envelope", new JsonMessageEnvelope(envelope, content)); - } + private String stripIllegalTopicCharacters(String topic) + { + return topic.replace("+", "") + .replace(" ", ""); + } + + private void publishMessage(String topic, String content) + { + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(DEFAULT_QUALITY_OF_SERVICE); try { - return jsonProcessor.writeValueAsString(result); - } catch (JsonProcessingException e) { - ObjectNode errorMsg = jsonProcessor.createObjectNode(); - result.putPOJO("error", new JsonError(e)); - try { - return jsonProcessor.writeValueAsString(errorMsg); - } catch (JsonProcessingException jsonEx) { - // this should never happen, but well just to be safe - throw new AssertionError(jsonEx); - } + System.out.println("Topic: " + topic); + System.out.println("Publishing message: " + content); + mqttClient.publish(topic, message); + } catch (MqttException ex) { + throw new AssertionError(ex); } + System.out.println("Message published"); } @Override public void handleMessage(final SignalServiceEnvelope envelope, final SignalServiceContent decryptedContent, final Throwable e) { - System.out.println("Sender: " + decryptedContent.getSender()); - System.out.println("Message Received"); - String content = buildJsonMessage(envelope, decryptedContent, e); - System.out.println("Publishing message: " + content); - MqttMessage message = new MqttMessage(content.getBytes()); - message.setQos(DEFAULT_QUALITY_OF_SERVICE); + System.out.println("Message Received from " + decryptedContent.getSender()); + + MqttJsonMessage msg = MqttJsonMessage.build(envelope, decryptedContent, e); + String topic = DEFAULT_TOPIC + stripIllegalTopicCharacters(manager.getUsername() + "/" + msg.getSubTopic()); + + publishMessage(topic, msg.getJsonContent()); - try { - mqttClient.publish(DEFAULT_TOPIC, message); - } catch (MqttException e1) { - e1.printStackTrace(); - // TODO: not sure how to handle that here - } - System.out.println("Message published"); } }