server.rs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. use std::collections::HashSet;
  2. use std::fs;
  3. use crate::config::Config;
  4. use crate::indexes::{Indexes, SerializableMessage, SerializableThread};
  5. use serde::{Deserialize, Serialize};
  6. use std::fs::{File, metadata};
  7. use std::future::Future;
  8. use std::io::{BufReader, Read};
  9. use std::net::{IpAddr, SocketAddr};
  10. use std::path::PathBuf;
  11. use std::str::FromStr;
  12. use std::sync::{Arc, Mutex};
  13. use chrono::{DateTime, Utc};
  14. use futures::{SinkExt, StreamExt};
  15. use warp::Filter;
  16. use warp::fs::file;
  17. use warp::http::{response, StatusCode};
  18. use crate::{create_folder_lar, delete_folder_lar, IMAP_TASK_STORAGE, rename_folder_lar, start_checkin_for_all_updates};
  19. use crate::imap::{connect_to_imap, get_session};
  20. use crate::models::MailAddress;
  21. use crate::sonic::SearchSonic;
  22. use crate::util::read_and_decompress_file;
  23. use tokio::sync::mpsc::UnboundedSender;
  24. use warp::ws::{Message, WebSocket};
  25. use once_cell::sync::Lazy;
  26. type Client = Arc<Mutex<Option<UnboundedSender<Message>>>>;
  27. static CLIENT: Lazy<Client> = Lazy::new(|| {
  28. Arc::new(Mutex::new(None))
  29. });
  30. /// folders -> GET, returns list of all mailboxes
  31. /// sorted_threads_by_date -> GET, returns hashmap with date: list of thread ids sorted by date
  32. /// get_thread -> GET, returns all information about one thread
  33. /// get_thread_messages -> GET, returns a list of SerializableMessage in the thread
  34. /// email -> GET, returns html for the message
  35. /// search -> GET, returns a list of message ids where search found matches
  36. /// create_folder -> POST, creates a new mailbox
  37. /// rename_folder -> POST, renames a mailbox
  38. /// delete_folder -> POST, deletes a mailbox
  39. pub async fn run_api() {
  40. let cors = warp::cors()
  41. .allow_any_origin()
  42. .allow_methods(vec!["GET", "POST"])
  43. .allow_headers(vec![warp::http::header::CONTENT_TYPE]);
  44. let routes = warp::path("folders")
  45. .and(warp::get())
  46. .and_then(get_folders_handle)
  47. .or(warp::path("sorted_threads_by_date")
  48. .and(warp::get())
  49. .and(warp::query::<GetSortedThreadsByDateQuery>())
  50. .and_then(sorted_threads_by_date_handle))
  51. .or(warp::path("get_thread")
  52. .and(warp::get())
  53. .and(warp::query::<GetThreadQuery>())
  54. .and_then(get_thread_handle))
  55. .or(warp::path("get_thread_messages")
  56. .and(warp::get())
  57. .and(warp::query::<GetThreadMessagesQuery>())
  58. .and_then(get_thread_messages_handle))
  59. .or(warp::path("email")
  60. .and(warp::get())
  61. .and(warp::query::<GetEmailQuery>())
  62. .and_then(get_email_handle))
  63. .or(warp::path("get_attachments_info")
  64. .and(warp::get())
  65. .and(warp::query::<GetAttachmentsInfoQuery>())
  66. .and_then(get_attachments_info_handle))
  67. .or(warp::path("get_attachment")
  68. .and(warp::get())
  69. .and(warp::query::<GetAttachmentQuery>())
  70. .and_then(get_attachment_handle))
  71. .or(warp::path("search")
  72. .and(warp::get())
  73. .and(warp::query::<SearchQuery>())
  74. .and_then(search_handle))
  75. .or(warp::path("create_folder")
  76. .and(warp::post())
  77. .and(warp::body::json())
  78. .and_then(create_folder_handle))
  79. .or(warp::path("rename_folder")
  80. .and(warp::post())
  81. .and(warp::body::json())
  82. .and_then(rename_folder_handle))
  83. .or(warp::path("delete_folder")
  84. .and(warp::post())
  85. .and(warp::body::json())
  86. .and_then(delete_folder_handle))
  87. .or(warp::path("is_logged_in")
  88. .and(warp::get())
  89. .and_then(is_logged_in_handle))
  90. .or(warp::path("log_in")
  91. .and(warp::post())
  92. .and(warp::body::json())
  93. .and_then(log_in_handle))
  94. .or(warp::path("connect")
  95. .and(warp::ws())
  96. .map(|ws: warp::ws::Ws| {
  97. ws.on_upgrade(|socket| ws_handler(socket))
  98. }))
  99. .or(warp::path("disconnect")
  100. .and(warp::post())
  101. .and_then(disconnect_handle))
  102. .with(cors);
  103. let ip: IpAddr = IpAddr::from_str(&*Config::global().api_addr.clone()).expect("Invalid API IP address");
  104. let addr = SocketAddr::new(ip, Config::global().api_port.clone() as u16);
  105. warp::serve(routes).run(addr).await;
  106. }
  107. async fn ws_handler(ws: WebSocket) {
  108. let (mut ws_tx, mut ws_rx) = ws.split();
  109. let (client_tx, mut client_rx) = tokio::sync::mpsc::unbounded_channel();
  110. // Overwrite the existing client with the new one
  111. {
  112. let mut client_guard = CLIENT.lock().unwrap();
  113. *client_guard = Some(client_tx);
  114. }
  115. // Task to receive messages from the client (if needed)
  116. let rx_task = tokio::spawn(async move {
  117. while let Some(result) = ws_rx.next().await {
  118. match result {
  119. Ok(msg) => {
  120. // Handle client messages here if necessary
  121. }
  122. Err(e) => {
  123. eprintln!("WebSocket error: {}", e);
  124. break;
  125. }
  126. }
  127. }
  128. });
  129. // Task to send messages to the client
  130. let tx_task = tokio::spawn(async move {
  131. while let Some(result) = client_rx.recv().await {
  132. if ws_tx.send(result).await.is_err() {
  133. // Client disconnected
  134. break;
  135. }
  136. }
  137. });
  138. // Wait for either task to complete
  139. tokio::select! {
  140. _ = rx_task => {},
  141. _ = tx_task => {},
  142. }
  143. // Remove the client from storage when done
  144. {
  145. let mut client_guard = CLIENT.lock().unwrap();
  146. *client_guard = None;
  147. }
  148. }
  149. pub async fn broadcast_message(message: String) {
  150. let msg = Message::text(message);
  151. let client_guard = CLIENT.lock().unwrap();
  152. if let Some(tx) = &*client_guard {
  153. let _ = tx.send(msg);
  154. }
  155. }
  156. // functions
  157. pub async fn get_attachment(folder: String, id: String, name: String) -> GetAttachmentResponse{
  158. let attachment_path = Config::global().out_dir.clone()
  159. .join(folder.clone())
  160. .join(".attachments")
  161. .join(id)
  162. .join(name.clone());
  163. let file_result = File::open(attachment_path);
  164. let mut file = match file_result {
  165. Ok(f) => f,
  166. Err(_) => {return GetAttachmentResponse{ name, data: vec![] }}
  167. };
  168. let mut buffer = Vec::new();
  169. let read_result = file.read_to_end(&mut buffer);
  170. match read_result {
  171. Ok(_) => GetAttachmentResponse{ name, data: buffer },
  172. Err(e) => GetAttachmentResponse{ name, data: vec![] },
  173. }
  174. }
  175. pub async fn get_attachments_info(folder: String, id: String) -> String{
  176. let mut attachments_info: Vec<AttachmentInfo> = Vec::new();
  177. let attachments_path = Config::global().out_dir.clone()
  178. .join(folder.clone())
  179. .join(".attachments")
  180. .join(id);
  181. let entries = fs::read_dir(attachments_path);
  182. match entries {
  183. Ok(entries) => {
  184. for entry in entries {
  185. match entry {
  186. Ok(entry) => {
  187. let path = entry.path();
  188. if path.is_file() {
  189. match metadata(&path) {
  190. Ok(meta) => {
  191. let file_size = meta.len();
  192. attachments_info.push(AttachmentInfo{
  193. name: entry.file_name().to_str().unwrap().to_string(),
  194. size: file_size,
  195. path
  196. })
  197. }
  198. Err(_) => {}
  199. }
  200. }
  201. }
  202. Err(_) => {}
  203. }
  204. }
  205. }
  206. Err(_) => {}
  207. }
  208. serde_json::to_string(&attachments_info).unwrap()
  209. }
  210. pub async fn get_folders() -> String {
  211. broadcast_message("folders".to_string()).await;
  212. serde_json::to_string(&Indexes::get_local_mailboxes()).unwrap()
  213. }
  214. pub async fn get_email(id: String) -> String {
  215. let messages = Indexes::get_messages().unwrap();
  216. let message = match messages.iter().find(|message| message.id == id){
  217. None => {return String::new()}
  218. Some(message) => {message}
  219. };
  220. let abs_path = Config::global().out_dir.clone()
  221. .join(message.list.clone())
  222. .join("messages")
  223. .join(message.hash.clone() + ".tar.gz");
  224. let mut content = match read_and_decompress_file(abs_path, message.hash.clone()) {
  225. Ok(content) => {
  226. String::from_utf8_lossy(&content).to_string()
  227. }
  228. Err(_) => String::new()
  229. };
  230. content
  231. }
  232. pub async fn get_thread_messages(id: u32) -> String {
  233. let threads = Indexes::get_threads().unwrap();
  234. let messages = Indexes::get_messages().unwrap();
  235. if let Some(thread) = threads.into_iter().find(|thread| thread.id == id) {
  236. let result_messages: Vec<SerializableMessage> = thread.messages.into_iter()
  237. .filter_map(|message_id| {
  238. messages.iter().find(|message| message.id == message_id).cloned()
  239. })
  240. .collect();
  241. serde_json::to_string(&result_messages).unwrap()
  242. } else {
  243. serde_json::to_string(&Vec::<SerializableMessage>::new()).unwrap()
  244. }
  245. }
  246. pub async fn get_thread(id: u32) -> String {
  247. let threads = Indexes::get_threads().unwrap();
  248. let messages = Indexes::get_messages().unwrap();
  249. let mut get_thread_response: GetThreadResponse = GetThreadResponse::new();
  250. if let Some(thread) = threads.into_iter().find(|thread| thread.id == id) {
  251. get_thread_response.id = thread.id.clone();
  252. get_thread_response.messages = thread.messages.clone();
  253. if let Some(first_message_id) = thread.messages.first(){
  254. if let Some(first_message) = messages.clone().into_iter().find(|message| message.id == *first_message_id){
  255. get_thread_response.subject = first_message.subject.clone();
  256. }
  257. }
  258. if let Some(last_message_id) = thread.messages.last(){
  259. if let Some(last_message) = messages.clone().into_iter().find(|message| message.id == *last_message_id){
  260. get_thread_response.date = last_message.date.clone();
  261. get_thread_response.from_name = last_message.name.clone();
  262. get_thread_response.from_address = last_message.from.clone();
  263. get_thread_response.to = last_message.to.clone();
  264. }
  265. }
  266. serde_json::to_string(&get_thread_response).unwrap()
  267. } else {
  268. serde_json::to_string(&GetThreadResponse::new()).unwrap()
  269. }
  270. }
  271. async fn get_threads(folder: String, limit: usize, offset: usize, file_name: String) -> String {
  272. let path = Config::global()
  273. .out_dir
  274. .clone()
  275. .join(folder)
  276. .join(file_name);
  277. let file = match File::open(path) {
  278. Ok(file) => file,
  279. Err(_) => return serde_json::to_string(&Vec::<(String, Vec<u32>)>::new()).unwrap(),
  280. };
  281. let reader = BufReader::new(file);
  282. let messages: Vec<(String, Vec<u32>)> = match serde_json::from_reader(reader)
  283. {
  284. Ok(messages) => messages,
  285. Err(_) => return serde_json::to_string(&Vec::<(String, Vec<u32>)>::new()).unwrap(),
  286. };
  287. // filtering elements with limit and offset
  288. let mut result = Vec::new();
  289. let mut current_offset = offset;
  290. let mut remaining_limit = limit;
  291. for (msg, data) in messages.into_iter() {
  292. // Skip the inner data until we reach the required offset
  293. if current_offset >= data.len() {
  294. current_offset -= data.len();
  295. continue;
  296. }
  297. // Take the required elements from the current vector
  298. let paginated_data: Vec<u32> = data.into_iter()
  299. .skip(current_offset)
  300. .take(remaining_limit)
  301. .collect();
  302. // Push the result for the current message
  303. if !paginated_data.is_empty() {
  304. result.push((msg, paginated_data));
  305. }
  306. // Update the remaining limit and reset the offset for the next iterations
  307. remaining_limit -= result.last().unwrap().1.len();
  308. current_offset = 0;
  309. // If we've collected enough elements, break out of the loop
  310. if remaining_limit == 0 {
  311. break;
  312. }
  313. }
  314. serde_json::to_string(&result).unwrap()
  315. }
  316. pub async fn get_sorted_threads_by_date(folder: String, limit: usize, offset: usize) -> String {
  317. get_threads(folder, limit, offset, "date.json".to_string()).await
  318. }
  319. pub async fn is_logged_in() -> String {
  320. let response = if (*Config::global().username.lock().unwrap() == "" || *Config::global().password.lock().unwrap() == "") {
  321. serde_json::to_string(&IsLoggedInResponse { is_logged_in: false })
  322. }else{
  323. serde_json::to_string(&IsLoggedInResponse { is_logged_in: true })
  324. };
  325. response.unwrap_or_else(|_| serde_json::to_string(&IsLoggedInResponse { is_logged_in: true }).unwrap())
  326. }
  327. pub async fn log_in(email: String, password: String) -> bool {
  328. Config::global().set_username(email.clone());
  329. Config::global().set_password(password.clone());
  330. match get_session().await{
  331. Ok(_) => {
  332. match Config::global().update_credentials(&email, &password){
  333. Ok(_) => {
  334. let mut idle_task_storage = IMAP_TASK_STORAGE.lock().unwrap();
  335. idle_task_storage.start_downloading();
  336. drop(idle_task_storage);
  337. start_checkin_for_all_updates();
  338. true
  339. },
  340. Err(_) => false
  341. }
  342. }
  343. Err(_) => false
  344. }
  345. }
  346. // Handlers
  347. #[derive(Deserialize)]
  348. struct GetAttachmentQuery {
  349. folder: String,
  350. id: String,
  351. name: String
  352. }
  353. #[derive(Deserialize, Serialize)]
  354. struct GetAttachmentResponse{
  355. pub name: String,
  356. pub data: Vec<u8>
  357. }
  358. async fn get_attachment_handle(query: GetAttachmentQuery) -> Result<impl warp::Reply, warp::Rejection>{
  359. let result: GetAttachmentResponse = get_attachment(query.folder, query.id, query.name).await;
  360. Ok(warp::reply::json(&result))
  361. }
  362. #[derive(Deserialize)]
  363. struct GetAttachmentsInfoQuery {
  364. folder: String,
  365. id: String
  366. }
  367. #[derive(Deserialize, Serialize)]
  368. struct AttachmentInfo{
  369. pub name: String,
  370. pub size: u64,
  371. pub path: PathBuf
  372. }
  373. async fn get_attachments_info_handle(query: GetAttachmentsInfoQuery) -> Result<impl warp::Reply, warp::Rejection>{
  374. let result: Vec<AttachmentInfo> = serde_json::from_str(&*get_attachments_info(query.folder, query.id).await).unwrap();
  375. Ok(warp::reply::json(&result))
  376. }
  377. #[derive(Deserialize, Serialize)]
  378. struct ErrorResponse {
  379. error: String,
  380. }
  381. #[derive(Deserialize)]
  382. struct GetSortedThreadsByDateQuery {
  383. folder: String,
  384. limit: usize,
  385. offset: usize
  386. }
  387. async fn sorted_threads_by_date_handle(query: GetSortedThreadsByDateQuery) -> Result<impl warp::Reply, warp::Rejection> {
  388. let result: Vec<(String, Vec<u32>)> = serde_json::from_str(&*get_sorted_threads_by_date(query.folder, query.limit, query.offset).await).unwrap();
  389. Ok(warp::reply::json(&result))
  390. }
  391. async fn get_folders_handle() -> Result<impl warp::Reply , warp::Rejection> {
  392. let result: Vec<String> = serde_json::from_str(&*get_folders().await).unwrap();
  393. Ok(warp::reply::json(&result))
  394. }
  395. #[derive(Deserialize)]
  396. struct GetEmailQuery {
  397. id: String,
  398. }
  399. async fn get_email_handle(query: GetEmailQuery) -> Result<impl warp::Reply, warp::Rejection> {
  400. let result: String = get_email(query.id).await;
  401. Ok(warp::reply::html(result))
  402. }
  403. #[derive(Deserialize)]
  404. struct GetThreadQuery {
  405. id: u32,
  406. }
  407. #[derive(Serialize, Deserialize)]
  408. struct GetThreadResponse{
  409. pub id: u32,
  410. pub messages: Vec<String>,
  411. pub subject: String,
  412. pub date: DateTime<Utc>,
  413. pub from_name: String,
  414. pub from_address: String,
  415. pub to: Vec<MailAddress>,
  416. }
  417. impl GetThreadResponse{
  418. pub fn new() -> Self{
  419. GetThreadResponse{
  420. id: 0,
  421. messages: vec![],
  422. subject: "".to_string(),
  423. date: Default::default(),
  424. from_name: "".to_string(),
  425. from_address: "".to_string(),
  426. to: vec![],
  427. }
  428. }
  429. }
  430. async fn get_thread_handle(query: GetThreadQuery) -> Result<impl warp::Reply, warp::Rejection> {
  431. let result: GetThreadResponse = serde_json::from_str(&*get_thread(query.id).await).unwrap();
  432. Ok(warp::reply::json(&result))
  433. }
  434. #[derive(Deserialize)]
  435. struct GetThreadMessagesQuery {
  436. id: u32,
  437. }
  438. async fn get_thread_messages_handle(query: GetThreadMessagesQuery) -> Result<impl warp::Reply, warp::Rejection> {
  439. let result: Vec<SerializableMessage> = serde_json::from_str(&*get_thread_messages(query.id).await).unwrap();
  440. Ok(warp::reply::json(&result))
  441. }
  442. #[derive(Deserialize)]
  443. struct CreateFolderRequest {
  444. name: String,
  445. }
  446. async fn create_folder_handle(data: CreateFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
  447. match create_folder_lar(data.name.clone()).await {
  448. Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
  449. Err(_) => {
  450. let error_response = ErrorResponse {
  451. error: format!("Cannot create folder {}", data.name),
  452. };
  453. Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
  454. }
  455. }
  456. }
  457. #[derive(Deserialize)]
  458. struct DeleteFolderRequest {
  459. name: String,
  460. }
  461. async fn delete_folder_handle(data: DeleteFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
  462. match delete_folder_lar(data.name.clone()).await {
  463. Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
  464. Err(_) => {
  465. let error_response = ErrorResponse {
  466. error: format!("Cannot delete folder {}", data.name),
  467. };
  468. Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
  469. }
  470. }
  471. }
  472. #[derive(Deserialize)]
  473. struct RenameFolderRequest {
  474. old_name: String,
  475. new_name: String,
  476. }
  477. async fn rename_folder_handle(data: RenameFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
  478. match rename_folder_lar(data.old_name.clone(), data.new_name.clone()).await {
  479. Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
  480. Err(_) => {
  481. let error_response = ErrorResponse {
  482. error: format!("Cannot rename folder from {} to {}", data.old_name, data.new_name),
  483. };
  484. Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
  485. }
  486. }
  487. }
  488. #[derive(Deserialize)]
  489. struct SearchQuery {
  490. list: String,
  491. limit: i32,
  492. offset: i32,
  493. query: String
  494. }
  495. async fn search_handle(query: SearchQuery) -> Result<impl warp::Reply, warp::Rejection> {
  496. match SearchSonic::search_document("emails", &*query.list, &*query.query, query.limit, query.offset){
  497. Ok(result) => Ok(warp::reply::json(&result)),
  498. Err(_) => Ok(warp::reply::json(&Vec::<String>::new()))
  499. }
  500. }
  501. #[derive(Deserialize, Serialize)]
  502. struct IsLoggedInResponse{
  503. is_logged_in: bool
  504. }
  505. async fn is_logged_in_handle() -> Result<impl warp::Reply, warp::Rejection>{
  506. Ok(warp::reply::json(&is_logged_in().await))
  507. }
  508. #[derive(Deserialize)]
  509. struct LogInRequest {
  510. email: String,
  511. password: String
  512. }
  513. async fn log_in_handle(query: LogInRequest) -> Result<impl warp::Reply, warp::Rejection>{
  514. if log_in(query.email, query.password).await {
  515. Ok(warp::reply::with_status("Successful log in", StatusCode::OK))
  516. }else{
  517. Ok(warp::reply::with_status("Unable to log in", StatusCode::BAD_REQUEST))
  518. }
  519. }
  520. #[derive(Debug)]
  521. struct NoClientConnected;
  522. impl warp::reject::Reject for NoClientConnected {}
  523. impl warp::reply::Reply for NoClientConnected {
  524. fn into_response(self) -> warp::reply::Response {
  525. warp::reply::with_status(
  526. "No client connected".to_string(),
  527. warp::http::StatusCode::NOT_FOUND,
  528. )
  529. .into_response()
  530. }
  531. }
  532. async fn disconnect_handle() -> Result<impl warp::Reply, warp::Rejection> {
  533. // Lock the client storage
  534. let mut client_guard = CLIENT.lock().unwrap();
  535. // Check if a client is connected
  536. if let Some(client) = client_guard.take() {
  537. // Send a close message to the client
  538. client.send(Message::close()).ok();
  539. Ok(warp::reply::with_status(
  540. "Client disconnected",
  541. warp::http::StatusCode::OK,
  542. ))
  543. } else {
  544. Err(warp::reject::custom(NoClientConnected))
  545. }
  546. }