Implement socket/tcp for daemon command

This commit is contained in:
AsamK 2021-11-10 10:30:57 +01:00
parent 7706a02e1b
commit 81a11dc977
19 changed files with 785 additions and 240 deletions

View file

@ -198,7 +198,11 @@ public interface Manager extends Closeable {
* Add a handler to receive new messages.
* Will start receiving messages from server, if not already started.
*/
void addReceiveHandler(ReceiveMessageHandler handler);
default void addReceiveHandler(ReceiveMessageHandler handler) {
addReceiveHandler(handler, false);
}
void addReceiveHandler(ReceiveMessageHandler handler, final boolean isWeakListener);
/**
* Remove a handler to receive new messages.
@ -249,6 +253,9 @@ public interface Manager extends Closeable {
interface ReceiveMessageHandler {
ReceiveMessageHandler EMPTY = (envelope, e) -> {
};
void handleMessage(MessageEnvelope envelope, Throwable e);
}
}

View file

@ -108,6 +108,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
@ -139,6 +140,7 @@ public class ManagerImpl implements Manager {
private boolean ignoreAttachments = false;
private Thread receiveThread;
private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
private boolean isReceivingSynchronous;
@ -904,14 +906,17 @@ public class ManagerImpl implements Manager {
}
@Override
public void addReceiveHandler(final ReceiveMessageHandler handler) {
public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
if (isReceivingSynchronous) {
throw new IllegalStateException("Already receiving message synchronously.");
}
synchronized (messageHandlers) {
messageHandlers.add(handler);
startReceiveThreadIfRequired();
if (isWeakListener) {
weakHandlers.add(handler);
} else {
messageHandlers.add(handler);
startReceiveThreadIfRequired();
}
}
}
@ -925,13 +930,13 @@ public class ManagerImpl implements Manager {
try {
receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> {
synchronized (messageHandlers) {
for (ReceiveMessageHandler h : messageHandlers) {
Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
try {
h.handleMessage(envelope, e);
} catch (Exception ex) {
logger.warn("Message handler failed, ignoring", ex);
}
}
});
}
});
break;
@ -959,8 +964,9 @@ public class ManagerImpl implements Manager {
public void removeReceiveHandler(final ReceiveMessageHandler handler) {
final Thread thread;
synchronized (messageHandlers) {
weakHandlers.remove(handler);
messageHandlers.remove(handler);
if (!messageHandlers.isEmpty() || isReceivingSynchronous) {
if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) {
return;
}
thread = receiveThread;
@ -1380,6 +1386,7 @@ public class ManagerImpl implements Manager {
private void close(boolean closeAccount) throws IOException {
Thread thread;
synchronized (messageHandlers) {
weakHandlers.clear();
messageHandlers.clear();
thread = receiveThread;
receiveThread = null;