Jelajahi Sumber

fixed bug in search buckets, added search_attachments endpoint, search -> search_emails, ingesting attachments into sonic search

Yurii Sokolovskyi 1 hari lalu
induk
melakukan
895dc32fbc
6 mengubah file dengan 89 tambahan dan 14 penghapusan
  1. 2 2
      Cargo.toml
  2. 1 1
      src/imap.rs
  3. 1 2
      src/main.rs
  4. 50 5
      src/search.rs
  5. 15 4
      src/server.rs
  6. 20 0
      src/sonic.rs

+ 2 - 2
Cargo.toml

@@ -1,7 +1,7 @@
 [package]
 name = "crabmail"
 version = "0.1.0"
-authors = ["alex wennerberg <alex@alexwennerberg.com>"]
+authors = ["yurii sokolovskyi <sokolovskiiyura@gmail.com>"]
 edition = "2021"
 
 [features]
@@ -15,7 +15,7 @@ anyhow = "1.0.86"
 linkify = "0.10.0"
 html5ever = "0.24.0 "
 select = "0.5.0"
-mail-builder = {git = "https://github.com/alexwennerberg/mail-builder"}
+mail-builder = {git = "https://github.com/alexwennerberg/mail-builder"} # TODO check if we need it
 mail-parser = {version = "0.8.2", features = ["serde_support"]}
 once_cell = "1.9.0"
 urlencoding = "2.1.0"

+ 1 - 1
src/imap.rs

@@ -378,7 +378,7 @@ pub fn check_for_updates(mailbox: String) -> JoinHandle<Result<(), anyhow::Error
             idle.init().await.unwrap();
             let (idle_wait, interrupt) = idle.wait();
             let idle_handle = task::spawn(async move {
-                sleep(Duration::from_mins(25)).await;
+                sleep(Duration::from_secs(25 * 60)).await;
                 drop(interrupt);
             });
 

+ 1 - 2
src/main.rs

@@ -1,5 +1,3 @@
-#![feature(duration_constructors)]
-
 use mail_parser::{Message};
 use std::error::Error;
 use std::path::{PathBuf};
@@ -455,6 +453,7 @@ async fn main() -> anyhow::Result<()> {
 #[cfg(not(target_os = "wasi"))]
 #[tokio::main]
 async fn main() -> anyhow::Result<()>{
+    // TODO folder doesnt exists when run for the first time without folders created
     let start = Instant::now();
     
     run().await;

+ 50 - 5
src/search.rs

@@ -1,3 +1,4 @@
+use std::fs;
 use std::fs::{File, OpenOptions};
 use serde::{Deserialize, Serialize};
 use crate::config::Config;
@@ -66,19 +67,63 @@ pub async fn check_server(){
 
 pub fn ingest_new_message(message: StrMessage) -> anyhow::Result<()>{
     // ingest text from the message to the sonic server
-    // TODO add attachments
     match IngestSonic::new(){
         Ok(mut ingest_channel) => {
             let data = message.to_ingest();
-            match ingest_channel.ingest_document("emails", &*message.list.clone(), &*message.id.clone(), &*data) {
+            // TODO move "emails" to config
+            match ingest_channel.ingest_document("emails", &*message.list.clone().to_lowercase(), &*message.id.clone(), &*data) {
                 Ok(_) => {}
-                Err(_) => write_to_queue(message)?
+                Err(_) => write_to_queue(message.clone())?
+            };
+            let _ = ingest_channel.stop_ingest();
+
+            // checking for saved attachments
+            let attachments_path = Config::global().out_dir.clone()
+                .join(message.list.clone())
+                .join(".attachments")
+                .join(message.id.clone());
+
+            match fs::read_dir(attachments_path) {
+                Ok(entries) => {
+                    for entry in entries {
+                        if let Ok(entry) = entry {
+                            ingest_new_attachment(message.list.clone(), message.id.clone(), entry.file_name().to_str().unwrap().to_string())?;
+                        }
+                    }
+                }
+                Err(_) => {},
+            }
+        }
+        Err(_) => write_to_queue(message.clone())?
+    };
+    
+    Ok(())
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct AttachmentSearchInfo{
+    pub name: String,
+    pub email_id: String,
+    pub mailbox: String
+}
+pub fn ingest_new_attachment(mailbox: String, email_id: String, name: String) -> anyhow::Result<()> {
+    println!("{:?}", name.clone());
+    match IngestSonic::new(){
+        Ok(mut ingest_channel) => {
+            let obj = AttachmentSearchInfo{
+                name: name.clone(),
+                email_id,
+                mailbox: mailbox.clone()
+            };
+            match ingest_channel.ingest_document("attachments", &*mailbox.clone().to_lowercase(), &*serde_json::to_string(&obj)?, &*name.clone()) {
+                Ok(_) => {}
+                Err(_) => {}
             };
             let _ = ingest_channel.stop_ingest();
         }
-        Err(_) => write_to_queue(message)?
+        Err(_) => {}
     };
-    
+
     Ok(())
 }
 

+ 15 - 4
src/server.rs

@@ -63,10 +63,14 @@ pub async fn run_api() {
             .and(warp::get())
             .and(warp::query::<GetAttachmentQuery>())
             .and_then(get_attachment_handle))
-        .or(warp::path("search")
+        .or(warp::path("search_emails")
             .and(warp::get())
             .and(warp::query::<SearchQuery>())
-            .and_then(search_handle))
+            .and_then(search_emails_handle))
+        .or(warp::path("search_attachments")
+            .and(warp::get())
+            .and(warp::query::<SearchQuery>())
+            .and_then(search_attachments_handle))
         .or(warp::path("get_threads_by_message")
             .and(warp::get())
             .and(warp::query::<GetThreadsByMessageQuery>())
@@ -554,8 +558,15 @@ struct SearchQuery {
     query: String
 }
 
-async fn search_handle(query: SearchQuery) -> Result<impl warp::Reply, warp::Rejection> {
-    match SearchSonic::search_document("emails", &*query.list, &*query.query, query.limit, query.offset){
+async fn search_emails_handle(query: SearchQuery) -> Result<impl warp::Reply, warp::Rejection> {
+    match SearchSonic::search_document("emails", &*query.list.to_lowercase(), &*query.query, query.limit, query.offset){
+        Ok(result) => Ok(warp::reply::json(&result)),
+        Err(_) => Ok(warp::reply::json(&Vec::<String>::new()))
+    }
+}
+
+async fn search_attachments_handle(query: SearchQuery) -> Result<impl warp::Reply, warp::Rejection> {
+    match SearchSonic::search_attachment("attachments", &*query.list.to_lowercase(), &*query.query, query.limit, query.offset){
         Ok(result) => Ok(warp::reply::json(&result)),
         Err(_) => Ok(warp::reply::json(&Vec::<String>::new()))
     }

+ 20 - 0
src/sonic.rs

@@ -6,6 +6,7 @@ use sonic_channel_wasi::{Dest, IngestChannel, PushRequest, QueryRequest, SearchC
 use sonic_channel::{Dest, IngestChannel, PushRequest, QueryRequest, SearchChannel, SonicChannel};
 use crate::config::Config;
 use crate::indexes::{Indexes, SerializableMessage};
+use crate::search::AttachmentSearchInfo;
 
 pub struct IngestSonic{
     ingest_channel: IngestChannel
@@ -89,4 +90,23 @@ impl SearchSonic{
         
         Ok(results)
     }
+
+    pub fn search_attachment(collection: &str, bucket: &str, query: &str, limit: i32, offset: i32) -> anyhow::Result<Vec<AttachmentSearchInfo>> {
+        let search = SearchChannel::start(
+            format!("{}:{}", Config::global().sonic_search_addr, Config::global().sonic_search_port),
+            Config::global().sonic_search_password.clone()
+        )?;
+        let data: Vec<String> = search.query(QueryRequest::new(
+            Dest::col_buc(collection, bucket),
+            query
+        ).limit(limit as usize).offset(offset as usize)).unwrap_or_else(|_| { vec![] });
+        search.quit()?;
+
+        let mut results: Vec<AttachmentSearchInfo> = vec![];
+        for i in data{
+            results.push(serde_json::from_str::<AttachmentSearchInfo>(i.as_str())?);
+        }
+
+        Ok(results)
+    }
 }