Improve JSON-RPC subscribeReceive method with subscription id

This commit is contained in:
AsamK 2022-01-30 16:35:11 +01:00
parent e5a8cdb056
commit be0993c5d8
9 changed files with 158 additions and 43 deletions

View file

@ -269,7 +269,10 @@
{ {
"name":"java.lang.Throwable", "name":"java.lang.Throwable",
"queryAllPublicMethods":true, "queryAllPublicMethods":true,
"methods":[{"name":"addSuppressed","parameterTypes":["java.lang.Throwable"] }] "methods":[
{"name":"addSuppressed","parameterTypes":["java.lang.Throwable"] },
{"name":"getSuppressed","parameterTypes":[] }
]
}, },
{ {
"name":"java.lang.reflect.Method", "name":"java.lang.reflect.Method",

View file

@ -10,6 +10,8 @@ public interface MultiAccountManager extends AutoCloseable {
List<String> getAccountNumbers(); List<String> getAccountNumbers();
List<Manager> getManagers();
void addOnManagerAddedHandler(Consumer<Manager> handler); void addOnManagerAddedHandler(Consumer<Manager> handler);
void addOnManagerRemovedHandler(Consumer<Manager> handler); void addOnManagerRemovedHandler(Consumer<Manager> handler);

View file

@ -49,6 +49,13 @@ public class MultiAccountManagerImpl implements MultiAccountManager {
} }
} }
@Override
public List<Manager> getManagers() {
synchronized (managers) {
return new ArrayList<>(managers);
}
}
void addManager(final Manager m) { void addManager(final Manager m) {
synchronized (managers) { synchronized (managers) {
if (managers.contains(m)) { if (managers.contains(m)) {

View file

@ -76,6 +76,22 @@ The `method` field is the command name and the parameters can be sent as the `pa
`--attachment ATTACH` becomes `"attachment":"ATTACH"` `--attachment ATTACH` becomes `"attachment":"ATTACH"`
=== Additional JSON-RPC commands
For receiving message additional commands are provided in JSON-RPC mode with `--receive-mode=manual`.
==== subscribeReceive
Tells the daemon to start receiving messages, returns the subscription id as a single integer value in the result.
==== unsubscribeReceive
Stop a previous subscription for receiving messages.
Params:
- `subscription`: the subscription id returned by `subscribeReceive`
== Examples == Examples
REQUEST: `{"jsonrpc":"2.0","method":"listGroups","id":"5"}` RESPONSE: `{"jsonrpc":"2.0","result":[...],"id":"5"}` REQUEST: `{"jsonrpc":"2.0","method":"listGroups","id":"5"}` RESPONSE: `{"jsonrpc":"2.0","result":[...],"id":"5"}`

View file

@ -8,7 +8,7 @@ vim:set ts=4 sw=4 tw=82 noet:
== Name == Name
signal-cli - A commandline and dbus interface for the Signal messenger signal-cli - A commandline interface for the Signal messenger
== Synopsis == Synopsis
@ -20,7 +20,7 @@ signal-cli is a commandline interface for libsignal-service-java.
It supports registering, verifying, sending and receiving messages. It supports registering, verifying, sending and receiving messages.
For registering you need a phone number where you can receive SMS or incoming calls. For registering you need a phone number where you can receive SMS or incoming calls.
signal-cli was primarily developed to be used on servers to notify admins of important events. signal-cli was primarily developed to be used on servers to notify admins of important events.
For this use-case, it has a dbus interface, that can be used to send messages from any programming language that has dbus bindings. For this use-case, it has a dbus and a JSON-RPC interface, that can be used to send messages from other programs.
For some functionality the Signal protocol requires that all messages have been received from the server. For some functionality the Signal protocol requires that all messages have been received from the server.
The `receive` command should be regularly executed. The `receive` command should be regularly executed.
@ -54,8 +54,9 @@ This flag must not be given for the `link` command.
It is optional for the `daemon` command. It is optional for the `daemon` command.
For all other commands it is only optional if there is exactly one local user in the config directory. For all other commands it is only optional if there is exactly one local user in the config directory.
*--service-environment* ENVIRONMENT *--service-environment* ENVIRONMENT::
Choose the server environment to use: Choose the server environment to use:
- `live` (default) - `live` (default)
- `staging` - `staging`

View file

@ -155,7 +155,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
final var receiveMode = ns.<ReceiveMode>get("receive-mode"); final var receiveMode = ns.<ReceiveMode>get("receive-mode");
final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
c.getAccountNumbers().stream().map(c::getManager).filter(Objects::nonNull).forEach(m -> { c.getManagers().forEach(m -> {
m.setIgnoreAttachments(ignoreAttachments); m.setIgnoreAttachments(ignoreAttachments);
addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START); addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
}); });
@ -317,10 +317,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
connection.unExportObject(path); connection.unExportObject(path);
}); });
final var initThreads = c.getAccountNumbers() final var initThreads = c.getManagers()
.stream() .stream()
.map(c::getManager)
.filter(Objects::nonNull)
.map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart)) .map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.toList(); .toList();

View file

@ -46,6 +46,14 @@ public class DbusMultiAccountManagerImpl implements MultiAccountManager {
.toList(); .toList();
} }
@Override
public List<Manager> getManagers() {
return signalControl.listAccounts()
.stream()
.map(a -> (Manager) new DbusManagerImpl(getRemoteObject(a, Signal.class), connection))
.toList();
}
@Override @Override
public void addOnManagerAddedHandler(final Consumer<Manager> handler) { public void addOnManagerAddedHandler(final Consumer<Manager> handler) {
synchronized (onManagerAddedHandlers) { synchronized (onManagerAddedHandlers) {

View file

@ -90,10 +90,11 @@ public class JsonRpcReader {
String input = lineSupplier.get(); String input = lineSupplier.get();
if (input == null) { if (input == null) {
// Reached end of input stream logger.trace("Reached end of JSON-RPC input stream.");
break; break;
} }
logger.trace("Incoming JSON-RPC message: {}", input);
JsonRpcMessage message = parseJsonRpcMessage(input); JsonRpcMessage message = parseJsonRpcMessage(input);
if (message == null) continue; if (message == null) continue;

View file

@ -5,7 +5,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode; import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import org.asamk.signal.commands.Command; import org.asamk.signal.commands.Command;
@ -21,6 +23,7 @@ import org.asamk.signal.json.JsonReceiveMessageHandler;
import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.MultiAccountManager; import org.asamk.signal.manager.MultiAccountManager;
import org.asamk.signal.manager.RegistrationManager; import org.asamk.signal.manager.RegistrationManager;
import org.asamk.signal.manager.api.Pair;
import org.asamk.signal.output.JsonWriter; import org.asamk.signal.output.JsonWriter;
import org.asamk.signal.util.Util; import org.asamk.signal.util.Util;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -29,8 +32,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.OverlappingFileLockException; import java.nio.channels.OverlappingFileLockException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier; import java.util.function.Supplier;
public class SignalJsonRpcDispatcherHandler { public class SignalJsonRpcDispatcherHandler {
@ -47,7 +51,7 @@ public class SignalJsonRpcDispatcherHandler {
private final boolean noReceiveOnStart; private final boolean noReceiveOnStart;
private MultiAccountManager c; private MultiAccountManager c;
private final Map<Manager, Manager.ReceiveMessageHandler> receiveHandlers = new HashMap<>(); private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
private Manager m; private Manager m;
@ -64,7 +68,7 @@ public class SignalJsonRpcDispatcherHandler {
this.c = c; this.c = c;
if (!noReceiveOnStart) { if (!noReceiveOnStart) {
c.getAccountNumbers().stream().map(c::getManager).filter(Objects::nonNull).forEach(this::subscribeReceive); this.subscribeReceive(c.getManagers());
c.addOnManagerAddedHandler(this::subscribeReceive); c.addOnManagerAddedHandler(this::subscribeReceive);
c.addOnManagerRemovedHandler(this::unsubscribeReceive); c.addOnManagerRemovedHandler(this::unsubscribeReceive);
} }
@ -85,33 +89,46 @@ public class SignalJsonRpcDispatcherHandler {
handleConnection(); handleConnection();
} }
private void subscribeReceive(final Manager m) { private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0);
if (receiveHandlers.containsKey(m)) {
return;
}
final var receiveMessageHandler = new JsonReceiveMessageHandler(m, private int subscribeReceive(final Manager manager) {
s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive", return subscribeReceive(List.of(manager));
objectMapper.valueToTree(s),
null)));
m.addReceiveHandler(receiveMessageHandler);
receiveHandlers.put(m, receiveMessageHandler);
while (!m.hasCaughtUpWithOldMessages()) {
try {
synchronized (m) {
m.wait();
}
} catch (InterruptedException ignored) {
}
}
} }
void unsubscribeReceive(final Manager m) { private int subscribeReceive(final List<Manager> managers) {
final var receiveMessageHandler = receiveHandlers.remove(m); final var subscriptionId = nextSubscriptionId.getAndIncrement();
if (receiveMessageHandler != null) { final var handlers = managers.stream().map(m -> {
m.removeReceiveHandler(receiveMessageHandler); final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
final ContainerNode<?> params = objectMapper.valueToTree(s);
((ObjectNode) params).set("subscription", IntNode.valueOf(subscriptionId));
jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive", params, null));
});
m.addReceiveHandler(receiveMessageHandler);
return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler);
}).toList();
receiveHandlers.put(subscriptionId, handlers);
return subscriptionId;
}
private boolean unsubscribeReceive(final int subscriptionId) {
final var handlers = receiveHandlers.remove(subscriptionId);
if (handlers == null) {
return false;
} }
for (final var pair : handlers) {
unsubscribeReceiveHandler(pair);
}
return true;
}
private void unsubscribeReceive(final Manager m) {
final var subscriptionId = receiveHandlers.entrySet()
.stream()
.filter(e -> e.getValue().size() == 1 && e.getValue().get(0).first().equals(m))
.map(Map.Entry::getKey)
.findFirst();
subscriptionId.ifPresent(this::unsubscribeReceive);
} }
private void handleConnection() { private void handleConnection() {
@ -119,16 +136,28 @@ public class SignalJsonRpcDispatcherHandler {
jsonRpcReader.readMessages((method, params) -> handleRequest(objectMapper, method, params), jsonRpcReader.readMessages((method, params) -> handleRequest(objectMapper, method, params),
response -> logger.debug("Received unexpected response for id {}", response.getId())); response -> logger.debug("Received unexpected response for id {}", response.getId()));
} finally { } finally {
receiveHandlers.forEach(Manager::removeReceiveHandler); receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
receiveHandlers.clear(); receiveHandlers.clear();
} }
} }
private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
final var m = pair.first();
final var handler = pair.second();
m.removeReceiveHandler(handler);
}
private JsonNode handleRequest( private JsonNode handleRequest(
final ObjectMapper objectMapper, final String method, ContainerNode<?> params final ObjectMapper objectMapper, final String method, ContainerNode<?> params
) throws JsonRpcException { ) throws JsonRpcException {
var command = getCommand(method); var command = getCommand(method);
if (c != null) { if (c != null) {
if (command instanceof JsonRpcSingleCommand<?> jsonRpcCommand) {
final var manager = getManagerFromParams(params);
if (manager != null) {
return runCommand(objectMapper, params, new CommandRunnerImpl<>(manager, jsonRpcCommand));
}
}
if (command instanceof JsonRpcMultiCommand<?> jsonRpcCommand) { if (command instanceof JsonRpcMultiCommand<?> jsonRpcCommand) {
return runCommand(objectMapper, params, new MultiCommandRunnerImpl<>(c, jsonRpcCommand)); return runCommand(objectMapper, params, new MultiCommandRunnerImpl<>(c, jsonRpcCommand));
} }
@ -168,10 +197,15 @@ public class SignalJsonRpcDispatcherHandler {
null)); null));
} }
private Manager getManagerFromParams(final ContainerNode<?> params) { private Manager getManagerFromParams(final ContainerNode<?> params) throws JsonRpcException {
if (params != null && params.has("account")) { if (params != null && params.hasNonNull("account")) {
final var manager = c.getManager(params.get("account").asText()); final var manager = c.getManager(params.get("account").asText());
((ObjectNode) params).remove("account"); ((ObjectNode) params).remove("account");
if (manager == null) {
throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_PARAMS,
"Specified account does not exist",
null));
}
return manager; return manager;
} }
return null; return null;
@ -322,7 +356,7 @@ public class SignalJsonRpcDispatcherHandler {
command.handleCommand(requestParams, jsonWriter); command.handleCommand(requestParams, jsonWriter);
} }
private class SubscribeReceiveCommand implements JsonRpcSingleCommand<Void> { private class SubscribeReceiveCommand implements JsonRpcSingleCommand<Void>, JsonRpcMultiCommand<Void> {
@Override @Override
public String getName() { public String getName() {
@ -333,22 +367,67 @@ public class SignalJsonRpcDispatcherHandler {
public void handleCommand( public void handleCommand(
final Void request, final Manager m, final JsonWriter jsonWriter final Void request, final Manager m, final JsonWriter jsonWriter
) throws CommandException { ) throws CommandException {
subscribeReceive(m); final var subscriptionId = subscribeReceive(m);
jsonWriter.write(subscriptionId);
}
@Override
public void handleCommand(
final Void request, final MultiAccountManager c, final JsonWriter jsonWriter
) throws CommandException {
final var subscriptionId = subscribeReceive(c.getManagers());
jsonWriter.write(subscriptionId);
} }
} }
private class UnsubscribeReceiveCommand implements JsonRpcSingleCommand<Void> { private class UnsubscribeReceiveCommand implements JsonRpcSingleCommand<JsonNode>, JsonRpcMultiCommand<JsonNode> {
@Override @Override
public String getName() { public String getName() {
return "unsubscribeReceive"; return "unsubscribeReceive";
} }
@Override
public TypeReference<JsonNode> getRequestType() {
return new TypeReference<>() {};
}
@Override @Override
public void handleCommand( public void handleCommand(
final Void request, final Manager m, final JsonWriter jsonWriter final JsonNode request, final Manager m, final JsonWriter jsonWriter
) throws CommandException { ) throws CommandException {
unsubscribeReceive(m); final var subscriptionId = getSubscriptionId(request);
if (subscriptionId == null) {
unsubscribeReceive(m);
} else {
if (!unsubscribeReceive(subscriptionId)) {
throw new UserErrorException("Unknown subscription id");
}
}
}
@Override
public void handleCommand(
final JsonNode request, final MultiAccountManager c, final JsonWriter jsonWriter
) throws CommandException {
final var subscriptionId = getSubscriptionId(request);
if (subscriptionId == null) {
throw new UserErrorException("Missing subscription parameter with subscription id");
} else {
if (!unsubscribeReceive(subscriptionId)) {
throw new UserErrorException("Unknown subscription id");
}
}
}
private Integer getSubscriptionId(final JsonNode request) {
if (request instanceof ArrayNode req) {
return req.get(0).asInt();
} else if (request instanceof ObjectNode req) {
return req.get("subscription").asInt();
} else {
return null;
}
} }
} }
} }