ADD: subtopics for different types of messages

Factored out json processing from the handler and created subtopics for the different types of incoming messages: data, sync, call, typinginfo, receipt, other
This commit is contained in:
peteh 2018-12-24 14:08:29 +01:00
parent d045c97836
commit cfb5774921
3 changed files with 154 additions and 55 deletions

View file

@ -53,7 +53,7 @@ public class MqttCommand implements LocalCommand {
TimeUnit.HOURS, TimeUnit.HOURS,
false, false,
ignoreAttachments, ignoreAttachments,
new MqttReceiveMessageHandler(mqttClient)); new MqttReceiveMessageHandler(m, mqttClient));
return 0; return 0;
} catch (IOException e) { } catch (IOException e) {
System.err.println("Error while receiving messages: " + e.getMessage()); System.err.println("Error while receiving messages: " + e.getMessage());

View file

@ -0,0 +1,123 @@
package org.asamk.signal.mqtt;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.asamk.signal.JsonError;
import org.asamk.signal.JsonMessageEnvelope;
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
/**
* Class that encapsulates an incoming message that can be published on an mqtt broker.
*/
public class MqttJsonMessage {
public static final String SUBTOPIC_DATA = "data";
public static final String SUBTOPIC_SYNC = "sync";
public static final String SUBTOPIC_CALL = "call";
public static final String SUBTOPIC_TYPING_INFO = "typinginfo";
public static final String SUBTOPIC_RECEIPT = "receipt";
public static final String SUBTOPIC_OTHER = "other";
private String subTopic;
private String content;
private static String SUBTOPIC_ERROR = "error";
private MqttJsonMessage() {
// hide public constructor
}
private void setSubTopic(String subTopic) {
this.subTopic = subTopic;
}
public String getSubTopic() {
return subTopic;
}
private void setContent(String content) {
this.content = content;
}
/**
* Returns the json encoded message.
* @return json encoded message
*/
public String getJsonContent() {
return content;
}
/**
* Builds a Json Message from an incoming signal message and determines the corresponding sub topic
* for the mqtt broker.
*
* @param envelope the signal service envelope of the message
* @param content the content of the message
* @param exception an exception that might have occurred on the way of processing
* @return the mqtt json message with assigned sub topic
*/
public static MqttJsonMessage build(SignalServiceEnvelope envelope, SignalServiceContent content, Throwable exception) {
MqttJsonMessage message = new MqttJsonMessage();
ObjectMapper jsonProcessor = new ObjectMapper();
jsonProcessor.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // disable autodetect
jsonProcessor.enable(SerializationFeature.WRITE_NULL_MAP_VALUES);
jsonProcessor.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
jsonProcessor.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
ObjectNode result = jsonProcessor.createObjectNode();
if (exception != null) {
result.putPOJO("error", new JsonError(exception));
message.setSubTopic(SUBTOPIC_ERROR);
}
if (envelope != null) {
result.putPOJO("envelope", new JsonMessageEnvelope(envelope, content));
message.setSubTopic(findSubTopic(content));
}
try {
message.setContent(jsonProcessor.writeValueAsString(result));
} catch (JsonProcessingException e) {
ObjectNode errorMsg = jsonProcessor.createObjectNode();
result.putPOJO("error", new JsonError(e));
try {
message.setSubTopic(SUBTOPIC_ERROR);
message.setContent(jsonProcessor.writeValueAsString(errorMsg));
} catch (JsonProcessingException jsonEx) {
// this should never happen, but well just to be safe
throw new AssertionError(jsonEx);
}
}
return message;
}
/**
* Finds the designated type of the message and defines the subtopic for the mqtt broker.
*
* Possible subtopics: data, synq, call, typinginfo, receipt, other
* @param content
* @return
*/
private static String findSubTopic(final SignalServiceContent content) {
if (content.getDataMessage().isPresent()) {
return SUBTOPIC_DATA;
} else if (content.getSyncMessage().isPresent()) {
return SUBTOPIC_SYNC;
} else if (content.getCallMessage().isPresent()) {
return SUBTOPIC_CALL;
} else if (content.getTypingMessage().isPresent()) {
return SUBTOPIC_TYPING_INFO;
} else if (content.getReceiptMessage().isPresent()) {
return SUBTOPIC_RECEIPT;
} else {
return SUBTOPIC_OTHER;
}
}
}

View file

@ -1,15 +1,5 @@
package org.asamk.signal.mqtt; package org.asamk.signal.mqtt;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.asamk.signal.JsonError;
import org.asamk.signal.JsonMessageEnvelope;
import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.Manager;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
@ -22,71 +12,57 @@ import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
*/ */
public class MqttReceiveMessageHandler implements Manager.ReceiveMessageHandler { public class MqttReceiveMessageHandler implements Manager.ReceiveMessageHandler {
public static String DEFAULT_TOPIC = "signal-cli/messages/incoming"; public static String DEFAULT_TOPIC = "signal-cli/messages/incoming/";
private static int DEFAULT_QUALITY_OF_SERVICE = 2; private static int DEFAULT_QUALITY_OF_SERVICE = 2;
private final MqttClient mqttClient; private final MqttClient mqttClient;
private final ObjectMapper jsonProcessor; private final Manager manager;
/** /**
* Creates a new instance that passes all incoming messages to the provided mqttClient. * Creates a new instance that passes all incoming messages to the provided mqttClient.
* @param mqttClient the broker to pass all the incoming messages to * @param mqttClient the broker to pass all the incoming messages to
*/ */
public MqttReceiveMessageHandler(MqttClient mqttClient) public MqttReceiveMessageHandler(Manager manager, MqttClient mqttClient)
{ {
this.manager = manager;
this.mqttClient = mqttClient; this.mqttClient = mqttClient;
this.jsonProcessor = new ObjectMapper();
jsonProcessor.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // disable autodetect
jsonProcessor.enable(SerializationFeature.WRITE_NULL_MAP_VALUES);
jsonProcessor.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
jsonProcessor.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
} }
/** /**
* Builds a Json Message from an incoming signal message. * Removes spaces and wildcard signs (*, +) from a given string.
* @param envelope the signal service envelope of the message * @param topic the topic to clean
* @param content the content of the message * @return the cleaned topic
* @param exception an exception that might have occurred on the way of processing
* @return the json encoded message as a string
*/ */
private String buildJsonMessage(SignalServiceEnvelope envelope, SignalServiceContent content, Throwable exception) { private String stripIllegalTopicCharacters(String topic)
ObjectNode result = jsonProcessor.createObjectNode(); {
if (exception != null) { return topic.replace("+", "")
result.putPOJO("error", new JsonError(exception)); .replace(" ", "");
} }
if (envelope != null) {
result.putPOJO("envelope", new JsonMessageEnvelope(envelope, content)); private void publishMessage(String topic, String content)
} {
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(DEFAULT_QUALITY_OF_SERVICE);
try { try {
return jsonProcessor.writeValueAsString(result); System.out.println("Topic: " + topic);
} catch (JsonProcessingException e) { System.out.println("Publishing message: " + content);
ObjectNode errorMsg = jsonProcessor.createObjectNode(); mqttClient.publish(topic, message);
result.putPOJO("error", new JsonError(e)); } catch (MqttException ex) {
try { throw new AssertionError(ex);
return jsonProcessor.writeValueAsString(errorMsg);
} catch (JsonProcessingException jsonEx) {
// this should never happen, but well just to be safe
throw new AssertionError(jsonEx);
}
} }
System.out.println("Message published");
} }
@Override @Override
public void handleMessage(final SignalServiceEnvelope envelope, final SignalServiceContent decryptedContent, final Throwable e) { public void handleMessage(final SignalServiceEnvelope envelope, final SignalServiceContent decryptedContent, final Throwable e) {
System.out.println("Sender: " + decryptedContent.getSender()); System.out.println("Message Received from " + decryptedContent.getSender());
System.out.println("Message Received");
String content = buildJsonMessage(envelope, decryptedContent, e); MqttJsonMessage msg = MqttJsonMessage.build(envelope, decryptedContent, e);
System.out.println("Publishing message: " + content); String topic = DEFAULT_TOPIC + stripIllegalTopicCharacters(manager.getUsername() + "/" + msg.getSubTopic());
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(DEFAULT_QUALITY_OF_SERVICE); publishMessage(topic, msg.getJsonContent());
try {
mqttClient.publish(DEFAULT_TOPIC, message);
} catch (MqttException e1) {
e1.printStackTrace();
// TODO: not sure how to handle that here
}
System.out.println("Message published");
} }
} }