mirror of
https://github.com/AsamK/signal-cli
synced 2025-09-02 04:20:38 +00:00
ADD: mqtt handler and command
This commit is contained in:
parent
9a391b8315
commit
ee8971f002
3 changed files with 181 additions and 0 deletions
|
@ -26,6 +26,9 @@ public class Commands {
|
||||||
addCommand("updateAccount", new UpdateAccountCommand());
|
addCommand("updateAccount", new UpdateAccountCommand());
|
||||||
addCommand("updateGroup", new UpdateGroupCommand());
|
addCommand("updateGroup", new UpdateGroupCommand());
|
||||||
addCommand("verify", new VerifyCommand());
|
addCommand("verify", new VerifyCommand());
|
||||||
|
|
||||||
|
// mqtt commands
|
||||||
|
addCommand("mqtt", new MqttCommand());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Map<String, Command> getCommands() {
|
public static Map<String, Command> getCommands() {
|
||||||
|
|
86
src/main/java/org/asamk/signal/commands/MqttCommand.java
Normal file
86
src/main/java/org/asamk/signal/commands/MqttCommand.java
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
package org.asamk.signal.commands;
|
||||||
|
|
||||||
|
import net.sourceforge.argparse4j.impl.Arguments;
|
||||||
|
import net.sourceforge.argparse4j.inf.Namespace;
|
||||||
|
import net.sourceforge.argparse4j.inf.Subparser;
|
||||||
|
import org.asamk.signal.manager.Manager;
|
||||||
|
import org.asamk.signal.mqtt.MqttReceiveMessageHandler;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.asamk.signal.util.ErrorUtils.handleAssertionError;
|
||||||
|
|
||||||
|
public class MqttCommand implements LocalCommand {
|
||||||
|
|
||||||
|
private static String DEFAULT_MQTT_BROKER = "tcp://127.0.0.1:1883";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void attachToSubparser(final Subparser subparser) {
|
||||||
|
//subparser.addArgument("--json")
|
||||||
|
// .help("Output received messages in json format, one json object per line.")
|
||||||
|
// .action(Arguments.storeTrue());
|
||||||
|
subparser.addArgument("-b", "--broker")
|
||||||
|
.help("Output received messages in json format, one json object per line.")
|
||||||
|
.action(Arguments.store());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int handleCommand(final Namespace ns, final Manager m) {
|
||||||
|
if (!m.isRegistered()) {
|
||||||
|
System.err.println("User is not registered.");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
// TODO: start new thread to also send messages
|
||||||
|
String brokerInput = ns.getString("broker");
|
||||||
|
|
||||||
|
String broker = brokerInput != null ? brokerInput : DEFAULT_MQTT_BROKER;
|
||||||
|
String clientId = "signal-cli";
|
||||||
|
|
||||||
|
MqttClient mqttClient = null;
|
||||||
|
try {
|
||||||
|
// connect to mqtt
|
||||||
|
mqttClient = new MqttClient(broker, clientId);
|
||||||
|
MqttConnectOptions connOpts = new MqttConnectOptions();
|
||||||
|
connOpts.setCleanSession(true);
|
||||||
|
System.out.println("Connecting to broker: " + broker);
|
||||||
|
mqttClient.connect(connOpts);
|
||||||
|
System.out.println("Connected");
|
||||||
|
boolean ignoreAttachments = false;
|
||||||
|
try {
|
||||||
|
m.receiveMessages(1,
|
||||||
|
TimeUnit.HOURS,
|
||||||
|
false,
|
||||||
|
ignoreAttachments,
|
||||||
|
new MqttReceiveMessageHandler(m, mqttClient));
|
||||||
|
return 0;
|
||||||
|
} catch (IOException e) {
|
||||||
|
System.err.println("Error while receiving messages: " + e.getMessage());
|
||||||
|
return 3;
|
||||||
|
} catch (AssertionError e) {
|
||||||
|
handleAssertionError(e);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
} catch(MqttException me) {
|
||||||
|
System.err.println("Error while handling mqtt: " + me.getMessage());
|
||||||
|
me.printStackTrace();
|
||||||
|
return 1;
|
||||||
|
} finally {
|
||||||
|
if(mqttClient != null) {
|
||||||
|
try {
|
||||||
|
mqttClient.disconnect();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
catch (MqttException me)
|
||||||
|
{
|
||||||
|
System.err.println("Error while closing mqtt connection: " + me.getMessage());
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,92 @@
|
||||||
|
package org.asamk.signal.mqtt;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||||
|
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||||
|
import com.fasterxml.jackson.core.JsonGenerator;
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
import org.asamk.signal.JsonError;
|
||||||
|
import org.asamk.signal.JsonMessageEnvelope;
|
||||||
|
import org.asamk.signal.manager.Manager;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.whispersystems.signalservice.api.messages.SignalServiceContent;
|
||||||
|
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handler class that passes incoming Signal messages to an mqtt broker.
|
||||||
|
*/
|
||||||
|
public class MqttReceiveMessageHandler implements Manager.ReceiveMessageHandler {
|
||||||
|
|
||||||
|
public static String DEFAULT_TOPIC = "signal-cli/messages/incoming";
|
||||||
|
|
||||||
|
private static int DEFAULT_QUALITY_OF_SERVICE = 2;
|
||||||
|
|
||||||
|
private final MqttClient mqttClient;
|
||||||
|
private final ObjectMapper jsonProcessor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance that passes all incoming messages to the provided mqttClient.
|
||||||
|
* @param mqttClient the broker to pass all the incoming messages to
|
||||||
|
*/
|
||||||
|
public MqttReceiveMessageHandler(MqttClient mqttClient)
|
||||||
|
{
|
||||||
|
this.mqttClient = mqttClient;
|
||||||
|
this.jsonProcessor = new ObjectMapper();
|
||||||
|
jsonProcessor.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // disable autodetect
|
||||||
|
jsonProcessor.enable(SerializationFeature.WRITE_NULL_MAP_VALUES);
|
||||||
|
jsonProcessor.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
|
||||||
|
jsonProcessor.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builds a Json Message from an incoming signal message.
|
||||||
|
* @param envelope the signal service envelope of the message
|
||||||
|
* @param content the content of the message
|
||||||
|
* @param exception an exception that might have occurred on the way of processing
|
||||||
|
* @return the json encoded message as a string
|
||||||
|
*/
|
||||||
|
private String buildJsonMessage(SignalServiceEnvelope envelope, SignalServiceContent content, Throwable exception) {
|
||||||
|
ObjectNode result = jsonProcessor.createObjectNode();
|
||||||
|
if (exception != null) {
|
||||||
|
result.putPOJO("error", new JsonError(exception));
|
||||||
|
}
|
||||||
|
if (envelope != null) {
|
||||||
|
result.putPOJO("envelope", new JsonMessageEnvelope(envelope, content));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return jsonProcessor.writeValueAsString(result);
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
ObjectNode errorMsg = jsonProcessor.createObjectNode();
|
||||||
|
result.putPOJO("error", new JsonError(e));
|
||||||
|
try {
|
||||||
|
return jsonProcessor.writeValueAsString(errorMsg);
|
||||||
|
} catch (JsonProcessingException jsonEx) {
|
||||||
|
// this should never happen, but well just to be safe
|
||||||
|
throw new AssertionError(jsonEx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(final SignalServiceEnvelope envelope, final SignalServiceContent decryptedContent, final Throwable e) {
|
||||||
|
System.out.println("Sender: " + decryptedContent.getSender());
|
||||||
|
System.out.println("Message Received");
|
||||||
|
String content = buildJsonMessage(envelope, decryptedContent, e);
|
||||||
|
System.out.println("Publishing message: " + content);
|
||||||
|
MqttMessage message = new MqttMessage(content.getBytes());
|
||||||
|
message.setQos(DEFAULT_QUALITY_OF_SERVICE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
mqttClient.publish(DEFAULT_TOPIC, message);
|
||||||
|
} catch (MqttException e1) {
|
||||||
|
e1.printStackTrace();
|
||||||
|
// TODO: not sure how to handle that here
|
||||||
|
}
|
||||||
|
System.out.println("Message published");
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue