123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623 |
- use std::collections::HashSet;
- use std::fs;
- use crate::config::Config;
- use crate::indexes::{Indexes, SerializableMessage, SerializableThread};
- use serde::{Deserialize, Serialize};
- use std::fs::{File, metadata};
- use std::future::Future;
- use std::io::{BufReader, Read};
- use std::net::{IpAddr, SocketAddr};
- use std::path::PathBuf;
- use std::str::FromStr;
- use std::sync::{Arc, Mutex};
- use chrono::{DateTime, Utc};
- use futures::{SinkExt, StreamExt};
- use warp::Filter;
- use warp::fs::file;
- use warp::http::{response, StatusCode};
- use crate::{create_folder_lar, delete_folder_lar, IMAP_TASK_STORAGE, rename_folder_lar, start_checkin_for_all_updates};
- use crate::imap::{connect_to_imap, get_session};
- use crate::models::MailAddress;
- use crate::sonic::SearchSonic;
- use crate::util::read_and_decompress_file;
- use tokio::sync::mpsc::UnboundedSender;
- use warp::ws::{Message, WebSocket};
- use once_cell::sync::Lazy;
- type Client = Arc<Mutex<Option<UnboundedSender<Message>>>>;
- static CLIENT: Lazy<Client> = Lazy::new(|| {
- Arc::new(Mutex::new(None))
- });
- /// folders -> GET, returns list of all mailboxes
- /// sorted_threads_by_date -> GET, returns hashmap with date: list of thread ids sorted by date
- /// get_thread -> GET, returns all information about one thread
- /// get_thread_messages -> GET, returns a list of SerializableMessage in the thread
- /// email -> GET, returns html for the message
- /// search -> GET, returns a list of message ids where search found matches
- /// create_folder -> POST, creates a new mailbox
- /// rename_folder -> POST, renames a mailbox
- /// delete_folder -> POST, deletes a mailbox
- pub async fn run_api() {
- let cors = warp::cors()
- .allow_any_origin()
- .allow_methods(vec!["GET", "POST"])
- .allow_headers(vec![warp::http::header::CONTENT_TYPE]);
- let routes = warp::path("folders")
- .and(warp::get())
- .and_then(get_folders_handle)
- .or(warp::path("sorted_threads_by_date")
- .and(warp::get())
- .and(warp::query::<GetSortedThreadsByDateQuery>())
- .and_then(sorted_threads_by_date_handle))
- .or(warp::path("get_thread")
- .and(warp::get())
- .and(warp::query::<GetThreadQuery>())
- .and_then(get_thread_handle))
- .or(warp::path("get_thread_messages")
- .and(warp::get())
- .and(warp::query::<GetThreadMessagesQuery>())
- .and_then(get_thread_messages_handle))
- .or(warp::path("email")
- .and(warp::get())
- .and(warp::query::<GetEmailQuery>())
- .and_then(get_email_handle))
- .or(warp::path("get_attachments_info")
- .and(warp::get())
- .and(warp::query::<GetAttachmentsInfoQuery>())
- .and_then(get_attachments_info_handle))
- .or(warp::path("get_attachment")
- .and(warp::get())
- .and(warp::query::<GetAttachmentQuery>())
- .and_then(get_attachment_handle))
- .or(warp::path("search")
- .and(warp::get())
- .and(warp::query::<SearchQuery>())
- .and_then(search_handle))
- .or(warp::path("create_folder")
- .and(warp::post())
- .and(warp::body::json())
- .and_then(create_folder_handle))
- .or(warp::path("rename_folder")
- .and(warp::post())
- .and(warp::body::json())
- .and_then(rename_folder_handle))
- .or(warp::path("delete_folder")
- .and(warp::post())
- .and(warp::body::json())
- .and_then(delete_folder_handle))
- .or(warp::path("is_logged_in")
- .and(warp::get())
- .and_then(is_logged_in_handle))
- .or(warp::path("log_in")
- .and(warp::post())
- .and(warp::body::json())
- .and_then(log_in_handle))
- .or(warp::path("connect")
- .and(warp::ws())
- .map(|ws: warp::ws::Ws| {
- ws.on_upgrade(|socket| ws_handler(socket))
- }))
- .or(warp::path("disconnect")
- .and(warp::post())
- .and_then(disconnect_handle))
- .with(cors);
- let ip: IpAddr = IpAddr::from_str(&*Config::global().api_addr.clone()).expect("Invalid API IP address");
- let addr = SocketAddr::new(ip, Config::global().api_port.clone() as u16);
- warp::serve(routes).run(addr).await;
- }
- async fn ws_handler(ws: WebSocket) {
- let (mut ws_tx, mut ws_rx) = ws.split();
- let (client_tx, mut client_rx) = tokio::sync::mpsc::unbounded_channel();
- // Overwrite the existing client with the new one
- {
- let mut client_guard = CLIENT.lock().unwrap();
- *client_guard = Some(client_tx);
- }
- // Task to receive messages from the client (if needed)
- let rx_task = tokio::spawn(async move {
- while let Some(result) = ws_rx.next().await {
- match result {
- Ok(msg) => {
- // Handle client messages here if necessary
- }
- Err(e) => {
- eprintln!("WebSocket error: {}", e);
- break;
- }
- }
- }
- });
- // Task to send messages to the client
- let tx_task = tokio::spawn(async move {
- while let Some(result) = client_rx.recv().await {
- if ws_tx.send(result).await.is_err() {
- // Client disconnected
- break;
- }
- }
- });
- // Wait for either task to complete
- tokio::select! {
- _ = rx_task => {},
- _ = tx_task => {},
- }
- // Remove the client from storage when done
- {
- let mut client_guard = CLIENT.lock().unwrap();
- *client_guard = None;
- }
- }
- pub async fn broadcast_message(message: String) {
- let msg = Message::text(message);
- let client_guard = CLIENT.lock().unwrap();
- if let Some(tx) = &*client_guard {
- let _ = tx.send(msg);
- }
- }
- // functions
- pub async fn get_attachment(folder: String, id: String, name: String) -> GetAttachmentResponse{
- let attachment_path = Config::global().out_dir.clone()
- .join(folder.clone())
- .join(".attachments")
- .join(id)
- .join(name.clone());
- let file_result = File::open(attachment_path);
- let mut file = match file_result {
- Ok(f) => f,
- Err(_) => {return GetAttachmentResponse{ name, data: vec![] }}
- };
- let mut buffer = Vec::new();
- let read_result = file.read_to_end(&mut buffer);
- match read_result {
- Ok(_) => GetAttachmentResponse{ name, data: buffer },
- Err(e) => GetAttachmentResponse{ name, data: vec![] },
- }
- }
- pub async fn get_attachments_info(folder: String, id: String) -> String{
- let mut attachments_info: Vec<AttachmentInfo> = Vec::new();
- let attachments_path = Config::global().out_dir.clone()
- .join(folder.clone())
- .join(".attachments")
- .join(id);
-
- let entries = fs::read_dir(attachments_path);
- match entries {
- Ok(entries) => {
- for entry in entries {
- match entry {
- Ok(entry) => {
- let path = entry.path();
- if path.is_file() {
- match metadata(&path) {
- Ok(meta) => {
- let file_size = meta.len();
- attachments_info.push(AttachmentInfo{
- name: entry.file_name().to_str().unwrap().to_string(),
- size: file_size,
- path
- })
- }
- Err(_) => {}
- }
- }
- }
- Err(_) => {}
- }
- }
- }
- Err(_) => {}
- }
-
- serde_json::to_string(&attachments_info).unwrap()
- }
- pub async fn get_folders() -> String {
- broadcast_message("folders".to_string()).await;
- serde_json::to_string(&Indexes::get_local_mailboxes()).unwrap()
- }
- pub async fn get_email(id: String) -> String {
- let messages = Indexes::get_messages().unwrap();
-
- let message = match messages.iter().find(|message| message.id == id){
- None => {return String::new()}
- Some(message) => {message}
- };
-
- let abs_path = Config::global().out_dir.clone()
- .join(message.list.clone())
- .join("messages")
- .join(message.hash.clone() + ".tar.gz");
- let mut content = match read_and_decompress_file(abs_path, message.hash.clone()) {
- Ok(content) => {
- String::from_utf8_lossy(&content).to_string()
- }
- Err(_) => String::new()
- };
-
- content
- }
- pub async fn get_thread_messages(id: u32) -> String {
- let threads = Indexes::get_threads().unwrap();
- let messages = Indexes::get_messages().unwrap();
- if let Some(thread) = threads.into_iter().find(|thread| thread.id == id) {
- let result_messages: Vec<SerializableMessage> = thread.messages.into_iter()
- .filter_map(|message_id| {
- messages.iter().find(|message| message.id == message_id).cloned()
- })
- .collect();
- serde_json::to_string(&result_messages).unwrap()
- } else {
- serde_json::to_string(&Vec::<SerializableMessage>::new()).unwrap()
- }
- }
- pub async fn get_thread(id: u32) -> String {
- let threads = Indexes::get_threads().unwrap();
- let messages = Indexes::get_messages().unwrap();
- let mut get_thread_response: GetThreadResponse = GetThreadResponse::new();
-
- if let Some(thread) = threads.into_iter().find(|thread| thread.id == id) {
- get_thread_response.id = thread.id.clone();
- get_thread_response.messages = thread.messages.clone();
- if let Some(first_message_id) = thread.messages.first(){
- if let Some(first_message) = messages.clone().into_iter().find(|message| message.id == *first_message_id){
- get_thread_response.subject = first_message.subject.clone();
- }
- }
- if let Some(last_message_id) = thread.messages.last(){
- if let Some(last_message) = messages.clone().into_iter().find(|message| message.id == *last_message_id){
- get_thread_response.date = last_message.date.clone();
- get_thread_response.from_name = last_message.name.clone();
- get_thread_response.from_address = last_message.from.clone();
- get_thread_response.to = last_message.to.clone();
- }
- }
- serde_json::to_string(&get_thread_response).unwrap()
- } else {
- serde_json::to_string(&GetThreadResponse::new()).unwrap()
- }
- }
- async fn get_threads(folder: String, limit: usize, offset: usize, file_name: String) -> String {
- let path = Config::global()
- .out_dir
- .clone()
- .join(folder)
- .join(file_name);
- let file = match File::open(path) {
- Ok(file) => file,
- Err(_) => return serde_json::to_string(&Vec::<(String, Vec<u32>)>::new()).unwrap(),
- };
- let reader = BufReader::new(file);
- let messages: Vec<(String, Vec<u32>)> = match serde_json::from_reader(reader)
- {
- Ok(messages) => messages,
- Err(_) => return serde_json::to_string(&Vec::<(String, Vec<u32>)>::new()).unwrap(),
- };
-
- // filtering elements with limit and offset
- let mut result = Vec::new();
- let mut current_offset = offset;
- let mut remaining_limit = limit;
- for (msg, data) in messages.into_iter() {
- // Skip the inner data until we reach the required offset
- if current_offset >= data.len() {
- current_offset -= data.len();
- continue;
- }
- // Take the required elements from the current vector
- let paginated_data: Vec<u32> = data.into_iter()
- .skip(current_offset)
- .take(remaining_limit)
- .collect();
- // Push the result for the current message
- if !paginated_data.is_empty() {
- result.push((msg, paginated_data));
- }
- // Update the remaining limit and reset the offset for the next iterations
- remaining_limit -= result.last().unwrap().1.len();
- current_offset = 0;
- // If we've collected enough elements, break out of the loop
- if remaining_limit == 0 {
- break;
- }
- }
- serde_json::to_string(&result).unwrap()
- }
- pub async fn get_sorted_threads_by_date(folder: String, limit: usize, offset: usize) -> String {
- get_threads(folder, limit, offset, "date.json".to_string()).await
- }
- pub async fn is_logged_in() -> String {
- let response = if (*Config::global().username.lock().unwrap() == "" || *Config::global().password.lock().unwrap() == "") {
- serde_json::to_string(&IsLoggedInResponse { is_logged_in: false })
- }else{
- serde_json::to_string(&IsLoggedInResponse { is_logged_in: true })
- };
- response.unwrap_or_else(|_| serde_json::to_string(&IsLoggedInResponse { is_logged_in: true }).unwrap())
- }
- pub async fn log_in(email: String, password: String) -> bool {
- Config::global().set_username(email.clone());
- Config::global().set_password(password.clone());
-
- match get_session().await{
- Ok(_) => {
- match Config::global().update_credentials(&email, &password){
- Ok(_) => {
- let mut idle_task_storage = IMAP_TASK_STORAGE.lock().unwrap();
- idle_task_storage.start_downloading();
- drop(idle_task_storage);
- start_checkin_for_all_updates();
- true
- },
- Err(_) => false
- }
- }
- Err(_) => false
- }
- }
- // Handlers
- #[derive(Deserialize)]
- struct GetAttachmentQuery {
- folder: String,
- id: String,
- name: String
- }
- #[derive(Deserialize, Serialize)]
- struct GetAttachmentResponse{
- pub name: String,
- pub data: Vec<u8>
- }
- async fn get_attachment_handle(query: GetAttachmentQuery) -> Result<impl warp::Reply, warp::Rejection>{
- let result: GetAttachmentResponse = get_attachment(query.folder, query.id, query.name).await;
- Ok(warp::reply::json(&result))
- }
- #[derive(Deserialize)]
- struct GetAttachmentsInfoQuery {
- folder: String,
- id: String
- }
- #[derive(Deserialize, Serialize)]
- struct AttachmentInfo{
- pub name: String,
- pub size: u64,
- pub path: PathBuf
- }
- async fn get_attachments_info_handle(query: GetAttachmentsInfoQuery) -> Result<impl warp::Reply, warp::Rejection>{
- let result: Vec<AttachmentInfo> = serde_json::from_str(&*get_attachments_info(query.folder, query.id).await).unwrap();
- Ok(warp::reply::json(&result))
- }
- #[derive(Deserialize, Serialize)]
- struct ErrorResponse {
- error: String,
- }
- #[derive(Deserialize)]
- struct GetSortedThreadsByDateQuery {
- folder: String,
- limit: usize,
- offset: usize
- }
- async fn sorted_threads_by_date_handle(query: GetSortedThreadsByDateQuery) -> Result<impl warp::Reply, warp::Rejection> {
- let result: Vec<(String, Vec<u32>)> = serde_json::from_str(&*get_sorted_threads_by_date(query.folder, query.limit, query.offset).await).unwrap();
- Ok(warp::reply::json(&result))
- }
- async fn get_folders_handle() -> Result<impl warp::Reply , warp::Rejection> {
- let result: Vec<String> = serde_json::from_str(&*get_folders().await).unwrap();
- Ok(warp::reply::json(&result))
- }
- #[derive(Deserialize)]
- struct GetEmailQuery {
- id: String,
- }
- async fn get_email_handle(query: GetEmailQuery) -> Result<impl warp::Reply, warp::Rejection> {
- let result: String = get_email(query.id).await;
- Ok(warp::reply::html(result))
- }
- #[derive(Deserialize)]
- struct GetThreadQuery {
- id: u32,
- }
- #[derive(Serialize, Deserialize)]
- struct GetThreadResponse{
- pub id: u32,
- pub messages: Vec<String>,
- pub subject: String,
- pub date: DateTime<Utc>,
- pub from_name: String,
- pub from_address: String,
- pub to: Vec<MailAddress>,
- }
- impl GetThreadResponse{
- pub fn new() -> Self{
- GetThreadResponse{
- id: 0,
- messages: vec![],
- subject: "".to_string(),
- date: Default::default(),
- from_name: "".to_string(),
- from_address: "".to_string(),
- to: vec![],
- }
- }
- }
- async fn get_thread_handle(query: GetThreadQuery) -> Result<impl warp::Reply, warp::Rejection> {
- let result: GetThreadResponse = serde_json::from_str(&*get_thread(query.id).await).unwrap();
- Ok(warp::reply::json(&result))
- }
- #[derive(Deserialize)]
- struct GetThreadMessagesQuery {
- id: u32,
- }
- async fn get_thread_messages_handle(query: GetThreadMessagesQuery) -> Result<impl warp::Reply, warp::Rejection> {
- let result: Vec<SerializableMessage> = serde_json::from_str(&*get_thread_messages(query.id).await).unwrap();
- Ok(warp::reply::json(&result))
- }
- #[derive(Deserialize)]
- struct CreateFolderRequest {
- name: String,
- }
- async fn create_folder_handle(data: CreateFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
- match create_folder_lar(data.name.clone()).await {
- Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
- Err(_) => {
- let error_response = ErrorResponse {
- error: format!("Cannot create folder {}", data.name),
- };
- Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
- }
- }
- }
- #[derive(Deserialize)]
- struct DeleteFolderRequest {
- name: String,
- }
- async fn delete_folder_handle(data: DeleteFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
- match delete_folder_lar(data.name.clone()).await {
- Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
- Err(_) => {
- let error_response = ErrorResponse {
- error: format!("Cannot delete folder {}", data.name),
- };
- Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
- }
- }
- }
- #[derive(Deserialize)]
- struct RenameFolderRequest {
- old_name: String,
- new_name: String,
- }
- async fn rename_folder_handle(data: RenameFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
- match rename_folder_lar(data.old_name.clone(), data.new_name.clone()).await {
- Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
- Err(_) => {
- let error_response = ErrorResponse {
- error: format!("Cannot rename folder from {} to {}", data.old_name, data.new_name),
- };
- Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
- }
- }
- }
- #[derive(Deserialize)]
- struct SearchQuery {
- list: String,
- limit: i32,
- offset: i32,
- query: String
- }
- async fn search_handle(query: SearchQuery) -> Result<impl warp::Reply, warp::Rejection> {
- match SearchSonic::search_document("emails", &*query.list, &*query.query, query.limit, query.offset){
- Ok(result) => Ok(warp::reply::json(&result)),
- Err(_) => Ok(warp::reply::json(&Vec::<String>::new()))
- }
- }
- #[derive(Deserialize, Serialize)]
- struct IsLoggedInResponse{
- is_logged_in: bool
- }
- async fn is_logged_in_handle() -> Result<impl warp::Reply, warp::Rejection>{
- Ok(warp::reply::json(&is_logged_in().await))
- }
- #[derive(Deserialize)]
- struct LogInRequest {
- email: String,
- password: String
- }
- async fn log_in_handle(query: LogInRequest) -> Result<impl warp::Reply, warp::Rejection>{
- if log_in(query.email, query.password).await {
- Ok(warp::reply::with_status("Successful log in", StatusCode::OK))
- }else{
- Ok(warp::reply::with_status("Unable to log in", StatusCode::BAD_REQUEST))
- }
- }
- #[derive(Debug)]
- struct NoClientConnected;
- impl warp::reject::Reject for NoClientConnected {}
- impl warp::reply::Reply for NoClientConnected {
- fn into_response(self) -> warp::reply::Response {
- warp::reply::with_status(
- "No client connected".to_string(),
- warp::http::StatusCode::NOT_FOUND,
- )
- .into_response()
- }
- }
- async fn disconnect_handle() -> Result<impl warp::Reply, warp::Rejection> {
- // Lock the client storage
- let mut client_guard = CLIENT.lock().unwrap();
- // Check if a client is connected
- if let Some(client) = client_guard.take() {
- // Send a close message to the client
- client.send(Message::close()).ok();
- Ok(warp::reply::with_status(
- "Client disconnected",
- warp::http::StatusCode::OK,
- ))
- } else {
- Err(warp::reject::custom(NoClientConnected))
- }
- }
|