Jelajahi Sumber

endpoint /search now returns Vec<SerializableMessage>

Yurii Sokolovskyi 1 bulan lalu
induk
melakukan
ce0604c70c
5 mengubah file dengan 27 tambahan dan 23 penghapusan
  1. 1 0
      src/imap.rs
  2. 3 1
      src/indexes.rs
  3. 2 2
      src/main.rs
  4. 8 17
      src/search.rs
  5. 13 3
      src/sonic.rs

+ 1 - 0
src/imap.rs

@@ -128,6 +128,7 @@ pub async fn list_imap_folders(session: &mut Session<TlsStream<TcpStream>>) -> a
 
 /// download all emails from one mailbox 
 pub async fn fetch_and_store_emails(session: &mut Session<TlsStream<TcpStream>>, list: String) -> anyhow::Result<Vec<(u32, PathBuf)>> {
+    // TODO send a message to UI about start and end of the download
     let out_dir_name = Config::global().out_dir.clone().join(list.clone());
     if !out_dir_name.exists() { create_dir_all(out_dir_name).ok(); }
     

+ 3 - 1
src/indexes.rs

@@ -337,7 +337,8 @@ impl Indexes {
         Ok(())
     }
 
-    fn find_by_id(messages: &Vec<SerializableMessage>, search_id: String) -> Option<SerializableMessage> {
+    pub fn find_by_id(messages: &Vec<SerializableMessage>, search_id: String) -> Option<SerializableMessage> {
+        // TODO remove messages from arguments
         for message in messages {
             if message.id == search_id {
                 return Some(message.clone());
@@ -347,6 +348,7 @@ impl Indexes {
     }
 
     pub fn find_by_uid(messages: &Vec<SerializableMessage>, search_uid: u32) -> Option<SerializableMessage> {
+        // TODO remove messages from arguments
         for message in messages {
             if message.uid == search_uid {
                 return Some(message.clone());

+ 2 - 2
src/main.rs

@@ -26,7 +26,7 @@ use crate::util::{compress_and_save_file};
 use crate::imap::{delete_folder, rename_folder, create_folder, check_for_updates, read_local_uids};
 use crate::js::email_scripts;
 use warp::Filter;
-use crate::search::{ingest_new_message, MY_TASK};
+use crate::search::{ingest_new_message, SONIC_MONITOR};
 
 mod smtp_client;
 mod arg;
@@ -402,7 +402,7 @@ async fn run(){
     parse_args();
 
     // monitor the sonic server
-    MY_TASK.run();
+    SONIC_MONITOR.run();
     
     // API
     let api_task = tokio::spawn(async move {

+ 8 - 17
src/search.rs

@@ -9,13 +9,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use tokio::{task, time};
 
-pub struct MyTask {
+pub struct SonicMonitor {
     is_running: AtomicBool,
 }
 
-impl MyTask {
+impl SonicMonitor {
     pub fn new() -> Self {
-        MyTask {
+        SonicMonitor {
             is_running: AtomicBool::new(false),
         }
     }
@@ -24,7 +24,7 @@ impl MyTask {
         // Check if the task is already running
         if !self.is_running.load(Ordering::SeqCst) {
             // Spawn the background task using the Arc struct
-            let task_handle = MY_TASK.clone();
+            let task_handle = SONIC_MONITOR.clone();
             tokio::spawn(async move {
                 task_handle.run_background().await;
             });
@@ -32,41 +32,32 @@ impl MyTask {
     }
 
     pub fn start(&self) {
-        // Set the running flag to false to stop the task
-        println!("Started");
         self.is_running.store(true, Ordering::SeqCst);
     }
     
     pub fn stop(&self) {
-        // Set the running flag to false to stop the task
         self.is_running.store(false, Ordering::SeqCst);
     }
 
     async fn run_background(self: Arc<Self>) {
         loop {
-            // Check if the task should keep running
             if self.is_running.load(Ordering::SeqCst) {
                 check_server().await;
-                println!("Task is running");
-            } else {
-                println!("Task is stopped");
             }
-
-            // Wait for 10 seconds before running again
             tokio::time::sleep(Duration::from_secs(10)).await;
         }
     }
 }
 
 // Static variable to hold the task
-pub static MY_TASK: once_cell::sync::Lazy<Arc<MyTask>> = once_cell::sync::Lazy::new(|| {
-    Arc::new(MyTask::new())
+pub static SONIC_MONITOR: once_cell::sync::Lazy<Arc<SonicMonitor>> = once_cell::sync::Lazy::new(|| {
+    Arc::new(SonicMonitor::new())
 });
 
 pub async fn check_server(){
     match IngestSonic::new(){
         Ok(_) => {
-            MY_TASK.stop();
+            SONIC_MONITOR.stop();
             upload_from_queue().await.unwrap();
         }
         Err(_) => {}
@@ -92,7 +83,7 @@ pub fn ingest_new_message(message: StrMessage) -> anyhow::Result<()>{
 }
 
 pub fn write_to_queue(message: StrMessage) -> anyhow::Result<()>{
-    MY_TASK.start();
+    SONIC_MONITOR.start();
     
     let mut records: Vec<StrMessage> = Vec::new();
 

+ 13 - 3
src/sonic.rs

@@ -5,6 +5,7 @@ use sonic_channel_wasi::{Dest, IngestChannel, PushRequest, QueryRequest, SearchC
 #[cfg(not(target_os = "wasi"))]
 use sonic_channel::{Dest, IngestChannel, PushRequest, QueryRequest, SearchChannel, SonicChannel};
 use crate::config::Config;
+use crate::indexes::{Indexes, SerializableMessage};
 
 pub struct IngestSonic{
     ingest_channel: IngestChannel
@@ -27,7 +28,6 @@ impl IngestSonic{
     }
 
     pub fn ingest_document(&self, collection: &str, bucket: &str, object: &str, text: &str) -> anyhow::Result<()> {
-        // TODO may be we need to store ingested email names and make a queue  
         let contents = Self::slice_string_into_chunks(&*text);
         for c in contents {
             let dest = Dest::col_buc(collection, bucket).obj(object);
@@ -67,16 +67,26 @@ impl IngestSonic{
 pub struct SearchSonic{}
 
 impl SearchSonic{
-    pub fn search_document(collection: &str, bucket: &str, query: &str, limit: i32, offset: i32) -> anyhow::Result<Vec<String>> {
+    pub fn search_document(collection: &str, bucket: &str, query: &str, limit: i32, offset: i32) -> anyhow::Result<Vec<SerializableMessage>> {
         let search = SearchChannel::start(
             format!("{}:{}", Config::global().sonic_search_addr, Config::global().sonic_search_port), 
             Config::global().sonic_search_password.clone()
         )?;
-        let results: Vec<String> = search.query(QueryRequest::new(
+        let ids: Vec<String> = search.query(QueryRequest::new(
             Dest::col_buc(collection, bucket),
             query
         ).limit(limit as usize).offset(offset as usize)).unwrap_or_else(|_| { vec![] });
         search.quit()?;
+        
+        let mut results: Vec<SerializableMessage> = vec![];
+        let messages = Indexes::get_messages()?;
+        for id in ids {
+            match Indexes::find_by_id(&messages, id) {
+                None => {}
+                Some(message) => results.push(message)
+            }
+        }
+        
         Ok(results)
     }
 }