use std::collections::HashSet; use std::fs; use crate::config::Config; use crate::indexes::{Indexes, SerializableMessage, SerializableThread}; use serde::{Deserialize, Serialize}; use std::fs::{File, metadata}; use std::future::Future; use std::io::{BufReader, Read}; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex}; use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt}; use warp::Filter; use warp::fs::file; use warp::http::{response, StatusCode}; use crate::{create_folder_lar, delete_folder_lar, IMAP_TASK_STORAGE, rename_folder_lar, start_checkin_for_all_updates}; use crate::imap::{connect_to_imap, get_session}; use crate::models::MailAddress; use crate::sonic::SearchSonic; use crate::util::read_and_decompress_file; use tokio::sync::mpsc::UnboundedSender; use warp::ws::{Message, WebSocket}; use once_cell::sync::Lazy; type Client = Arc>>>; static CLIENT: Lazy = Lazy::new(|| { Arc::new(Mutex::new(None)) }); /// folders -> GET, returns list of all mailboxes /// sorted_threads_by_date -> GET, returns hashmap with date: list of thread ids sorted by date /// get_thread -> GET, returns all information about one thread /// get_thread_messages -> GET, returns a list of SerializableMessage in the thread /// email -> GET, returns html for the message /// search -> GET, returns a list of message ids where search found matches /// create_folder -> POST, creates a new mailbox /// rename_folder -> POST, renames a mailbox /// delete_folder -> POST, deletes a mailbox pub async fn run_api() { let cors = warp::cors() .allow_any_origin() .allow_methods(vec!["GET", "POST"]) .allow_headers(vec![warp::http::header::CONTENT_TYPE]); let routes = warp::path("folders") .and(warp::get()) .and_then(get_folders_handle) .or(warp::path("sorted_threads_by_date") .and(warp::get()) .and(warp::query::()) .and_then(sorted_threads_by_date_handle)) .or(warp::path("get_thread") .and(warp::get()) .and(warp::query::()) .and_then(get_thread_handle)) .or(warp::path("get_thread_messages") .and(warp::get()) .and(warp::query::()) .and_then(get_thread_messages_handle)) .or(warp::path("email") .and(warp::get()) .and(warp::query::()) .and_then(get_email_handle)) .or(warp::path("get_attachments_info") .and(warp::get()) .and(warp::query::()) .and_then(get_attachments_info_handle)) .or(warp::path("get_attachment") .and(warp::get()) .and(warp::query::()) .and_then(get_attachment_handle)) .or(warp::path("search") .and(warp::get()) .and(warp::query::()) .and_then(search_handle)) .or(warp::path("create_folder") .and(warp::post()) .and(warp::body::json()) .and_then(create_folder_handle)) .or(warp::path("rename_folder") .and(warp::post()) .and(warp::body::json()) .and_then(rename_folder_handle)) .or(warp::path("delete_folder") .and(warp::post()) .and(warp::body::json()) .and_then(delete_folder_handle)) .or(warp::path("is_logged_in") .and(warp::get()) .and_then(is_logged_in_handle)) .or(warp::path("log_in") .and(warp::post()) .and(warp::body::json()) .and_then(log_in_handle)) .or(warp::path("connect") .and(warp::ws()) .map(|ws: warp::ws::Ws| { ws.on_upgrade(|socket| ws_handler(socket)) })) .or(warp::path("disconnect") .and(warp::post()) .and_then(disconnect_handle)) .with(cors); let ip: IpAddr = IpAddr::from_str(&*Config::global().api_addr.clone()).expect("Invalid API IP address"); let addr = SocketAddr::new(ip, Config::global().api_port.clone() as u16); warp::serve(routes).run(addr).await; } async fn ws_handler(ws: WebSocket) { let (mut ws_tx, mut ws_rx) = ws.split(); let (client_tx, mut client_rx) = tokio::sync::mpsc::unbounded_channel(); // Overwrite the existing client with the new one { let mut client_guard = CLIENT.lock().unwrap(); *client_guard = Some(client_tx); } // Task to receive messages from the client (if needed) let rx_task = tokio::spawn(async move { while let Some(result) = ws_rx.next().await { match result { Ok(msg) => { // Handle client messages here if necessary } Err(e) => { eprintln!("WebSocket error: {}", e); break; } } } }); // Task to send messages to the client let tx_task = tokio::spawn(async move { while let Some(result) = client_rx.recv().await { if ws_tx.send(result).await.is_err() { // Client disconnected break; } } }); // Wait for either task to complete tokio::select! { _ = rx_task => {}, _ = tx_task => {}, } // Remove the client from storage when done { let mut client_guard = CLIENT.lock().unwrap(); *client_guard = None; } } pub async fn broadcast_message(message: String) { let msg = Message::text(message); let client_guard = CLIENT.lock().unwrap(); if let Some(tx) = &*client_guard { let _ = tx.send(msg); } } // functions pub async fn get_attachment(folder: String, id: String, name: String) -> GetAttachmentResponse{ let attachment_path = Config::global().out_dir.clone() .join(folder.clone()) .join(".attachments") .join(id) .join(name.clone()); let file_result = File::open(attachment_path); let mut file = match file_result { Ok(f) => f, Err(_) => {return GetAttachmentResponse{ name, data: vec![] }} }; let mut buffer = Vec::new(); let read_result = file.read_to_end(&mut buffer); match read_result { Ok(_) => GetAttachmentResponse{ name, data: buffer }, Err(e) => GetAttachmentResponse{ name, data: vec![] }, } } pub async fn get_attachments_info(folder: String, id: String) -> String{ let mut attachments_info: Vec = Vec::new(); let attachments_path = Config::global().out_dir.clone() .join(folder.clone()) .join(".attachments") .join(id); let entries = fs::read_dir(attachments_path); match entries { Ok(entries) => { for entry in entries { match entry { Ok(entry) => { let path = entry.path(); if path.is_file() { match metadata(&path) { Ok(meta) => { let file_size = meta.len(); attachments_info.push(AttachmentInfo{ name: entry.file_name().to_str().unwrap().to_string(), size: file_size, path }) } Err(_) => {} } } } Err(_) => {} } } } Err(_) => {} } serde_json::to_string(&attachments_info).unwrap() } pub async fn get_folders() -> String { broadcast_message("folders".to_string()).await; serde_json::to_string(&Indexes::get_local_mailboxes()).unwrap() } pub async fn get_email(id: String) -> String { let messages = Indexes::get_messages().unwrap(); let message = match messages.iter().find(|message| message.id == id){ None => {return String::new()} Some(message) => {message} }; let abs_path = Config::global().out_dir.clone() .join(message.list.clone()) .join("messages") .join(message.hash.clone() + ".tar.gz"); let mut content = match read_and_decompress_file(abs_path, message.hash.clone()) { Ok(content) => { String::from_utf8_lossy(&content).to_string() } Err(_) => String::new() }; content } pub async fn get_thread_messages(id: u32) -> String { let threads = Indexes::get_threads().unwrap(); let messages = Indexes::get_messages().unwrap(); if let Some(thread) = threads.into_iter().find(|thread| thread.id == id) { let result_messages: Vec = thread.messages.into_iter() .filter_map(|message_id| { messages.iter().find(|message| message.id == message_id).cloned() }) .collect(); serde_json::to_string(&result_messages).unwrap() } else { serde_json::to_string(&Vec::::new()).unwrap() } } pub async fn get_thread(id: u32) -> String { let threads = Indexes::get_threads().unwrap(); let messages = Indexes::get_messages().unwrap(); let mut get_thread_response: GetThreadResponse = GetThreadResponse::new(); if let Some(thread) = threads.into_iter().find(|thread| thread.id == id) { get_thread_response.id = thread.id.clone(); get_thread_response.messages = thread.messages.clone(); if let Some(first_message_id) = thread.messages.first(){ if let Some(first_message) = messages.clone().into_iter().find(|message| message.id == *first_message_id){ get_thread_response.subject = first_message.subject.clone(); } } if let Some(last_message_id) = thread.messages.last(){ if let Some(last_message) = messages.clone().into_iter().find(|message| message.id == *last_message_id){ get_thread_response.date = last_message.date.clone(); get_thread_response.from_name = last_message.name.clone(); get_thread_response.from_address = last_message.from.clone(); get_thread_response.to = last_message.to.clone(); } } serde_json::to_string(&get_thread_response).unwrap() } else { serde_json::to_string(&GetThreadResponse::new()).unwrap() } } async fn get_threads(folder: String, limit: usize, offset: usize, file_name: String) -> String { let path = Config::global() .out_dir .clone() .join(folder) .join(file_name); let file = match File::open(path) { Ok(file) => file, Err(_) => return serde_json::to_string(&Vec::<(String, Vec)>::new()).unwrap(), }; let reader = BufReader::new(file); let messages: Vec<(String, Vec)> = match serde_json::from_reader(reader) { Ok(messages) => messages, Err(_) => return serde_json::to_string(&Vec::<(String, Vec)>::new()).unwrap(), }; // filtering elements with limit and offset let mut result = Vec::new(); let mut current_offset = offset; let mut remaining_limit = limit; for (msg, data) in messages.into_iter() { // Skip the inner data until we reach the required offset if current_offset >= data.len() { current_offset -= data.len(); continue; } // Take the required elements from the current vector let paginated_data: Vec = data.into_iter() .skip(current_offset) .take(remaining_limit) .collect(); // Push the result for the current message if !paginated_data.is_empty() { result.push((msg, paginated_data)); } // Update the remaining limit and reset the offset for the next iterations remaining_limit -= result.last().unwrap().1.len(); current_offset = 0; // If we've collected enough elements, break out of the loop if remaining_limit == 0 { break; } } serde_json::to_string(&result).unwrap() } pub async fn get_sorted_threads_by_date(folder: String, limit: usize, offset: usize) -> String { get_threads(folder, limit, offset, "date.json".to_string()).await } pub async fn is_logged_in() -> String { let response = if (*Config::global().username.lock().unwrap() == "" || *Config::global().password.lock().unwrap() == "") { serde_json::to_string(&IsLoggedInResponse { is_logged_in: false }) }else{ serde_json::to_string(&IsLoggedInResponse { is_logged_in: true }) }; response.unwrap_or_else(|_| serde_json::to_string(&IsLoggedInResponse { is_logged_in: true }).unwrap()) } pub async fn log_in(email: String, password: String) -> bool { Config::global().set_username(email.clone()); Config::global().set_password(password.clone()); match get_session().await{ Ok(_) => { match Config::global().update_credentials(&email, &password){ Ok(_) => { let mut idle_task_storage = IMAP_TASK_STORAGE.lock().unwrap(); idle_task_storage.start_downloading(); drop(idle_task_storage); start_checkin_for_all_updates(); true }, Err(_) => false } } Err(_) => false } } // Handlers #[derive(Deserialize)] struct GetAttachmentQuery { folder: String, id: String, name: String } #[derive(Deserialize, Serialize)] struct GetAttachmentResponse{ pub name: String, pub data: Vec } async fn get_attachment_handle(query: GetAttachmentQuery) -> Result{ let result: GetAttachmentResponse = get_attachment(query.folder, query.id, query.name).await; Ok(warp::reply::json(&result)) } #[derive(Deserialize)] struct GetAttachmentsInfoQuery { folder: String, id: String } #[derive(Deserialize, Serialize)] struct AttachmentInfo{ pub name: String, pub size: u64, pub path: PathBuf } async fn get_attachments_info_handle(query: GetAttachmentsInfoQuery) -> Result{ let result: Vec = serde_json::from_str(&*get_attachments_info(query.folder, query.id).await).unwrap(); Ok(warp::reply::json(&result)) } #[derive(Deserialize, Serialize)] struct ErrorResponse { error: String, } #[derive(Deserialize)] struct GetSortedThreadsByDateQuery { folder: String, limit: usize, offset: usize } async fn sorted_threads_by_date_handle(query: GetSortedThreadsByDateQuery) -> Result { let result: Vec<(String, Vec)> = serde_json::from_str(&*get_sorted_threads_by_date(query.folder, query.limit, query.offset).await).unwrap(); Ok(warp::reply::json(&result)) } async fn get_folders_handle() -> Result { let result: Vec = serde_json::from_str(&*get_folders().await).unwrap(); Ok(warp::reply::json(&result)) } #[derive(Deserialize)] struct GetEmailQuery { id: String, } async fn get_email_handle(query: GetEmailQuery) -> Result { let result: String = get_email(query.id).await; Ok(warp::reply::html(result)) } #[derive(Deserialize)] struct GetThreadQuery { id: u32, } #[derive(Serialize, Deserialize)] struct GetThreadResponse{ pub id: u32, pub messages: Vec, pub subject: String, pub date: DateTime, pub from_name: String, pub from_address: String, pub to: Vec, } impl GetThreadResponse{ pub fn new() -> Self{ GetThreadResponse{ id: 0, messages: vec![], subject: "".to_string(), date: Default::default(), from_name: "".to_string(), from_address: "".to_string(), to: vec![], } } } async fn get_thread_handle(query: GetThreadQuery) -> Result { let result: GetThreadResponse = serde_json::from_str(&*get_thread(query.id).await).unwrap(); Ok(warp::reply::json(&result)) } #[derive(Deserialize)] struct GetThreadMessagesQuery { id: u32, } async fn get_thread_messages_handle(query: GetThreadMessagesQuery) -> Result { let result: Vec = serde_json::from_str(&*get_thread_messages(query.id).await).unwrap(); Ok(warp::reply::json(&result)) } #[derive(Deserialize)] struct CreateFolderRequest { name: String, } async fn create_folder_handle(data: CreateFolderRequest) -> Result { match create_folder_lar(data.name.clone()).await { Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)), Err(_) => { let error_response = ErrorResponse { error: format!("Cannot create folder {}", data.name), }; Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST)) } } } #[derive(Deserialize)] struct DeleteFolderRequest { name: String, } async fn delete_folder_handle(data: DeleteFolderRequest) -> Result { match delete_folder_lar(data.name.clone()).await { Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)), Err(_) => { let error_response = ErrorResponse { error: format!("Cannot delete folder {}", data.name), }; Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST)) } } } #[derive(Deserialize)] struct RenameFolderRequest { old_name: String, new_name: String, } async fn rename_folder_handle(data: RenameFolderRequest) -> Result { match rename_folder_lar(data.old_name.clone(), data.new_name.clone()).await { Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)), Err(_) => { let error_response = ErrorResponse { error: format!("Cannot rename folder from {} to {}", data.old_name, data.new_name), }; Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST)) } } } #[derive(Deserialize)] struct SearchQuery { list: String, limit: i32, offset: i32, query: String } async fn search_handle(query: SearchQuery) -> Result { match SearchSonic::search_document("emails", &*query.list, &*query.query, query.limit, query.offset){ Ok(result) => Ok(warp::reply::json(&result)), Err(_) => Ok(warp::reply::json(&Vec::::new())) } } #[derive(Deserialize, Serialize)] struct IsLoggedInResponse{ is_logged_in: bool } async fn is_logged_in_handle() -> Result{ Ok(warp::reply::json(&is_logged_in().await)) } #[derive(Deserialize)] struct LogInRequest { email: String, password: String } async fn log_in_handle(query: LogInRequest) -> Result{ if log_in(query.email, query.password).await { Ok(warp::reply::with_status("Successful log in", StatusCode::OK)) }else{ Ok(warp::reply::with_status("Unable to log in", StatusCode::BAD_REQUEST)) } } #[derive(Debug)] struct NoClientConnected; impl warp::reject::Reject for NoClientConnected {} impl warp::reply::Reply for NoClientConnected { fn into_response(self) -> warp::reply::Response { warp::reply::with_status( "No client connected".to_string(), warp::http::StatusCode::NOT_FOUND, ) .into_response() } } async fn disconnect_handle() -> Result { // Lock the client storage let mut client_guard = CLIENT.lock().unwrap(); // Check if a client is connected if let Some(client) = client_guard.take() { // Send a close message to the client client.send(Message::close()).ok(); Ok(warp::reply::with_status( "Client disconnected", warp::http::StatusCode::OK, )) } else { Err(warp::reject::custom(NoClientConnected)) } }