123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- #[cfg(feature = "search")]
- mod search;
- #[cfg(feature = "search")]
- pub use search::*;
- #[cfg(feature = "ingest")]
- mod ingest;
- #[cfg(feature = "ingest")]
- pub use ingest::*;
- #[cfg(feature = "control")]
- mod control;
- #[cfg(feature = "control")]
- pub use control::*;
- use std::cell::RefCell;
- use std::io::{Read, Write};
- use wasmedge_wasi_socket::{TcpStream, ToSocketAddrs};
- use crate::commands::{StartCommand, StreamCommand};
- use crate::protocol::{self, Protocol};
- use crate::result::*;
/// Buffer size assigned to a freshly connected stream.
///
/// It is used as the initial capacity for line reads until the start command
/// succeeds and the stream adopts the buffer size reported in that response.
const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
/// Channel modes supported by the Sonic search backend.
///
/// Every variant is gated behind the cargo feature of the same name, so only
/// the modes that were enabled at compile time exist.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelMode {
    /// Sonic server search channel mode.
    ///
    /// In this mode you can use the `query`, `pag_query`, `suggest`, `lim_suggest`,
    /// `ping` and `quit` commands.
    ///
    /// Note: This mode requires enabling the `search` feature.
    #[cfg(feature = "search")]
    Search,

    /// Sonic server ingest channel mode.
    ///
    /// In this mode you can use the `push`, `pop`, `flush`, `count`, `ping` and
    /// `quit` commands.
    ///
    /// Note: This mode requires enabling the `ingest` feature.
    #[cfg(feature = "ingest")]
    Ingest,

    /// Sonic server control channel mode.
    ///
    /// In this mode you can use the `trigger`, `consolidate`, `backup`, `restore`,
    /// `ping` and `quit` commands.
    ///
    /// Note: This mode requires enabling the `control` feature.
    #[cfg(feature = "control")]
    Control,
}

impl ChannelMode {
    /// Returns the lowercase name of the mode: `"search"`, `"ingest"` or `"control"`.
    ///
    /// The name is a string literal, so the returned slice is `'static` and does
    /// not keep `self` borrowed.
    pub fn as_str(&self) -> &'static str {
        // Match on the value rather than on the reference: this stays well-formed
        // even when no mode feature is enabled and the enum has no variants.
        match *self {
            #[cfg(feature = "search")]
            ChannelMode::Search => "search",
            #[cfg(feature = "ingest")]
            ChannelMode::Ingest => "ingest",
            #[cfg(feature = "control")]
            ChannelMode::Control => "control",
        }
    }
}

impl std::fmt::Display for ChannelMode {
    /// Writes the same text that [`ChannelMode::as_str`] returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
- /// Root and Heart of this library.
- ///
- /// You can connect to the sonic search backend and run all supported protocol methods.
- ///
- #[derive(Debug)]
- pub struct SonicStream {
- stream: RefCell<TcpStream>,
- mode: Option<ChannelMode>, // None – Uninitialized mode
- max_buffer_size: usize,
- protocol: Protocol,
- }
- impl SonicStream {
- fn send<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
- let buf = self
- .protocol
- .format_request(command.request())
- .map_err(|_| Error::WriteToStream)?;
- self.stream
- .borrow_mut()
- .write_all(&buf)
- .map_err(|_| Error::WriteToStream)?;
- Ok(())
- }
- fn read_line(&self) -> Result<protocol::Response> {
- let mut buffer = Vec::with_capacity(self.max_buffer_size);
- let mut stream = self.stream.borrow_mut();
- loop {
- let mut byte = [0u8; 1];
- match stream.read(&mut byte) {
- Ok(0) => {
- break;
- }
- Ok(1) => {
- buffer.push(byte[0]);
- if byte[0] == b'\n' {
- break;
- }
- }
- Ok(_) => {
- return Err(Error::ReadStream);
- }
- Err(_) => {
- return Err(Error::ReadStream);
- }
- }
- }
- let line_string = String::from_utf8(buffer.to_vec()).map_err(|e| {
- println!("Invalid UTF-8 sequence: {:?}", e);
- Error::ReadStream
- })?;
-
- log::debug!("[channel] {}", &line_string);
- self.protocol.parse_response(&line_string)
- }
- pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
- self.send(&command)?;
- let res = loop {
- let res = self.read_line()?;
- if !matches!(&res, protocol::Response::Pending(_)) {
- break res;
- }
- };
- command.receive(res)
- }
- fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
- let stream = TcpStream::connect(addr).map_err(|_| Error::ConnectToServer)?;
- let channel = SonicStream {
- stream: RefCell::new(stream),
- mode: None,
- max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
- protocol: Default::default(),
- };
- let res = channel.read_line()?;
- if matches!(res, protocol::Response::Connected) {
- Ok(channel)
- } else {
- Err(Error::ConnectToServer)
- }
- }
- fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
- if self.mode.is_some() {
- return Err(Error::RunCommand);
- }
- let res = self.run_command(StartCommand {
- mode,
- password: password.to_string(),
- })?;
- self.max_buffer_size = res.max_buffer_size;
- self.protocol = Protocol::from(res.protocol_version);
- self.mode = Some(res.mode);
- Ok(())
- }
- /// Connect to the search backend in chosen mode.
- ///
- /// I think we shouldn't separate commands connect and start because we haven't
- /// possibility to change channel in sonic server, if we already chosen one of them. 🤔
- pub(crate) fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
- where
- A: ToSocketAddrs,
- S: ToString,
- {
- let mut channel = Self::connect(addr)?;
- channel.start(mode, password)?;
- Ok(channel)
- }
- }
- /// This trait should be implemented for all supported sonic channels
- pub trait SonicChannel {
- /// Sonic channel struct
- type Channel;
- /// Returns reference for sonic stream of connection
- fn stream(&self) -> &SonicStream;
- /// Connects to sonic backend and run start command.
- ///
- /// ```rust,no_run
- /// # use sonic_channel::*;
- /// # fn main() -> result::Result<()> {
- /// let search_channel = SearchChannel::start(
- /// "localhost:1491",
- /// "SecretPassword",
- /// )?;
- /// # Ok(())
- /// # }
- /// ```
- fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
- where
- A: ToSocketAddrs,
- S: ToString;
- }
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` must render every enabled mode as its lowercase name.
    #[test]
    fn format_channel_enums() {
        #[cfg(feature = "search")]
        assert_eq!(ChannelMode::Search.to_string(), "search");

        #[cfg(feature = "ingest")]
        assert_eq!(ChannelMode::Ingest.to_string(), "ingest");

        #[cfg(feature = "control")]
        assert_eq!(ChannelMode::Control.to_string(), "control");
    }
}
|