Switch to jsonrpsee

Fixes #1275
This commit is contained in:
AsamK 2023-08-11 00:21:38 +02:00
parent edbf803a98
commit 8037fb2d66
10 changed files with 928 additions and 521 deletions

View file

@ -0,0 +1,23 @@
use std::path::Path;
use futures_util::stream::StreamExt;
use jsonrpsee::core::client::{TransportReceiverT, TransportSenderT};
use jsonrpsee::core::Error;
use tokio::net::UnixStream;
use tokio_util::codec::Decoder;
use super::stream_codec::StreamCodec;
use super::{Receiver, Sender};
/// Connect to a JSON-RPC Unix Socket server.
pub async fn connect(
socket: impl AsRef<Path>,
) -> Result<(impl TransportSenderT + Send, impl TransportReceiverT + Send), Error> {
let connection = UnixStream::connect(socket).await?;
let (sink, stream) = StreamCodec::stream_incoming().framed(connection).split();
let sender = Sender { inner: sink };
let receiver = Receiver { inner: stream };
Ok((sender, receiver))
}

View file

@ -0,0 +1,64 @@
use futures_util::{stream::StreamExt, Sink, SinkExt, Stream};
use jsonrpsee::core::{
async_trait,
client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
};
use thiserror::Error;
pub mod ipc;
mod stream_codec;
pub mod tcp;
#[derive(Debug, Error)]
enum Errors {
#[error("Other: {0}")]
Other(String),
#[error("Closed")]
Closed,
}
struct Sender<T: Send + Sink<String>> {
inner: T,
}
#[async_trait]
impl<T: Send + Sink<String, Error = impl std::error::Error> + Unpin + 'static> TransportSenderT
for Sender<T>
{
type Error = Errors;
async fn send(&mut self, body: String) -> Result<(), Self::Error> {
self.inner
.send(body)
.await
.map_err(|e| Errors::Other(format!("{:?}", e)))?;
Ok(())
}
async fn close(&mut self) -> Result<(), Self::Error> {
self.inner
.close()
.await
.map_err(|e| Errors::Other(format!("{:?}", e)))?;
Ok(())
}
}
struct Receiver<T: Send + Stream> {
inner: T,
}
#[async_trait]
impl<T: Send + Stream<Item = Result<String, std::io::Error>> + Unpin + 'static> TransportReceiverT
for Receiver<T>
{
type Error = Errors;
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
match self.inner.next().await {
None => Err(Errors::Closed),
Some(Ok(msg)) => Ok(ReceivedMessage::Text(msg)),
Some(Err(e)) => Err(Errors::Other(format!("{:?}", e))),
}
}
}

View file

@ -0,0 +1,61 @@
use bytes::BytesMut;
use std::{io, str};
use tokio_util::codec::{Decoder, Encoder};
type Separator = u8;
/// Stream codec for streaming protocols (ipc, tcp)
#[derive(Debug, Default)]
pub struct StreamCodec {
incoming_separator: Separator,
outgoing_separator: Separator,
}
impl StreamCodec {
/// Default codec with streaming input data. Input can be both enveloped and not.
pub fn stream_incoming() -> Self {
StreamCodec::new(b'\n', b'\n')
}
/// New custom stream codec
pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
StreamCodec {
incoming_separator,
outgoing_separator,
}
}
}
impl Decoder for StreamCodec {
type Item = String;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
if let Some(i) = buf
.as_ref()
.iter()
.position(|&b| b == self.incoming_separator)
{
let line = buf.split_to(i);
let _ = buf.split_to(1);
match str::from_utf8(line.as_ref()) {
Ok(s) => Ok(Some(s.to_string())),
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid UTF-8")),
}
} else {
Ok(None)
}
}
}
impl Encoder<String> for StreamCodec {
type Error = io::Error;
fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
let mut payload = msg.into_bytes();
payload.push(self.outgoing_separator);
buf.extend_from_slice(&payload);
Ok(())
}
}

View file

@ -0,0 +1,21 @@
use futures_util::stream::StreamExt;
use jsonrpsee::core::client::{TransportReceiverT, TransportSenderT};
use jsonrpsee::core::Error;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio_util::codec::Decoder;
use super::stream_codec::StreamCodec;
use super::{Receiver, Sender};
/// Connect to a JSON-RPC TCP server.
pub async fn connect(
socket: impl ToSocketAddrs,
) -> Result<(impl TransportSenderT + Send, impl TransportReceiverT + Send), Error> {
let connection = TcpStream::connect(socket).await?;
let (sink, stream) = StreamCodec::stream_incoming().framed(connection).split();
let sender = Sender { inner: sink };
let receiver = Receiver { inner: stream };
Ok((sender, receiver))
}