Refactor JsonRpcReader to for handling a single message

This commit is contained in:
AsamK 2022-11-01 18:06:40 +01:00
parent ae678871ec
commit bf76c04664

View file

@ -13,6 +13,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -24,21 +25,56 @@ public class JsonRpcReader {
private final JsonRpcSender jsonRpcSender;
private final ObjectMapper objectMapper;
private final InputStream input;
private final Supplier<String> lineSupplier;
public JsonRpcReader(
final JsonRpcSender jsonRpcSender, final Supplier<String> lineSupplier
) {
public JsonRpcReader(final JsonRpcSender jsonRpcSender, final Supplier<String> lineSupplier) {
this.jsonRpcSender = jsonRpcSender;
this.input = null;
this.lineSupplier = lineSupplier;
this.objectMapper = Util.createJsonObjectMapper();
}
public void readMessages(final RequestHandler requestHandler, final Consumer<JsonRpcResponse> responseHandler) {
while (!Thread.interrupted()) {
JsonRpcMessage message = readMessage();
if (message == null) break;
public JsonRpcReader(final JsonRpcSender jsonRpcSender, final InputStream input) {
this.jsonRpcSender = jsonRpcSender;
this.input = input;
this.lineSupplier = null;
this.objectMapper = Util.createJsonObjectMapper();
}
public void readMessages(final RequestHandler requestHandler, final Consumer<JsonRpcResponse> responseHandler) {
if (input != null) {
JsonRpcMessage message = parseJsonRpcMessage(input);
if (message == null) {
return;
}
handleMessage(message, requestHandler, responseHandler);
return;
}
while (!Thread.interrupted()) {
String input = lineSupplier.get();
if (input == null) {
logger.trace("Reached end of JSON-RPC input stream.");
break;
}
logger.trace("Incoming JSON-RPC message: {}", input);
JsonRpcMessage message = parseJsonRpcMessage(input);
if (message == null) {
continue;
}
handleMessage(message, requestHandler, responseHandler);
}
}
private void handleMessage(
final JsonRpcMessage message,
final RequestHandler requestHandler,
final Consumer<JsonRpcResponse> responseHandler
) {
if (message instanceof final JsonRpcRequest jsonRpcRequest) {
logger.debug("Received json rpc request, method: " + jsonRpcRequest.getMethod());
final var response = handleRequest(requestHandler, jsonRpcRequest);
@ -62,7 +98,6 @@ public class JsonRpcReader {
jsonRpcSender.sendBatchResponses(responseList);
}
}
}
private JsonRpcResponse handleRequest(final RequestHandler requestHandler, final JsonRpcRequest request) {
try {
@ -85,25 +120,6 @@ public class JsonRpcReader {
return null;
}
private JsonRpcMessage readMessage() {
while (!Thread.interrupted()) {
String input = lineSupplier.get();
if (input == null) {
logger.trace("Reached end of JSON-RPC input stream.");
break;
}
logger.trace("Incoming JSON-RPC message: {}", input);
JsonRpcMessage message = parseJsonRpcMessage(input);
if (message == null) continue;
return message;
}
return null;
}
private JsonRpcMessage parseJsonRpcMessage(final String input) {
final JsonNode jsonNode;
try {
@ -117,6 +133,26 @@ public class JsonRpcReader {
throw new AssertionError(e);
}
return parseJsonRpcMessage(jsonNode);
}
private JsonRpcMessage parseJsonRpcMessage(final InputStream input) {
final JsonNode jsonNode;
try {
jsonNode = objectMapper.readTree(input);
} catch (JsonParseException e) {
jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.PARSE_ERROR,
e.getMessage(),
null), null));
return null;
} catch (IOException e) {
throw new AssertionError(e);
}
return parseJsonRpcMessage(jsonNode);
}
private JsonRpcMessage parseJsonRpcMessage(final JsonNode jsonNode) {
if (jsonNode == null) {
jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST,
"invalid request",