channels.rs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. #[cfg(feature = "search")]
  2. mod search;
  3. #[cfg(feature = "search")]
  4. pub use search::*;
  5. #[cfg(feature = "ingest")]
  6. mod ingest;
  7. #[cfg(feature = "ingest")]
  8. pub use ingest::*;
  9. #[cfg(feature = "control")]
  10. mod control;
  11. #[cfg(feature = "control")]
  12. pub use control::*;
  13. use std::cell::RefCell;
  14. use std::io::{Read, Write};
  15. use wasmedge_wasi_socket::{TcpStream, ToSocketAddrs};
  16. use crate::commands::{StartCommand, StreamCommand};
  17. use crate::protocol::{self, Protocol};
  18. use crate::result::*;
  19. const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
  20. /// Channel modes supported by sonic search backend.
  21. #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  22. pub enum ChannelMode {
  23. /// Sonic server search channel mode.
  24. ///
  25. /// In this mode you can use `query`, `pag_query`, `suggest`, `lim_suggest`, `ping`
  26. /// and `quit` commands.
  27. ///
  28. /// Note: This mode requires enabling the `search` feature.
  29. #[cfg(feature = "search")]
  30. Search,
  31. /// Sonic server ingest channel mode.
  32. ///
  33. /// In this mode you can use `push`, `pop`, `flush`, `count` `ping` and `quit` commands.
  34. ///
  35. /// Note: This mode requires enabling the `ingest` feature.
  36. #[cfg(feature = "ingest")]
  37. Ingest,
  38. /// Sonic server control channel mode.
  39. ///
  40. /// In this mode you can use `trigger`, `consolidate`, `backup`, `restore`,
  41. /// `ping` and `quit` commands.
  42. ///
  43. /// Note: This mode requires enabling the `control` feature.
  44. #[cfg(feature = "control")]
  45. Control,
  46. }
  47. impl ChannelMode {
  48. /// Converts enum to &str
  49. pub fn as_str(&self) -> &str {
  50. match self {
  51. #[cfg(feature = "search")]
  52. ChannelMode::Search => "search",
  53. #[cfg(feature = "ingest")]
  54. ChannelMode::Ingest => "ingest",
  55. #[cfg(feature = "control")]
  56. ChannelMode::Control => "control",
  57. }
  58. }
  59. }
  60. impl std::fmt::Display for ChannelMode {
  61. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  62. write!(f, "{}", self.as_str())
  63. }
  64. }
  65. /// Root and Heart of this library.
  66. ///
  67. /// You can connect to the sonic search backend and run all supported protocol methods.
  68. ///
  69. #[derive(Debug)]
  70. pub struct SonicStream {
  71. stream: RefCell<TcpStream>,
  72. mode: Option<ChannelMode>, // None – Uninitialized mode
  73. max_buffer_size: usize,
  74. protocol: Protocol,
  75. }
  76. impl SonicStream {
  77. fn send<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
  78. let buf = self
  79. .protocol
  80. .format_request(command.request())
  81. .map_err(|_| Error::WriteToStream)?;
  82. self.stream
  83. .borrow_mut()
  84. .write_all(&buf)
  85. .map_err(|_| Error::WriteToStream)?;
  86. Ok(())
  87. }
  88. fn read_line(&self) -> Result<protocol::Response> {
  89. let mut buffer = Vec::with_capacity(self.max_buffer_size);
  90. let mut stream = self.stream.borrow_mut();
  91. loop {
  92. let mut byte = [0u8; 1];
  93. match stream.read(&mut byte) {
  94. Ok(0) => {
  95. break;
  96. }
  97. Ok(1) => {
  98. buffer.push(byte[0]);
  99. if byte[0] == b'\n' {
  100. break;
  101. }
  102. }
  103. Ok(_) => {
  104. return Err(Error::ReadStream);
  105. }
  106. Err(_) => {
  107. return Err(Error::ReadStream);
  108. }
  109. }
  110. }
  111. let line_string = String::from_utf8(buffer.to_vec()).map_err(|e| {
  112. println!("Invalid UTF-8 sequence: {:?}", e);
  113. Error::ReadStream
  114. })?;
  115. log::debug!("[channel] {}", &line_string);
  116. self.protocol.parse_response(&line_string)
  117. }
  118. pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
  119. self.send(&command)?;
  120. let res = loop {
  121. let res = self.read_line()?;
  122. if !matches!(&res, protocol::Response::Pending(_)) {
  123. break res;
  124. }
  125. };
  126. command.receive(res)
  127. }
  128. fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
  129. let stream = TcpStream::connect(addr).map_err(|_| Error::ConnectToServer)?;
  130. let channel = SonicStream {
  131. stream: RefCell::new(stream),
  132. mode: None,
  133. max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
  134. protocol: Default::default(),
  135. };
  136. let res = channel.read_line()?;
  137. if matches!(res, protocol::Response::Connected) {
  138. Ok(channel)
  139. } else {
  140. Err(Error::ConnectToServer)
  141. }
  142. }
  143. fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
  144. if self.mode.is_some() {
  145. return Err(Error::RunCommand);
  146. }
  147. let res = self.run_command(StartCommand {
  148. mode,
  149. password: password.to_string(),
  150. })?;
  151. self.max_buffer_size = res.max_buffer_size;
  152. self.protocol = Protocol::from(res.protocol_version);
  153. self.mode = Some(res.mode);
  154. Ok(())
  155. }
  156. /// Connect to the search backend in chosen mode.
  157. ///
  158. /// I think we shouldn't separate commands connect and start because we haven't
  159. /// possibility to change channel in sonic server, if we already chosen one of them. 🤔
  160. pub(crate) fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
  161. where
  162. A: ToSocketAddrs,
  163. S: ToString,
  164. {
  165. let mut channel = Self::connect(addr)?;
  166. channel.start(mode, password)?;
  167. Ok(channel)
  168. }
  169. }
  170. /// This trait should be implemented for all supported sonic channels
  171. pub trait SonicChannel {
  172. /// Sonic channel struct
  173. type Channel;
  174. /// Returns reference for sonic stream of connection
  175. fn stream(&self) -> &SonicStream;
  176. /// Connects to sonic backend and run start command.
  177. ///
  178. /// ```rust,no_run
  179. /// # use sonic_channel::*;
  180. /// # fn main() -> result::Result<()> {
  181. /// let search_channel = SearchChannel::start(
  182. /// "localhost:1491",
  183. /// "SecretPassword",
  184. /// )?;
  185. /// # Ok(())
  186. /// # }
  187. /// ```
  188. fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
  189. where
  190. A: ToSocketAddrs,
  191. S: ToString;
  192. }
  193. #[cfg(test)]
  194. mod tests {
  195. use super::*;
  196. #[test]
  197. fn format_channel_enums() {
  198. #[cfg(feature = "search")]
  199. assert_eq!(format!("{}", ChannelMode::Search), String::from("search"));
  200. #[cfg(feature = "ingest")]
  201. assert_eq!(format!("{}", ChannelMode::Ingest), String::from("ingest"));
  202. #[cfg(feature = "control")]
  203. assert_eq!(format!("{}", ChannelMode::Control), String::from("control"));
  204. }
  205. }