protocol.rs 10 KB


  1. use std::io::{self, BufWriter, Write};
  2. use std::{path::PathBuf, str::FromStr};
  3. use crate::{result::*, ChannelMode};
  4. #[derive(Debug, Default)]
  5. pub struct Protocol {
  6. #[allow(dead_code)]
  7. version: Version,
  8. }
  9. impl From<Version> for Protocol {
  10. fn from(version: Version) -> Self {
  11. Self { version }
  12. }
  13. }
  14. impl Protocol {
  15. pub fn format_request(&self, req: Request) -> io::Result<Vec<u8>> {
  16. let mut res = BufWriter::new(Vec::new());
  17. match req {
  18. Request::Quit => write!(res, "QUIT")?,
  19. Request::Ping => write!(res, "PING")?,
  20. Request::Start { mode, password } => write!(res, "START {} {}", mode, password)?,
  21. #[rustfmt::skip]
  22. Request::Count { collection, bucket, object } => match (bucket, object) {
  23. (Some(b), Some(o)) => write!(res, "COUNT {} {} {}", collection, b, o)?,
  24. (Some(b), None) => write!(res, "COUNT {} {}", collection, b)?,
  25. (None, None) => write!(res, "COUNT {}", collection)?,
  26. _ => panic!("Wrong protocol format"),
  27. },
  28. #[rustfmt::skip]
  29. Request::Flush { collection, bucket, object } => match (bucket, object) {
  30. (Some(b), Some(o)) => write!(res, "FLUSHO {} {} {}", collection, b, o)?,
  31. (Some(b), None) => write!(res, "FLUSHB {} {}", collection, b)?,
  32. (None, None) => write!(res, "FLUSHC {}", collection)?,
  33. _ => panic!("Wrong protocol format"),
  34. },
  35. #[rustfmt::skip]
  36. Request::Pop { collection, bucket, object, terms } => {
  37. write!(res, "POP {} {} {} \"{}\"", collection, bucket, object, terms)?
  38. },
  39. #[rustfmt::skip]
  40. Request::Push { collection, bucket, object, terms, lang } => {
  41. let oneline_terms = remove_multiline(&terms);
  42. write!(res, "PUSH {} {} {} \"{}\"", collection, bucket, object, oneline_terms)?;
  43. if let Some(lang) = lang {
  44. write!(res, " LANG({})", lang)?
  45. }
  46. }
  47. #[rustfmt::skip]
  48. Request::Query { collection, bucket, terms, offset, limit, lang } => {
  49. write!(res, "QUERY {} {} \"{}\"", collection, bucket, terms)?;
  50. if let Some(limit) = limit {
  51. write!(res, " LIMIT({})", limit)?;
  52. }
  53. if let Some(offset) = offset {
  54. write!(res, " OFFSET({})", offset)?;
  55. }
  56. if let Some(lang) = lang {
  57. write!(res, " LANG({})", lang)?;
  58. }
  59. }
  60. #[rustfmt::skip]
  61. Request::Suggest { collection, bucket, word, limit } => {
  62. write!(res, "SUGGEST {} {} \"{}\"", collection, bucket, word)?;
  63. if let Some(limit) = limit {
  64. write!(res, " LIMIT({})", limit)?;
  65. }
  66. }
  67. #[rustfmt::skip]
  68. Request::List { collection, bucket, limit, offset } => {
  69. write!(res, "LIST {} {}", collection, bucket)?;
  70. if let Some(limit) = limit {
  71. write!(res, " LIMIT({})", limit)?;
  72. }
  73. if let Some(offset) = offset {
  74. write!(res, " OFFSET({})", offset)?;
  75. }
  76. }
  77. Request::Trigger(triger_req) => match triger_req {
  78. TriggerRequest::Consolidate => write!(res, "TRIGGER consolidate")?,
  79. TriggerRequest::Backup(path) => {
  80. write!(res, "TRIGGER backup {}", path.to_str().unwrap())?
  81. }
  82. TriggerRequest::Restore(path) => {
  83. write!(res, "TRIGGER restore {}", path.to_str().unwrap())?
  84. }
  85. },
  86. }
  87. write!(res, "\r\n")?;
  88. res.flush()?;
  89. Ok(res.into_inner()?)
  90. }
  91. pub fn parse_response(&self, line: &str) -> Result<Response> {
  92. let mut segments = line.split_whitespace();
  93. match segments.next() {
  94. Some("STARTED") => match (segments.next(), segments.next(), segments.next()) {
  95. (Some(_raw_mode), Some(raw_protocol), Some(raw_buffer_size)) => {
  96. Ok(Response::Started(StartedPayload {
  97. protocol_version: parse_server_config(raw_protocol)?,
  98. max_buffer_size: parse_server_config(raw_buffer_size)?,
  99. }))
  100. }
  101. _ => Err(Error::WrongResponse),
  102. },
  103. Some("PENDING") => {
  104. let event_id = segments
  105. .next()
  106. .map(String::from)
  107. .ok_or(Error::WrongResponse)?;
  108. Ok(Response::Pending(event_id))
  109. }
  110. Some("RESULT") => match segments.next() {
  111. Some(num) => num
  112. .parse()
  113. .map(Response::Result)
  114. .map_err(|_| Error::WrongResponse),
  115. _ => Err(Error::WrongResponse),
  116. },
  117. Some("EVENT") => {
  118. let event_kind = match segments.next() {
  119. Some("SUGGEST") => Ok(EventKind::Suggest),
  120. Some("QUERY") => Ok(EventKind::Query),
  121. Some("LIST") => Ok(EventKind::List),
  122. _ => Err(Error::WrongResponse),
  123. }?;
  124. let event_id = segments
  125. .next()
  126. .map(String::from)
  127. .ok_or(Error::WrongResponse)?;
  128. let objects = segments.map(String::from).collect();
  129. Ok(Response::Event(event_kind, event_id, objects))
  130. }
  131. Some("OK") => Ok(Response::Ok),
  132. Some("ENDED") => Ok(Response::Ended),
  133. Some("CONNECTED") => Ok(Response::Connected),
  134. Some("ERR") => match segments.next() {
  135. Some(message) => Err(Error::SonicServer(String::from(message))),
  136. _ => Err(Error::WrongResponse),
  137. },
  138. _ => Err(Error::WrongResponse),
  139. }
  140. }
  141. }
  142. //===========================================================================//
  143. // Primitives //
  144. //===========================================================================//
  145. #[derive(Debug, PartialEq, Eq)]
  146. #[repr(u8)]
  147. pub enum Version {
  148. V1 = 1,
  149. }
  150. impl Default for Version {
  151. fn default() -> Self {
  152. Self::V1
  153. }
  154. }
  155. impl TryFrom<u8> for Version {
  156. type Error = ();
  157. fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
  158. match value {
  159. 1 => Ok(Self::V1),
  160. _ => Err(()),
  161. }
  162. }
  163. }
  164. //===========================================================================//
  165. // Response //
  166. //===========================================================================//
  167. pub type EventId = String;
  168. #[derive(Debug)]
  169. pub enum Response {
  170. Ok,
  171. Ended,
  172. Connected,
  173. Pending(EventId),
  174. Pong,
  175. Started(StartedPayload),
  176. Result(usize),
  177. Event(EventKind, EventId, Vec<String>),
  178. }
  179. #[derive(Debug)]
  180. pub struct StartedPayload {
  181. pub protocol_version: u8,
  182. pub max_buffer_size: usize,
  183. }
  184. #[derive(Debug)]
  185. pub enum EventKind {
  186. Suggest,
  187. Query,
  188. List,
  189. }
  190. //===========================================================================//
  191. // Request //
  192. //===========================================================================//
  193. #[derive(Debug)]
  194. pub enum Request {
  195. Start {
  196. mode: ChannelMode,
  197. password: String,
  198. },
  199. Quit,
  200. Ping,
  201. Trigger(TriggerRequest),
  202. Suggest {
  203. collection: String,
  204. bucket: String,
  205. word: String,
  206. limit: Option<usize>,
  207. },
  208. List {
  209. collection: String,
  210. bucket: String,
  211. limit: Option<usize>,
  212. offset: Option<usize>,
  213. },
  214. Query {
  215. collection: String,
  216. bucket: String,
  217. terms: String,
  218. offset: Option<usize>,
  219. limit: Option<usize>,
  220. lang: Option<&'static str>,
  221. },
  222. Push {
  223. collection: String,
  224. bucket: String,
  225. object: String,
  226. terms: String,
  227. lang: Option<&'static str>,
  228. },
  229. Pop {
  230. collection: String,
  231. bucket: String,
  232. object: String,
  233. terms: String,
  234. },
  235. Flush {
  236. collection: String,
  237. bucket: Option<String>,
  238. object: Option<String>,
  239. },
  240. Count {
  241. collection: String,
  242. bucket: Option<String>,
  243. object: Option<String>,
  244. },
  245. }
  246. #[derive(Debug)]
  247. pub enum TriggerRequest {
  248. Consolidate,
  249. Backup(PathBuf),
  250. Restore(PathBuf),
  251. }
  252. //===========================================================================//
  253. // Utils //
  254. //===========================================================================//
  255. fn parse_server_config<T: FromStr>(raw: &str) -> Result<T> {
  256. raw.split_terminator(&['(', ')'])
  257. .nth(1)
  258. .ok_or(Error::WrongResponse)?
  259. .parse()
  260. .map_err(|_| Error::WrongResponse)
  261. }
  262. fn remove_multiline(text: &str) -> String {
  263. text.lines()
  264. .enumerate()
  265. .fold(String::new(), |mut acc, (i, line)| {
  266. if i != 0 && !line.is_empty() && !acc.is_empty() && !acc.ends_with(' ') {
  267. acc.push(' ');
  268. }
  269. acc.push_str(line);
  270. acc
  271. })
  272. }
  273. #[cfg(test)]
  274. mod tests {
  275. use super::*;
  276. #[test]
  277. fn should_parse_protocol() {
  278. match parse_server_config::<u8>("protocol(1)") {
  279. Ok(protocol) => assert_eq!(protocol, 1),
  280. _ => unreachable!(),
  281. }
  282. }
  283. #[test]
  284. fn should_parse_buffer_size() {
  285. match parse_server_config::<usize>("buffer_size(20000)") {
  286. Ok(buffer_size) => assert_eq!(buffer_size, 20000),
  287. _ => unreachable!(),
  288. }
  289. }
  290. #[test]
  291. fn should_make_single_line() {
  292. let text = "
  293. Hello
  294. World
  295. ";
  296. let expected_text = "Hello World";
  297. assert_eq!(remove_multiline(text), expected_text);
  298. }
  299. }