Forráskód Böngészése

sonic search queue added

Yurii Sokolovskyi 1 hónapja
szülő
commit
63f70a986d
6 módosított fájl, 175 hozzáadás és 26 törlés
  1. 1 1
      Cargo.toml
  2. 1 1
      src/imap.rs
  3. 8 18
      src/main.rs
  4. 1 1
      src/models.rs
  5. 162 0
      src/search.rs
  6. 2 5
      src/templates/xml.rs

+ 1 - 1
Cargo.toml

@@ -16,7 +16,7 @@ linkify = "0.10.0"
 html5ever = "0.24.0 "
 select = "0.5.0"
 mail-builder = {git = "https://github.com/alexwennerberg/mail-builder"}
-mail-parser = "0.8.2"
+mail-parser = {version = "0.8.2", features = ["serde_support"]}
 once_cell = "1.9.0"
 urlencoding = "2.1.0"
 mailparse = "0.13.0"

+ 1 - 1
src/imap.rs

@@ -176,7 +176,7 @@ pub async fn fetch_and_store_emails(session: &mut Session<TlsStream<TcpStream>>,
                         stored_paths.push((uid.to_string().parse().unwrap(), file.clone()));
                         
                         // persist to the output dir
-                        match add_email(file.clone(), uid.clone()){
+                        match add_email(file.clone(), uid.clone()).await{
                             Ok(_) => {}
                             Err(_) => {println!("Error adding email from {:?}", file.clone())}
                         };

+ 8 - 18
src/main.rs

@@ -26,7 +26,7 @@ use crate::util::{compress_and_save_file};
 use crate::imap::{delete_folder, rename_folder, create_folder, check_for_updates, read_local_uids};
 use crate::js::email_scripts;
 use warp::Filter;
-use crate::sonic::{IngestSonic};
+use crate::search::{ingest_new_message, MY_TASK};
 
 mod smtp_client;
 mod arg;
@@ -40,6 +40,7 @@ mod server;
 mod js;
 mod sonic;
 mod api_broadcast_structure;
+mod search;
 
 struct ImapTaskStorage {
     downloading: Option<JoinHandle<()>>,
@@ -203,8 +204,6 @@ fn retrieve_and_save_attachments(parsed_mail: ParsedMail, list: String, file_nam
 
 /// saves email to local file system
 fn persist_email(msg: &Message, uid: u32, list: String, original_path: PathBuf) -> anyhow::Result<()>{
-    let mut indexes = Indexes::new();
-    
     // creating all folders
     if Config::global().out_dir.exists() {
         std::fs::create_dir_all(&Config::global().out_dir).ok();
@@ -232,20 +231,8 @@ fn persist_email(msg: &Message, uid: u32, list: String, original_path: PathBuf)
         };
     }
     
-    // ingest text from the message to the sonic server // TODO create as queue if server is not accessible
-    // TODO add attachments
-    match IngestSonic::new(){
-        Ok(mut ingest_channel) => {
-            let data = message.to_ingest();
-            match ingest_channel.ingest_document("emails", &*list, &*message.hash.clone(), &*data) { // TODO change message.hash to id
-                Ok(_) => {}
-                Err(_) => {println!("Unable to ingest {} email to sonic search", message.hash.clone())}
-            };
-            let _ = ingest_channel.stop_ingest();
-        }
-        Err(_) => {}
-    };
-    
+    // ingest new massage to the sonic server
+    ingest_new_message(message.clone())?;
     
     // indexing
     let _ = Indexes::add(message.clone());
@@ -254,7 +241,7 @@ fn persist_email(msg: &Message, uid: u32, list: String, original_path: PathBuf)
 }
 
 /// read raw email and saves HTML version
-pub fn add_email(path: PathBuf, uid: u32) -> anyhow::Result<()>{
+pub async fn add_email(path: PathBuf, uid: u32) -> anyhow::Result<()>{
     let maildir = match path.parent() {
         None => {return Err(anyhow!("Cannot retrieve parent folder from the path"))}
         Some(parent) => {
@@ -414,6 +401,9 @@ async fn run(){
     // parse args from CLI
     parse_args();
 
+    // monitor the sonic server
+    MY_TASK.run();
+    
     // API
     let api_task = tokio::spawn(async move {
         run_api().await

+ 1 - 1
src/models.rs

@@ -6,7 +6,7 @@ use hex::encode as hex_encode;
 use serde::{Deserialize, Serialize};
 
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct StrMessage {
     pub id: String,
     pub hash: String,

+ 162 - 0
src/search.rs

@@ -0,0 +1,162 @@
+use std::fs::{File, OpenOptions};
+use serde::{Deserialize, Serialize};
+use crate::config::Config;
+use crate::models::StrMessage;
+use crate::sonic::IngestSonic;
+use std::io::{Read, Write};
+use std::sync::{Arc, Mutex, RwLock};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use tokio::{task, time};
+
+pub struct MyTask {
+    is_running: AtomicBool,
+}
+
+impl MyTask {
+    pub fn new() -> Self {
+        MyTask {
+            is_running: AtomicBool::new(false),
+        }
+    }
+
+    pub fn run(&self) {
+        // Check if the task is already running
+        if !self.is_running.load(Ordering::SeqCst) {
+            // Spawn the background task using the Arc struct
+            let task_handle = MY_TASK.clone();
+            tokio::spawn(async move {
+                task_handle.run_background().await;
+            });
+        }
+    }
+
+    pub fn start(&self) {
+        // Set the running flag to false to stop the task
+        println!("Started");
+        self.is_running.store(true, Ordering::SeqCst);
+    }
+    
+    pub fn stop(&self) {
+        // Set the running flag to false to stop the task
+        self.is_running.store(false, Ordering::SeqCst);
+    }
+
+    async fn run_background(self: Arc<Self>) {
+        loop {
+            // Check if the task should keep running
+            if self.is_running.load(Ordering::SeqCst) {
+                check_server().await;
+                println!("Task is running");
+            } else {
+                println!("Task is stopped");
+            }
+
+            // Wait for 10 seconds before running again
+            tokio::time::sleep(Duration::from_secs(10)).await;
+        }
+    }
+}
+
+// Static variable to hold the task
+pub static MY_TASK: once_cell::sync::Lazy<Arc<MyTask>> = once_cell::sync::Lazy::new(|| {
+    Arc::new(MyTask::new())
+});
+
+pub async fn check_server(){
+    match IngestSonic::new(){
+        Ok(_) => {
+            MY_TASK.stop();
+            upload_from_queue().await.unwrap();
+        }
+        Err(_) => {}
+    };
+}
+
+pub fn ingest_new_message(message: StrMessage) -> anyhow::Result<()>{
+    // ingest text from the message to the sonic server
+    // TODO add attachments
+    match IngestSonic::new(){
+        Ok(mut ingest_channel) => {
+            let data = message.to_ingest();
+            match ingest_channel.ingest_document("emails", &*message.list.clone(), &*message.id.clone(), &*data) {
+                Ok(_) => {}
+                Err(_) => write_to_queue(message)?
+            };
+            let _ = ingest_channel.stop_ingest();
+        }
+        Err(_) => write_to_queue(message)?
+    };
+    
+    Ok(())
+}
+
+pub fn write_to_queue(message: StrMessage) -> anyhow::Result<()>{
+    MY_TASK.start();
+    
+    let mut records: Vec<StrMessage> = Vec::new();
+
+    let file_path = Config::global().out_dir.clone().join(".sonic_search_queue.json");
+    
+    // Open or create the file if it doesn't exist
+    let mut file = OpenOptions::new()
+        .read(true)
+        .write(true)
+        .create(true)
+        .open(file_path.clone())?;
+
+    // Read the existing file contents if it's not empty
+    let mut file_contents = String::new();
+    file.read_to_string(&mut file_contents)?;
+
+    // Deserialize existing data if the file is not empty
+    if !file_contents.trim().is_empty() {
+        records = serde_json::from_str(&file_contents)?;
+    }
+
+    // Add the new record to the list
+    records.push(message.clone());
+
+    // Serialize the updated records back to JSON
+    let updated_json = serde_json::to_string_pretty(&records)?;
+
+    // Write the updated JSON back to the file
+    let mut file = File::create(file_path.clone())?;
+    file.write_all(updated_json.as_bytes())?;
+    
+    Ok(())
+}
+
+pub async fn upload_from_queue() -> anyhow::Result<()>{
+    let mut records: Vec<StrMessage> = Vec::new();
+
+    let file_path = Config::global().out_dir.clone().join(".sonic_search_queue.json");
+
+    // Open or create the file if it doesn't exist
+    let mut file = OpenOptions::new()
+        .read(true)
+        .write(true)
+        .create(true)
+        .open(file_path.clone())?;
+
+    // Read the existing file contents if it's not empty
+    let mut file_contents = String::new();
+    file.read_to_string(&mut file_contents)?;
+
+    // Deserialize existing data if the file is not empty
+    if !file_contents.trim().is_empty() {
+        records = serde_json::from_str(&file_contents)?;
+    }
+
+    for record in records {
+        match ingest_new_message(record){
+            Ok(_) => {}
+            Err(_) => {}
+        }
+    }
+
+    let mut file = File::create(file_path.clone())?;
+    file.write_all("".as_bytes())?;
+    
+    Ok(())
+}

+ 2 - 5
src/templates/xml.rs

@@ -76,15 +76,12 @@ impl StrMessage {
 
         let data = match fs::read_to_string(self.original_path.clone()) {
             Ok(content) => content,
-            Err(e) => {
-                eprintln!("Error reading file: {}", e);
-                return "Error loading content".to_string();
-            }
+            Err(_) => return String::new()
         };
 
         let parsed_mail = match parse_mail(&data.as_bytes()) {
             Ok(mail) => mail,
-            Err(e) => return format!("Error parsing email: {}", e),
+            Err(_) => return String::new(),
         };
         let html = parse_email(&parsed_mail).unwrap_or_else(|_| msg.body.clone());