Kaynağa Gözat

monitoring updates for all mailboxes + start/stop monitoring on creation/deletion of new mailboxes

Yurii Sokolovskyi 2 ay önce
ebeveyn
işleme
325c893bbc
4 değiştirilmiş dosya ile 178 ekleme ve 56 silme
  1. 84 24
      src/imap.rs
  2. 20 1
      src/indexes.rs
  3. 73 15
      src/main.rs
  4. 1 16
      src/server.rs

+ 84 - 24
src/imap.rs

@@ -16,20 +16,25 @@ use tokio_rustls::client::TlsStream;
 use tokio_rustls::rustls::{ClientConfig, RootCertStore};
 use tokio_rustls::rustls::pki_types::ServerName;
 use crate::config::Config;
-use crate::{add_email};
+use crate::{add_email, delete_email};
 use tokio::task;
 use tokio::time::sleep;
+use imap_proto::Response;
+use crate::indexes::{Indexes, SerializableMessage};
 
 #[cfg(not(target_os = "wasi"))]
 use async_imap::{Client, Session};
 #[cfg(not(target_os = "wasi"))]
 use async_imap::extensions::idle::IdleResponse::NewData;
-
+#[cfg(not(target_os = "wasi"))]
+use async_imap::types::{Uid, UnsolicitedResponse};
+use tokio::task::JoinHandle;
 #[cfg(target_os = "wasi")]
 use async_imap_wasi::{Client, Session};
 #[cfg(target_os = "wasi")]
 use async_imap_wasi::extensions::idle::IdleResponse::NewData;
-use crate::indexes::Indexes;
+#[cfg(target_os = "wasi")]
+use async_imap_wasi::types::Uid;
 
 /// create TLS connect with the IMAP server
 pub async fn connect_to_imap() -> anyhow::Result<Client<TlsStream<TcpStream>>>{
@@ -149,7 +154,6 @@ pub async fn fetch_and_store_emails(session: &mut Session<TlsStream<TcpStream>>,
                 let mail_file = store(Config::global().maildir.clone().join(list.clone()), uid.clone().to_string(), "new".to_string(), body, "");
                 match mail_file {
                     Ok(file) => {
-                        // TODO convert and persist html
                         // persist to the maildir
                         stored_paths.push((uid.to_string().parse().unwrap(), file.clone()));
                         
@@ -159,7 +163,6 @@ pub async fn fetch_and_store_emails(session: &mut Session<TlsStream<TcpStream>>,
                             Err(_) => {println!("Error adding email from {:?}", file.clone())}
                         };
                         
-                        // TODO adjust indexes
                         Indexes::persist_threads().expect("Unable to persist threads");
                         Indexes::persist_indexes(vec![list.clone()]).expect("Unable to persist indexes");
                     },
@@ -184,7 +187,7 @@ pub async fn fetch_and_store_emails(session: &mut Session<TlsStream<TcpStream>>,
 }
 
 /// read uids that have been already downloaded
-fn read_local_uids(uids_path: PathBuf) -> anyhow::Result<HashMap<String, HashSet<String>>> {
+pub fn read_local_uids(uids_path: PathBuf) -> anyhow::Result<HashMap<String, HashSet<String>>> {
     let file = match File::open(&uids_path) {
         Ok(file) => file,
         Err(_) => return Ok(HashMap::new()), // Propagate other errors
@@ -299,8 +302,53 @@ pub async fn delete_folder(name: String) -> anyhow::Result<()>{
     Ok(())
 }
 
+/// deletes all emails locally that were deleted remotely
+pub async fn remove_deleted_emails(mailbox: String) -> anyhow::Result<u32> {
+    // TODO move session creation to the function
+    let mut client = match connect_to_imap().await {
+        Ok(client) => client,
+        Err(e) => {
+            return Err(anyhow!("Failed to connect to IMAP"));
+        }
+    };
+
+    let session_result = client
+        .login(Config::global().username.clone(), Config::global().password.clone())
+        .await;
+
+    let mut session = match session_result {
+        Ok(session) => session,
+        Err(_) => return Err(anyhow!("Unable to login to IMAP server")),
+    };
+
+    session.select(mailbox.clone()).await?;
+    
+    let uids_path = Config::global().maildir.clone().join(".uids.json");
+    let uids_local = match read_local_uids(uids_path.clone()).unwrap().get(&mailbox){
+        None => HashSet::new(),
+        Some(hash_set) => (*hash_set).clone()
+    };
+    let uids_server = session.uid_search("ALL").await.unwrap();
+
+    let deleted_uids: Vec<u32> = uids_local
+        .into_iter()
+        .filter_map(|s| s.parse::<Uid>().ok())
+        .filter(|uid| !uids_server.contains(uid))
+        .collect();
+
+    // delete email from indexes
+    for deleted_uid in &deleted_uids {
+        match Indexes::find_by_uid(&Indexes::get_messages()?, *deleted_uid) {
+            None => {}
+            Some(message) => delete_email(message.id).await.unwrap()
+        }
+    }
+    
+    Ok(deleted_uids.len() as u32)
+}
+
 /// run a async task to monitor any changes in the mailbox 
-pub async fn check_for_updates(mailbox: String) -> anyhow::Result<()> {
+pub fn check_for_updates(mailbox: String) -> JoinHandle<Result<(), anyhow::Error>> {
     task::spawn(async move {
         let mut client = match connect_to_imap().await {
             Ok(client) => client,
@@ -321,28 +369,42 @@ pub async fn check_for_updates(mailbox: String) -> anyhow::Result<()> {
         session.select(mailbox.clone()).await?;
 
         loop {
+            println!("Start IDLE loop");
             let mut idle = session.idle();
             idle.init().await.unwrap();
             let (idle_wait, interrupt) = idle.wait();
             let idle_handle = task::spawn(async move {
-                println!("IDLE: waiting for 30s"); // TODO remove debug prints
-                sleep(Duration::from_secs(30)).await;
-                println!("IDLE: waited 30 secs, now interrupting idle");
+                sleep(Duration::from_mins(25)).await;
                 drop(interrupt);
             });
 
             match idle_wait.await.unwrap() {
-                // TODO add more cases like delete, move...
                 NewData(data) => {
-                    // TODO do not update all emails (IMAP returns * {number} RECENT) and do it only for one mailbox
-                    // TODO CHANGE IT!!!
-                    // let new_paths = download_email_from_imap().await.expect("Cannot download new emails");
-                    // for (uid, path) in new_paths.clone() {
-                    //     match add_email(path.clone(), uid.clone()){
-                    //         Ok(_) => {}
-                    //         Err(_) => {println!("Error adding email from {:?}", path.clone())}
-                    //     };
-                    // }
+                    println!("Parsed response from IDLE: {:?}", data.parsed());
+                    match data.parsed() {
+                        Response::Capabilities(_) => {}
+                        Response::Continue { .. } => {}
+                        Response::Done { .. } => {}
+                        Response::Data { .. } => {}
+                        Response::Expunge(_) => {
+                            let _ = remove_deleted_emails(mailbox.clone()).await;
+                            // check other folders if the email was moved
+                            // download_email_from_imap().await.expect("Cannot download new emails");
+                        }
+                        Response::Vanished { .. } => {}
+                        Response::Fetch(_, _) => {}
+                        Response::MailboxData(_) => {
+                            //TODO check only RECENT
+                            download_email_from_imap().await.expect("Cannot download new emails");
+                        }
+                        Response::Quota(_) => {}
+                        Response::QuotaRoot(_) => {}
+                        Response::Id(_) => {}
+                        Response::Acl(_) => {}
+                        Response::ListRights(_) => {}
+                        Response::MyRights(_) => {}
+                        _ => {}
+                    }
                 }
                 reason => {
                     println!("IDLE failed {:?}", reason);
@@ -350,14 +412,12 @@ pub async fn check_for_updates(mailbox: String) -> anyhow::Result<()> {
             }
 
             // Ensure the idle handle is dropped before the next loop iteration
-            idle_handle.await.unwrap();
+            // idle_handle.await.unwrap();
 
             // Reassign session to prevent ownership issues in the next loop iteration
             session = idle.done().await.unwrap();
         }
-    });
-
-    Ok(())
+    })
 }
 
 /// saves a raw email properly

+ 20 - 1
src/indexes.rs

@@ -347,7 +347,7 @@ impl Indexes {
         None
     }
 
-    fn find_by_uid(messages: &Vec<SerializableMessage>, search_uid: u32) -> Option<SerializableMessage> {
+    pub fn find_by_uid(messages: &Vec<SerializableMessage>, search_uid: u32) -> Option<SerializableMessage> {
         for message in messages {
             if message.uid == search_uid {
                 return Some(message.clone());
@@ -438,6 +438,25 @@ impl Indexes {
         Self::write_to_json(serde_json::to_string_pretty(&threads)?, path)?;
         Ok(())
     }
+
+    pub fn get_local_mailboxes() -> Vec<String> {
+        let mut folders: Vec<String> = Vec::new();
+        for entry in std::fs::read_dir(Config::global().out_dir.clone())
+            .expect("could not read maildir")
+            .filter_map(|m| m.ok())
+        {
+            let metadata = match entry.metadata() {
+                Ok(metadata) => metadata,
+                Err(_) => continue,
+            };
+            if !metadata.is_dir() {
+                continue;
+            }
+            folders.push(entry.file_name().into_string().unwrap());
+        }
+
+        folders
+    }
 }
 
 

+ 73 - 15
src/main.rs

@@ -1,12 +1,16 @@
+#![feature(duration_constructors)]
+
 use mail_parser::{Message};
 use std::error::Error;
 use std::path::{PathBuf};
 use models::*;
 use std::ffi::{OsStr, OsString};
 use std::{fs};
+use std::collections::HashMap;
 use std::fs::{create_dir_all, File};
 use std::future::Future;
 use std::io::{Write};
+use std::sync::Mutex;
 use std::time::{Instant};
 use anyhow::anyhow;
 use serde::{Deserialize, Serialize};
@@ -14,10 +18,12 @@ use crate::indexes::{Indexes};
 use config::{Config, INSTANCE};
 use crate::server::{run_api};
 use mailparse::{DispositionType, parse_mail, ParsedMail};
+use once_cell::sync::Lazy;
 use regex::Regex;
+use tokio::task::JoinHandle;
 use crate::templates::util::parse_email;
 use crate::util::{compress_and_save_file};
-use crate::imap::{delete_folder, rename_folder, create_folder, check_for_updates};
+use crate::imap::{delete_folder, rename_folder, create_folder, check_for_updates, read_local_uids};
 use crate::js::email_scripts;
 use warp::Filter;
 use crate::sonic::{IngestSonic};
@@ -34,6 +40,31 @@ mod server;
 mod js;
 mod sonic;
 
+struct IdleTaskStorage{
+    handlers: HashMap<String, JoinHandle<Result<(), anyhow::Error>>>
+}
+
+impl IdleTaskStorage {
+    fn new() -> Self {
+        IdleTaskStorage {
+            handlers: HashMap::new(),
+        }
+    }
+    
+    pub fn add(&mut self, mailbox: String, handler: JoinHandle<Result<(), anyhow::Error>>) {
+        self.handlers.insert(mailbox, handler);
+    }
+    
+    pub fn delete(&mut self, mailbox: String){
+        if self.handlers.contains_key(&mailbox.clone()) {
+            self.handlers.remove(&mailbox.clone());
+        }
+    }
+}
+
+pub static IDLE_TASK_STORAGE: Lazy<Mutex<IdleTaskStorage>> = Lazy::new(|| Mutex::new(IdleTaskStorage::new()));
+
+
 /// appends an extension to a path
 pub fn append_ext(ext: impl AsRef<OsStr>, path: &PathBuf) -> PathBuf {
     let mut os_string: OsString = path.into();
@@ -231,10 +262,12 @@ pub fn add_email(path: PathBuf, uid: u32) -> anyhow::Result<()>{
 
 /// delete a single message both locally and remotely
 async fn delete_email(message_id: String) -> anyhow::Result<()>{
-    let message = Indexes::delete_from_messages(message_id);
+    let message = Indexes::delete_from_messages(message_id.clone());
     match message {
         Ok(message) => {
+            // delete from indexes
             let _ = Indexes::delete_from_threads(message.clone());
+            let _ = Indexes::delete_from_messages(message_id.clone());
             let lists: Vec<String> = fs::read_dir(Config::global().out_dir.clone())?
                 .filter_map(Result::ok)
                 .filter(|entry| entry.metadata().map(|m| m.is_dir()).unwrap_or(false))
@@ -242,16 +275,19 @@ async fn delete_email(message_id: String) -> anyhow::Result<()>{
                 .collect();
             let _ = Indexes::persist_indexes(lists);
 
+            // delete HTML file
             let message_html_name = Config::global().out_dir.clone()
                 .join(message.list.clone())
                 .join("messages")
                 .join(message.hash.clone() + ".html");
-            let message_xml_name = Config::global().out_dir.clone()
-                .join(message.list.clone())
-                .join("messages_xml")
-                .join(message.hash.clone() + ".xml");
             let _ = fs::remove_file(message_html_name);
-            let _ = fs::remove_file(message_xml_name);
+
+            // delete uid from downloaded uid list
+            let uids_path = Config::global().maildir.clone().join(".uids.json");
+            let mut uids_local = read_local_uids(uids_path.clone()).unwrap();
+            if let Some(set) = uids_local.get_mut(&message.list.clone()) {
+                set.remove(&message.uid.clone().to_string());
+            }
 
             let _ = imap::delete_email_by_uid(message.list.clone(), message.uid.clone()).await;
         }
@@ -272,6 +308,12 @@ pub async fn create_folder_lar(name: String) -> anyhow::Result<()>{
                     Err(_) => Err(anyhow!("Cannot create folder locally")),
                 }
             }
+
+            // start monitoring
+            let handler = check_for_updates(name.clone());
+            let mut idle_task_storage = IDLE_TASK_STORAGE.lock().unwrap();
+            idle_task_storage.add(name.clone(), handler);
+            
             Ok(())
         }
         Err(_) => Err(anyhow!("Cannot create folder remotely"))
@@ -282,15 +324,20 @@ pub async fn create_folder_lar(name: String) -> anyhow::Result<()>{
 pub async fn delete_folder_lar(name: String) -> anyhow::Result<()>{
     match delete_folder(name.clone()).await {
         Ok(_) => {
-            let folder_path = Config::global().out_dir.clone().join(name);
+            let folder_path = Config::global().out_dir.clone().join(name.clone());
             if folder_path.exists() {
                 return match fs::remove_dir_all(folder_path) {
                     Ok(_) => Ok(()),
                     Err(_) => Err(anyhow!("Cannot delete folder locally")),
                 }
             }
+
+            // stop monitoring the folder
+            let mut idle_task_storage = IDLE_TASK_STORAGE.lock().unwrap();
+            idle_task_storage.delete(name.clone());
+            
             Ok(())
-        }
+        }        
         Err(_) => Err(anyhow!("Cannot delete folder remotely"))
     }
 }
@@ -299,14 +346,23 @@ pub async fn delete_folder_lar(name: String) -> anyhow::Result<()>{
 pub async fn rename_folder_lar(old_name: String, new_name: String) -> anyhow::Result<()>{
     match rename_folder(old_name.clone(), new_name.clone()).await {
         Ok(_) => {
-            let old_path = Config::global().out_dir.clone().join(old_name);
-            let new_path = Config::global().out_dir.clone().join(new_name);
+            let old_path = Config::global().out_dir.clone().join(old_name.clone());
+            let new_path = Config::global().out_dir.clone().join(new_name.clone());
             if old_path.exists() {
                 return match fs::rename(old_path, new_path) {
                     Ok(_) => Ok(()),
                     Err(_) => Err(anyhow!("Cannot rename folder locally"))
                 }
             }
+
+            // stop monitoring old folder
+            let mut idle_task_storage = IDLE_TASK_STORAGE.lock().unwrap();
+            idle_task_storage.delete(old_name.clone());
+            
+            // start monitoring new folder
+            let handler = check_for_updates(new_name.clone());
+            idle_task_storage.add(new_name.clone(), handler);
+            
             Ok(())
         }
         Err(_) => Err(anyhow!("Cannot rename folder remotely"))
@@ -351,10 +407,12 @@ async fn run(){
     //     uid += 1;
     // }
 
-
-    // monitor updates in INBOX
-    if let Err(e) = check_for_updates("INBOX".to_string()).await {
-        eprintln!("Failed to monitor mailbox: {:?}", e);
+    
+    // start updates monitoring
+    for mailbox in Indexes::get_local_mailboxes() {
+        let handler = check_for_updates(mailbox.clone());
+        let mut idle_task_storage = IDLE_TASK_STORAGE.lock().unwrap();
+        idle_task_storage.add(mailbox.clone(), handler);
     }
     
     // dont stop the program while API is running

+ 1 - 16
src/server.rs

@@ -141,22 +141,7 @@ pub async fn get_attachments_info(folder: String, id: String) -> String{
 }
 
 pub async fn get_folders() -> String {
-    let mut folders: Vec<String> = Vec::new();
-    for entry in std::fs::read_dir(Config::global().out_dir.clone())
-        .expect("could not read maildir")
-        .filter_map(|m| m.ok())
-    {
-        let metadata = match entry.metadata() {
-            Ok(metadata) => metadata,
-            Err(_) => continue,
-        };
-        if !metadata.is_dir() {
-            continue;
-        }
-        folders.push(entry.file_name().into_string().unwrap());
-    }
-
-    serde_json::to_string(&folders).unwrap()
+    serde_json::to_string(&Indexes::get_local_mailboxes()).unwrap()
 }
 
 pub async fn get_email(id: String) -> String { // TODO replace with Indexes::get_messages()