Browse Source

IMAP update (need wasi fix)

Yurii Sokolovskyi 3 months ago
parent
commit
3e4ceef9ed
5 changed files with 239 additions and 57 deletions
  1. 1 1
      Cargo.toml
  2. 74 2
      src/imap.rs
  3. 77 14
      src/imap/imap_client.rs
  4. 85 40
      src/main.rs
  5. 2 0
      src/util.rs

+ 1 - 1
Cargo.toml

@@ -35,7 +35,7 @@ base64 = "0.22.1"
 lettre = {version = "0.11.7", default-features = false, features = ["builder"]}
 
 [target.'cfg(target_os = "wasi")'.dependencies]
-tokio_wasi = {version = "1.25.2", features = ["full"] }
+tokio_wasi = {version = "1.25.2", features = ["full", "rt", "rt-multi-thread", "macros", "time"] }
 tokio-rustls-wasi = "0.25.0-alpha"
 httpcodec = "0.2.3"
 bytecodec = "0.4.15"

+ 74 - 2
src/imap.rs

@@ -2,22 +2,25 @@ use std::collections::{HashMap, HashSet};
 use std::error::Error;
 use std::{io};
 use std::fs::File;
+use std::future::Future;
 use std::net::{TcpStream};
 use std::path::{PathBuf};
 use std::io::{BufReader, BufWriter, ErrorKind, Read, Write};
+use std::time::Duration;
 use kuchiki::iter::NodeIterator;
 use serde_json::to_writer;
+use tokio::task::JoinHandle;
 use crate::config::Config;
 
 #[cfg(not(target_os = "wasi"))]
 use native_tls::{TlsStream};
 #[cfg(not(target_os = "wasi"))]
 use imap::{Session};
-
+use crate::{add_email};
 #[cfg(target_os = "wasi")]
 use crate::imap::imap_client::{Client, connect_to_imap_server};
 
-mod imap_client;
+pub(crate) mod imap_client;
 
 #[cfg(not(target_os = "wasi"))]
 pub async fn download_email_from_imap(maildir_path: PathBuf, imap_domain: String, username: String, password: String) -> Result<Vec<(u32, PathBuf)>, Box<dyn Error>>{
@@ -345,6 +348,75 @@ pub async fn delete_folder(name: String) -> anyhow::Result<()>{
     Ok(())
 }
 
+#[cfg(target_os = "wasi")]
+pub async fn check_for_updates(mailbox: String) -> anyhow::Result<()>{
+    let mut client = connect_to_imap_server(Config::global().imap_domain.clone(), Config::global().imap_port.clone() as u16).await?;
+    client.login(Config::global().username.clone(), Config::global().password.clone()).await?;
+
+    client.select(mailbox).await?;
+    let mut id = client.idle().await?;
+
+    println!("Start looking for updates");
+    loop {
+        println!("UPDATE loop");
+        client.read_response_by_keyword("EXISTS".to_string()).await?;
+
+        // TODO do not update all emails (IMAP returns * {number} RECENT) and do it only for one mailbox
+        let new_paths = download_email_from_imap(
+            Config::global().maildir.clone(),
+            Config::global().imap_domain.clone(),
+            Config::global().username.clone(),
+            Config::global().password.clone()
+        ).await.expect("Cannot download new emails");
+
+        for (uid, path) in new_paths.clone() {
+            match add_email(path.clone(), uid.clone()){
+                Ok(_) => {}
+                Err(_) => {println!("Error adding email from {:?}", path.clone())}
+            };
+        }
+    }
+
+    client.idle_done().await;
+    client.logout().await;
+    Ok(()) 
+}
+
+#[cfg(not(target_os = "wasi"))]
+pub async fn check_for_updates(mailbox: String) -> anyhow::Result<()>{
+    let tls = native_tls::TlsConnector::builder().build().unwrap();
+    let client = imap::connect((Config::global().imap_domain.clone(), Config::global().imap_port.clone() as u16), Config::global().imap_domain.clone(), &tls).unwrap();
+    let mut imap_session = client
+        .login(Config::global().username.clone(), Config::global().password.clone())
+        .map_err(|e| e.0)?;
+
+    imap_session.select(mailbox)?;
+
+    println!("Start looking for updates");
+    loop {
+        let idle = imap_session.idle()?;
+        idle.wait()?;
+
+        // TODO do not update all emails (IMAP returns * {number} RECENT) and do it only for one mailbox
+        let new_paths = download_email_from_imap(
+            Config::global().maildir.clone(),
+            Config::global().imap_domain.clone(),
+            Config::global().username.clone(),
+            Config::global().password.clone()
+        ).await.expect("Cannot download new emails");
+
+        for (uid, path) in new_paths.clone() {
+            match add_email(path.clone(), uid.clone()){
+                Ok(_) => {}
+                Err(_) => {println!("Error adding email from {:?}", path.clone())}
+            };
+        }
+    }
+
+    imap_session.logout()?;
+    Ok(())
+}
+
 fn store(
     path: PathBuf,
     uid: String,

+ 77 - 14
src/imap/imap_client.rs

@@ -1,25 +1,28 @@
 #![cfg(target_os = "wasi")]
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use regex::Regex;
 use rustls_pki_types::ServerName;
 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
 use tokio::net::TcpStream;
 use tokio_rustls::rustls::{ClientConfig, RootCertStore};
 use std::convert::TryFrom;
+use std::time::Duration;
 use anyhow::anyhow;
 use imap_proto::{self, MailboxDatum};
 use imap_proto::parser::parse_response;
+use tokio::time::{sleep, timeout};
 
 pub struct Client{
-    reader: BufReader<ReadHalf<tokio_rustls::client::TlsStream<TcpStream>>>,
-    writer: WriteHalf<tokio_rustls::client::TlsStream<TcpStream>>
+    reader: Arc<tokio::sync::Mutex<BufReader<ReadHalf<tokio_rustls::client::TlsStream<TcpStream>>>>>,
+    writer: Arc<tokio::sync::Mutex<WriteHalf<tokio_rustls::client::TlsStream<TcpStream>>>>
 }
 
 impl Client {
     pub fn new(stream: tokio_rustls::client::TlsStream<tokio::net::TcpStream>) -> Client {
         let (reader, writer) = tokio::io::split(stream);
-        let reader = BufReader::new(reader);
-
+        let reader = Arc::new(tokio::sync::Mutex::new(BufReader::new(reader)));
+        let writer = Arc::new(tokio::sync::Mutex::new(writer));
+        
         Client {
             reader,
             writer,
@@ -27,16 +30,23 @@ impl Client {
     }
 
     async fn run_command(&mut self, command: String) -> anyhow::Result<()>{
-        self.writer.write_all((command + "\r\n").as_bytes()).await?;
-        self.writer.flush().await?;
+        let writer_clone = Arc::clone(&self.writer);
+        let mut writer = writer_clone.lock().await;
+        
+        (*writer).write_all((command + "\r\n").as_bytes()).await?;
+        (*writer).flush().await?;
+        drop(writer);
 
         Ok(())
     }
 
-    async fn read_response(&mut self, id: String) -> anyhow::Result<String>{
+    pub async fn read_response_by_id(&mut self, id: String) -> anyhow::Result<String>{
         let mut response = String::new();
         let mut temp = String::new();
-        while let Ok(bytes) = self.reader.read_line(&mut temp).await {
+        let reader_clone = Arc::clone(&self.reader);
+        let mut reader = reader_clone.lock().await;
+
+        while let Ok(bytes) = (*reader).read_line(&mut temp).await {
             println!("Temp: {:?}", temp);
             if bytes == 0 || temp.contains(&*id.to_string()) { // if bytes == 0 || temp.contains(&*format!("{} OK", id))
                 break;
@@ -44,15 +54,56 @@ impl Client {
             response += &*temp;
             temp.clear();
         }
+        drop(reader);
 
         Ok(response)
     }
 
+    pub async fn read_response_by_keyword(&self, key: String) -> anyhow::Result<String> {
+        // Clone the Arc to move into the async task
+        let reader_clone = Arc::clone(&self.reader);
+
+        let handler = tokio::spawn(async move {
+            let mut response = String::new();
+            let mut temp = String::new();
+            loop {
+                // Lock the reader for access
+                let mut reader = reader_clone.lock().await;
+
+                // Use tokio::select! for concurrent operations
+                let result = timeout(Duration::from_secs(2), reader.read_line(&mut temp)).await;
+                match result {
+                    Ok(Ok(bytes)) => {
+                        println!("Temp: {:?}", temp);
+                        if bytes == 0 || temp.contains(&*key) {
+                            return Ok(response)
+                        }
+                        response += &*temp;
+                        temp.clear();
+                    }
+                    Ok(Err(e)) => {
+                        eprintln!("Error reading line: {:?}", e);
+                        return Err(anyhow!("Error reading line from IMAP server"));
+                    }
+                    _ => {
+                        sleep(Duration::from_secs(1)).await;
+                        tokio::task::yield_now().await;
+                    }
+                }
+                
+                drop(reader);
+            }
+        });
+
+        // Wait for the handler to complete and return the result
+        handler.await.unwrap_or_else(|e| Err(anyhow!("Task panicked: {:?}", e)))
+    }
+
     pub async fn login(&mut self, username: String, password: String) -> anyhow::Result<String>{
         let id = "aLogin".to_string();
         self.run_command(format!("{} LOGIN {} {}", id, username, password)).await?;
 
-        Ok(self.read_response(id).await?)
+        Ok(self.read_response_by_id(id).await?)
     }
 
     pub async fn logout(&mut self) -> anyhow::Result<()>{
@@ -64,7 +115,7 @@ impl Client {
     pub async fn list(&mut self) -> anyhow::Result<Vec<String>>{
         let list_command = "aList LIST \"\" \"*\"";
         self.run_command(list_command.to_string()).await.expect("Unable to run a command");
-        let lists = self.read_response("aList".to_string()).await?;
+        let lists = self.read_response_by_id("aList".to_string()).await?;
         let mut lines = lists.as_bytes();
         
         let mut things = Vec::new();
@@ -99,7 +150,7 @@ impl Client {
         let id = "aSelect".to_string();
         self.run_command(format!("{} SELECT {}", id, list)).await.expect("Unable to run a command");
 
-        self.read_response(id).await
+        self.read_response_by_id(id).await
     }
 
 
@@ -107,7 +158,7 @@ impl Client {
         let id = "aUidSearch".to_string(); // Tag for the command
         self.run_command(format!("{} UID SEARCH {}", id, query)).await.expect("Unable to run a command");
 
-        Ok(Self::uids_parse(self.read_response(id).await?).unwrap())
+        Ok(Self::uids_parse(self.read_response_by_id(id).await?).unwrap())
     }
 
     fn uids_parse(uids: String) -> anyhow::Result<Vec<String>>{
@@ -123,7 +174,7 @@ impl Client {
         let id = "aUidFetch".to_string();
         self.run_command(format!("{} UID FETCH {} {}", id, uid, query)).await.expect("Unable to run a command");
 
-        Ok(self.read_response(id).await?)
+        Ok(self.read_response_by_id(id).await?)
     }
     
     pub async fn mark_deleted(&mut self, uid: u32) -> anyhow::Result<()>{
@@ -160,6 +211,18 @@ impl Client {
         self.run_command(format!("{} RENAME \"{}\" \"{}\"", id, name, new_name)).await.unwrap();
         Ok(())
     }
+    
+    pub async fn idle(&mut self) -> anyhow::Result<String>{
+        let id = "aIdle".to_string();
+        self.run_command(format!("{} IDLE", id)).await.unwrap();
+        Ok(id)
+    }
+
+    pub async fn idle_done(&mut self) -> anyhow::Result<String>{
+        let id = "aIdleDone".to_string();
+        self.run_command(format!("{} DONE", id)).await.unwrap();
+        Ok(id)
+    }
 }
 
 pub async fn connect_to_imap_server(domain: String, port: u16) -> anyhow::Result<Client> {

+ 85 - 40
src/main.rs

@@ -1,13 +1,14 @@
 use mail_parser::{Message};
 use std::collections::{HashMap, HashSet};
 use std::error::Error;
-use std::path::{PathBuf};
+use std::path::{Path, PathBuf};
 use models::*;
 use std::ffi::{OsStr, OsString};
-use std::fs;
+use std::{fs, thread};
 use std::fs::{create_dir_all, File, OpenOptions};
 use std::io::{BufReader, Write};
 use std::net::SocketAddr;
+use std::time::Duration;
 use anyhow::anyhow;
 use serde::Deserialize;
 use crate::indexes::{Indexes, SerializableMessage, SerializableThread};
@@ -36,6 +37,8 @@ use bytecodec::DecodeExt;
 use httpcodec::{HttpVersion, ReasonPhrase, Request, RequestDecoder, Response, StatusCode, Header};
 #[cfg(target_os = "wasi")]
 use lettre::message::{Mailbox, Message as LettreMessage, MultiPart, SinglePart};
+use tokio::runtime::Runtime;
+use tokio::time::sleep;
 #[cfg(target_os = "wasi")]
 use wasmedge_wasi_socket::{Shutdown, TcpListener, TcpStream};
 #[cfg(target_os = "wasi")]
@@ -194,6 +197,30 @@ fn persist_email(msg: &Message, uid: u32, list: String, original_path: PathBuf)
     Ok(())
 }
 
+pub fn add_email(path: PathBuf, uid: u32) -> anyhow::Result<()>{
+    let maildir = match path.parent() {
+        None => {return Err(anyhow!("Cannot retrieve parent folder from the path"))}
+        Some(parent) => {
+            match parent.parent() {
+                None => {return Err(anyhow!("Cannot retrieve parent folder from the path"))}
+                Some(parent) => {parent.file_name().unwrap().to_owned().into_string().unwrap()}
+            }
+        }
+    };
+    let maildir_out_path = Config::global().out_dir.clone().join(maildir.clone());
+    if !maildir_out_path.exists() { create_dir_all(maildir_out_path).ok(); }
+
+    let data = std::fs::read(&path).expect("could not read mail");
+
+    if let Some(mail) = Message::parse(&data) {
+        let _ = persist_email(&mail, uid.clone(), maildir.clone(), path.clone());
+    } else {
+        println!("Could not parse message {:?}", path);
+    }
+    
+    Ok(())
+}
+
 fn parse_args(){
     let args = arg::Args::from_env();
     
@@ -211,7 +238,7 @@ async fn run(){
 
     // downloading new emails
     let new_paths = imap::download_email_from_imap(
-        Config::global().maildir.clone(),
+        Config::global().maildir.clone(), // tODO remove all args
         Config::global().imap_domain.clone(),
         Config::global().username.clone(),
         Config::global().password.clone()
@@ -219,8 +246,7 @@ async fn run(){
 
     
     let mut lists: Vec<String> = Vec::new();
-    
-    // reading the folder with lists
+
     for mail_list in std::fs::read_dir(Config::global().maildir.clone())
         .expect("could not read maildir")
         .filter_map(|m| m.ok())
@@ -228,28 +254,25 @@ async fn run(){
         let dir_name = mail_list.file_name().into_string().unwrap();
         
         if dir_name.starts_with('.') || ["cur", "new", "tmp"].contains(&dir_name.as_str()) {
-            continue;
+            continue; 
         }
+        
         lists.push(dir_name.clone());
+        let out_dir_name = Config::global().out_dir.clone().join(dir_name);
+        if !out_dir_name.exists() { create_dir_all(out_dir_name).ok(); }
+    }
+    
+    // let mut uid = 1; // fake uid for tests
+    // for entry in std::fs::read_dir(Config::global().maildir.clone().join("Sent").join("new")).expect("could not read maildir").filter_map(|m| m.ok()){
+    for (uid, path) in new_paths.clone() {
+        // let path = entry.path();
+
+        match add_email(path.clone(), uid.clone()){
+            Ok(_) => {}
+            Err(_) => {println!("Error adding email from {:?}", path.clone())}
+        };
         
-        if !Config::global().out_dir.clone().join(dir_name.clone()).exists(){
-            std::fs::create_dir_all(Config::global().out_dir.clone().join(dir_name.clone())).ok();
-        }
-        
-        // let mut uid = 1; // fake uid for tests
-        // for entry in std::fs::read_dir(Config::global().maildir.clone().join(mail_list.path().clone()).join("new")).expect("could not read maildir").filter_map(|m| m.ok()){
-        for (uid, path) in new_paths.clone().into_iter().filter(|(u16, path)| path.starts_with(mail_list.path())) {
-            // let path = entry.path();
-            
-            let data = std::fs::read(&path).expect("could not read mail");
-            
-            if let Some(mail) = Message::parse(&data) {
-                let _ = persist_email(&mail, uid.clone(), dir_name.clone(), path.clone());
-            } else {
-                println!("Could not parse message {:?}", path);
-            }
-            // uid += 1;
-        }
+        // uid += 1;
     }
 
     let threads_indexes_path = Config::global().out_dir.clone().join("threads.json");
@@ -322,14 +345,20 @@ async fn main() -> anyhow::Result<()> {
     // imap::delete_folder("newTestFolder".to_string()).await.unwrap();
     
     
+    // looking for updates
+    // let one = tokio::spawn(async {
+    //     let imap_update_handler = imap::check_for_updates("INBOX".to_string()).await;
+    // });
+    
     // API
+    
     println!("Server is running on {}", Config::global().api_port);
     let listener = TcpListener::bind(format!("{}:{}", Config::global().api_addr, Config::global().api_port), false)?;
     loop {
         let (stream, _) = listener.accept(false)?;
         let _ = handle_client(stream).await;
     }
-
+    
     Ok(())
 }
 
@@ -360,6 +389,12 @@ async fn main() {
     // delete_email("lqo7m8r2.1js7w080jvr2o@express.medallia.com".to_string()).await.expect("Unable to delete an email");
 
 
+    // looking for updates
+    let imap_update_handle = tokio::spawn(async move {
+        imap::check_for_updates("INBOX".to_string()).await.expect("TODO: panic message");
+    });
+    
+
     // imap::create_folder("testFolder".to_string()).await.unwrap();
     // imap::rename_folder("testFolder".to_string(), "newTestFolder".to_string()).await.unwrap();
     // imap::delete_folder("newTestFolder".to_string()).await.unwrap();
@@ -367,21 +402,31 @@ async fn main() {
     
     // API
     // Define the CORS layer
-    let cors = CorsLayer::new()
-        .allow_origin(Any)
-        .allow_methods(vec![Method::GET, Method::POST])
-        .allow_headers(Any);
-    
-    let app = Router::new()
-        .route("/folders", get(get_folders_handle))
-        .route("/sorted_threads_by_date", get(sorted_threads_by_date_handle))
-        .route("/get_thread_messages", get(get_thread_messages_handle))
-        .route("/email", get(get_email_handle))
-        .layer(cors);
-    
-    let listener = tokio::net::TcpListener::bind(format!("{}:{}", Config::global().api_addr, Config::global().api_port)).await.unwrap();
-    println!("Server is running on {}", Config::global().api_port);
-    axum::serve(listener, app).await.unwrap();
+    let handle = thread::spawn(|| {
+        let cors = CorsLayer::new()
+            .allow_origin(Any)
+            .allow_methods(vec![Method::GET, Method::POST])
+            .allow_headers(Any);
+
+        let app = Router::new()
+            .route("/folders", get(get_folders_handle))
+            .route("/sorted_threads_by_date", get(sorted_threads_by_date_handle))
+            .route("/get_thread_messages", get(get_thread_messages_handle))
+            .route("/email", get(get_email_handle))
+            .layer(cors);
+
+        
+        
+        let rt = Runtime::new().unwrap();
+        // Use the runtime to block on the async task
+        rt.block_on(async {
+            let listener = tokio::net::TcpListener::bind(format!("{}:{}", Config::global().api_addr, Config::global().api_port)).await.unwrap();
+            println!("Server is running on {}", Config::global().api_port);
+            axum::serve(listener, app).await.unwrap();
+        });
+    });
+
+    handle.join().unwrap();
 }
 
 #[cfg(not(target_os = "wasi"))]

+ 2 - 0
src/util.rs

@@ -1,3 +1,5 @@
+use std::path::PathBuf;
+
 // Truncate a string, adding ellpises if it's too long
 pub fn truncate_ellipsis(s: &str, n: usize) -> String {
     let mut out = String::with_capacity(n);