Execute JSON-RPC requests in parallel

This commit is contained in:
AsamK 2023-10-05 22:18:13 +02:00
parent a0c345185b
commit 6f4d538832
2 changed files with 75 additions and 22 deletions

View file

@ -14,7 +14,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Objects; import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
@ -53,20 +55,25 @@ public class JsonRpcReader {
return; return;
} }
final var executor = Executors.newFixedThreadPool(10);
try {
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
String input = lineSupplier.get(); final var input = lineSupplier.get();
if (input == null) { if (input == null) {
logger.trace("Reached end of JSON-RPC input stream."); logger.trace("Reached end of JSON-RPC input stream.");
break; break;
} }
logger.trace("Incoming JSON-RPC message: {}", input); logger.trace("Incoming JSON-RPC message: {}", input);
JsonRpcMessage message = parseJsonRpcMessage(input); final var message = parseJsonRpcMessage(input);
if (message == null) { if (message == null) {
continue; continue;
} }
handleMessage(message, requestHandler, responseHandler); executor.submit(() -> handleMessage(message, requestHandler, responseHandler));
}
} finally {
Util.closeExecutorService(executor);
} }
} }
@ -84,16 +91,41 @@ public class JsonRpcReader {
} else if (message instanceof JsonRpcResponse jsonRpcResponse) { } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
responseHandler.accept(jsonRpcResponse); responseHandler.accept(jsonRpcResponse);
} else { } else {
final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> { final var messages = ((JsonRpcBatchMessage) message).getMessages();
final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
final var executor = Executors.newFixedThreadPool(10);
try {
final var lock = new ReentrantLock();
messages.forEach(jsonNode -> {
final JsonRpcRequest request; final JsonRpcRequest request;
try { try {
request = parseJsonRpcRequest(jsonNode); request = parseJsonRpcRequest(jsonNode);
} catch (JsonRpcException e) { } catch (JsonRpcException e) {
return JsonRpcResponse.forError(e.getError(), getId(jsonNode)); final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode));
lock.lock();
try {
responseList.add(response);
} finally {
lock.unlock();
}
return;
} }
return handleRequest(requestHandler, request); executor.submit(() -> {
}).filter(Objects::nonNull).toList(); final var response = handleRequest(requestHandler, request);
if (response != null) {
lock.lock();
try {
responseList.add(response);
} finally {
lock.unlock();
}
}
});
});
} finally {
Util.closeExecutorService(executor);
}
if (responseList.size() > 0) { if (responseList.size() > 0) {
jsonRpcSender.sendBatchResponses(responseList); jsonRpcSender.sendBatchResponses(responseList);

View file

@ -5,6 +5,9 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
@ -13,10 +16,14 @@ import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class Util { public class Util {
private final static Logger logger = LoggerFactory.getLogger(Util.class);
private Util() { private Util() {
} }
@ -80,4 +87,18 @@ public class Util {
return map; return map;
} }
public static void closeExecutorService(ExecutorService executor) {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
executor.shutdownNow();
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
logger.warn("Failed to shutdown executor service");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
} }