Browse Source

added API subscription and small fix in smtp_client.rs

Yurii Sokolovskyi 1 month ago
parent
commit
c3716a12cd
2 changed files with 118 additions and 4 deletions
  1. 116 2
      src/server.rs
  2. 2 2
      src/smtp_client.rs

+ 116 - 2
src/server.rs

@@ -1,3 +1,4 @@
+use std::collections::HashSet;
 use std::fs;
 use std::fs;
 use crate::config::Config;
 use crate::config::Config;
 use crate::indexes::{Indexes, SerializableMessage, SerializableThread};
 use crate::indexes::{Indexes, SerializableMessage, SerializableThread};
@@ -8,7 +9,9 @@ use std::io::{BufReader, Read};
 use std::net::{IpAddr, SocketAddr};
 use std::net::{IpAddr, SocketAddr};
 use std::path::PathBuf;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::str::FromStr;
+use std::sync::{Arc, Mutex};
 use chrono::{DateTime, Utc};
 use chrono::{DateTime, Utc};
+use futures::{SinkExt, StreamExt};
 use warp::Filter;
 use warp::Filter;
 use warp::fs::file;
 use warp::fs::file;
 use warp::http::{response, StatusCode};
 use warp::http::{response, StatusCode};
@@ -17,6 +20,15 @@ use crate::imap::{connect_to_imap, get_session};
 use crate::models::MailAddress;
 use crate::models::MailAddress;
 use crate::sonic::SearchSonic;
 use crate::sonic::SearchSonic;
 use crate::util::read_and_decompress_file;
 use crate::util::read_and_decompress_file;
+use tokio::sync::mpsc::UnboundedSender;
+use warp::ws::{Message, WebSocket};
+use once_cell::sync::Lazy;
+
+type Client = Arc<Mutex<Option<UnboundedSender<Message>>>>;
+
+static CLIENT: Lazy<Client> = Lazy::new(|| {
+    Arc::new(Mutex::new(None))
+});
 
 
 /// folders -> GET, returns list of all mailboxes
 /// folders -> GET, returns list of all mailboxes
 /// sorted_threads_by_date -> GET, returns hashmap with date: list of thread ids sorted by date
 /// sorted_threads_by_date -> GET, returns hashmap with date: list of thread ids sorted by date
@@ -83,13 +95,81 @@ pub async fn run_api() {
             .and(warp::post())
             .and(warp::post())
             .and(warp::body::json())
             .and(warp::body::json())
             .and_then(log_in_handle))
             .and_then(log_in_handle))
-        .with(cors);
+        .or(warp::path("connect")
+            .and(warp::ws())
+            .map(|ws: warp::ws::Ws| {
+                ws.on_upgrade(|socket| ws_handler(socket))
+            }))
+        .or(warp::path("disconnect")
+            .and(warp::post())
+            .and_then(disconnect_handle))
+    .with(cors);
 
 
     let ip: IpAddr = IpAddr::from_str(&*Config::global().api_addr.clone()).expect("Invalid API IP address");
     let ip: IpAddr = IpAddr::from_str(&*Config::global().api_addr.clone()).expect("Invalid API IP address");
     let addr = SocketAddr::new(ip, Config::global().api_port.clone() as u16);
     let addr = SocketAddr::new(ip, Config::global().api_port.clone() as u16);
     warp::serve(routes).run(addr).await;
     warp::serve(routes).run(addr).await;
 }
 }
 
 
+async fn ws_handler(ws: WebSocket) {
+    let (mut ws_tx, mut ws_rx) = ws.split();
+
+    let (client_tx, mut client_rx) = tokio::sync::mpsc::unbounded_channel();
+
+    // Overwrite the existing client with the new one
+    {
+        let mut client_guard = CLIENT.lock().unwrap();
+        *client_guard = Some(client_tx);
+    }
+
+    // Task to receive messages from the client (if needed)
+    let rx_task = tokio::spawn(async move {
+        while let Some(result) = ws_rx.next().await {
+            match result {
+                Ok(msg) => {
+                    // Handle client messages here if necessary
+                }
+                Err(e) => {
+                    eprintln!("WebSocket error: {}", e);
+                    break;
+                }
+            }
+        }
+    });
+
+    // Task to send messages to the client
+    let tx_task = tokio::spawn(async move {
+        while let Some(result) = client_rx.recv().await {
+            if ws_tx.send(result).await.is_err() {
+                // Client disconnected
+                break;
+            }
+        }
+    });
+
+    // Wait for either task to complete
+    tokio::select! {
+        _ = rx_task => {},
+        _ = tx_task => {},
+    }
+
+    // Remove the client from storage when done
+    {
+        let mut client_guard = CLIENT.lock().unwrap();
+        *client_guard = None;
+    }
+}
+
+
+async fn broadcast_message(message: String) {
+    let msg = Message::text(message);
+
+    let client_guard = CLIENT.lock().unwrap();
+    if let Some(tx) = &*client_guard {
+        let _ = tx.send(msg);
+    }
+}
+
+
 // functions
 // functions
 pub async fn get_attachment(folder: String, id: String, name: String) -> GetAttachmentResponse{    
 pub async fn get_attachment(folder: String, id: String, name: String) -> GetAttachmentResponse{    
     let attachment_path = Config::global().out_dir.clone()
     let attachment_path = Config::global().out_dir.clone()
@@ -150,6 +230,7 @@ pub async fn get_attachments_info(folder: String, id: String) -> String{
 }
 }
 
 
 pub async fn get_folders() -> String {
 pub async fn get_folders() -> String {
+    broadcast_message("folders".to_string()).await;
     serde_json::to_string(&Indexes::get_local_mailboxes()).unwrap()
     serde_json::to_string(&Indexes::get_local_mailboxes()).unwrap()
 }
 }
 
 
@@ -506,4 +587,37 @@ async fn log_in_handle(query: LogInRequest) -> Result<impl warp::Reply, warp::Re
     }else{
     }else{
         Ok(warp::reply::with_status("Unable to log in", StatusCode::BAD_REQUEST))
         Ok(warp::reply::with_status("Unable to log in", StatusCode::BAD_REQUEST))
     }
     }
-}
+}
+
+#[derive(Debug)]
+struct NoClientConnected;
+
+impl warp::reject::Reject for NoClientConnected {}
+
+impl warp::reply::Reply for NoClientConnected {
+    fn into_response(self) -> warp::reply::Response {
+        warp::reply::with_status(
+            "No client connected".to_string(),
+            warp::http::StatusCode::NOT_FOUND,
+        )
+            .into_response()
+    }
+}
+
+async fn disconnect_handle() -> Result<impl warp::Reply, warp::Rejection> {
+    // Lock the client storage
+    let mut client_guard = CLIENT.lock().unwrap();
+
+    // Check if a client is connected
+    if let Some(client) = client_guard.take() {
+        // Send a close message to the client
+        client.send(Message::close()).ok();
+
+        Ok(warp::reply::with_status(
+            "Client disconnected",
+            warp::http::StatusCode::OK,
+        ))
+    } else {
+        Err(warp::reject::custom(NoClientConnected))
+    }
+}

+ 2 - 2
src/smtp_client.rs

@@ -128,8 +128,8 @@ async fn login() -> Result<tokio_rustls::client::TlsStream<TcpStream>, Box<dyn s
 
 
     // Authenticate using LOGIN method
     // Authenticate using LOGIN method
     send_command(&mut stream, "AUTH LOGIN\r\n").await;
     send_command(&mut stream, "AUTH LOGIN\r\n").await;
-    send_command(&mut stream, &(encode(Config::global().username.clone()) + "\r\n")).await;
-    send_command(&mut stream, &(encode(Config::global().password.clone()) + "\r\n")).await;
+    send_command(&mut stream, &(encode(Config::global().username.lock().unwrap().clone()) + "\r\n")).await;
+    send_command(&mut stream, &(encode(Config::global().password.lock().unwrap().clone()) + "\r\n")).await;
     
     
     Ok(stream)
     Ok(stream)
 }
 }