Browse Source

axum/socket -> warp

Yurii Sokolovskyi 1 month ago
parent
commit
05f1866239
4 changed files with 132 additions and 342 deletions
  1. 2 5
      Cargo.toml
  2. 15 198
      src/main.rs
  3. 115 10
      src/server.rs
  4. 0 129
      src/server_wasi.rs

+ 2 - 5
Cargo.toml

@@ -40,17 +40,14 @@ futures = "0.3.30"
 [target.'cfg(target_os = "wasi")'.dependencies]
 tokio_wasi = {version = "1.25.2", features = ["full", "rt", "rt-multi-thread", "macros", "time"] }
 tokio-rustls-wasi = "0.25.0-alpha"
-httpcodec = "0.2.3"
-bytecodec = "0.4.15"
-wasmedge_wasi_socket = "0.5.4"
 async-imap-wasi = {path = "./async-imap-wasi/", default-features = false, features = ["runtime-tokio"]}
+warp_wasi = "0.3.3"
 
 [target.'cfg(not(target_os = "wasi"))'.dependencies]
 maildir = "0.6.4"
 tokio = { version = "1.39.2", features = ["full"] }
 native-tls = "0.2.11"
 async-imap = {version =  "0.9.7" , default-features = false, features = ["runtime-tokio"]}
-axum = "0.7.5"
-tower-http = {version = "0.5.2", features = ["cors"] }
 tokio-rustls = "0.26.0"
+warp = "0.3.7"
 

+ 15 - 198
src/main.rs

@@ -1,55 +1,25 @@
 use mail_parser::{Message};
-use std::collections::{HashMap, HashSet};
 use std::error::Error;
-use std::path::{Path, PathBuf};
+use std::path::{PathBuf};
 use models::*;
 use std::ffi::{OsStr, OsString};
-use std::{fs, thread};
-use std::fs::{create_dir_all, File, OpenOptions};
+use std::{fs};
+use std::fs::{create_dir_all, File};
 use std::future::Future;
-use std::io::{BufReader, Write};
-use std::net::SocketAddr;
-use std::time::{Duration, Instant};
+use std::io::{Write};
+use std::time::{Instant};
 use anyhow::anyhow;
 use serde::{Deserialize, Serialize};
-use crate::indexes::{Indexes, SerializableMessage, SerializableThread};
+use crate::indexes::{Indexes};
 use config::{Config, INSTANCE};
-use crate::server::{get_email, get_folders, get_sorted_threads_by_date, get_thread_messages};
+use crate::server::{run_api};
 use mailparse::parse_mail;
 use regex::Regex;
 use crate::templates::util::parse_email;
-use crate::util::{compress_and_save_file, read_and_decompress_file};
+use crate::util::{compress_and_save_file};
 use crate::imap::{delete_folder, rename_folder, create_folder, check_for_updates};
 use crate::js::email_scripts;
-use tokio::runtime::Runtime;
-
-#[cfg(not(target_os = "wasi"))]
-use axum::extract::Query;
-#[cfg(not(target_os = "wasi"))]
-use axum::{Json, Router};
-#[cfg(not(target_os = "wasi"))]
-use axum::http::{Method, StatusCode};
-#[cfg(not(target_os = "wasi"))]
-use axum::response::{IntoResponse, Response};
-#[cfg(not(target_os = "wasi"))]
-use axum::routing::{get, post};
-#[cfg(not(target_os = "wasi"))]
-use tower_http::cors::{Any, CorsLayer};
-
-#[cfg(target_os = "wasi")]
-use bytecodec::DecodeExt;
-#[cfg(target_os = "wasi")]
-use httpcodec::{HttpVersion, ReasonPhrase, Request, RequestDecoder, Response, StatusCode, Header};
-#[cfg(target_os = "wasi")]
-use lettre::message::{Mailbox, Message as LettreMessage, MultiPart, SinglePart};
-use tokio::task;
-use tokio::time::sleep;
-#[cfg(target_os = "wasi")]
-use wasmedge_wasi_socket::{Shutdown, TcpListener, TcpStream};
-#[cfg(target_os = "wasi")]
-use crate::server_wasi::{handle_client};
-#[cfg(target_os = "wasi")]
-use crate::smtp_client::{send_email};
+use warp::Filter;
 
 mod smtp_client;
 
@@ -61,7 +31,6 @@ mod util;
 mod indexes;
 mod imap;
 mod server;
-mod server_wasi;
 mod js;
 
 pub fn append_ext(ext: impl AsRef<OsStr>, path: &PathBuf) -> PathBuf {
@@ -403,24 +372,16 @@ async fn main() -> anyhow::Result<()> {
     
     
     // looking for updates
-    let imap_update_handler = imap::check_for_updates("INBOX".to_string()).await;
-    
+    if let Err(e) = check_for_updates("INBOX".to_string()).await {
+        eprintln!("Failed to monitor mailbox: {:?}", e);
+    }    
     
     let duration = start.elapsed();
     println!("Duration {:?}", duration);
     
     
     // API
-    println!("Server is running on {}", Config::global().api_port);
-    let listener = TcpListener::bind(format!("{}:{}", Config::global().api_addr, Config::global().api_port), false)?;
-    loop {
-        let (stream, _) = listener.accept(false)?;
-        task::spawn(async move {
-            if let Err(e) = handle_client(stream).await {
-                eprintln!("Error handling client: {:?}", e);
-            }
-        });
-    }
+    run_api().await;
     
     Ok(())
 }
@@ -454,158 +415,14 @@ async fn main() {
     
     // delete_email("lqo7m8r2.1js7w080jvr2o@express.medallia.com".to_string()).await.expect("Unable to delete an email");
 
-
-    // looking for updates
-    // let imap_update_handle = tokio::spawn(async move {
-    //     if let Err(e) = check_for_updates("INBOX".to_string()).await {
-    //         eprintln!("Failed to monitor mailbox: {:?}", e);
-    //     }
-    // });
+    
     if let Err(e) = check_for_updates("INBOX".to_string()).await {
         eprintln!("Failed to monitor mailbox: {:?}", e);
     }
 
-    // imap::create_folder("testFolder".to_string()).await.unwrap();
-    // imap::rename_folder("testFolder".to_string(), "newTestFolder".to_string()).await.unwrap();
-    // imap::delete_folder("newTestFolder".to_string()).await.unwrap();
-
     let duration = start.elapsed();
     println!("Duration {:?}", duration);
     
     // API
-    // Define the CORS layer
-    let handle = thread::spawn(|| {
-        let cors = CorsLayer::new()
-            .allow_origin(Any)
-            .allow_methods(vec![Method::GET, Method::POST])
-            .allow_headers(Any);
-    
-        let app = Router::new()
-            .route("/folders", get(get_folders_handle))
-            .route("/sorted_threads_by_date", get(sorted_threads_by_date_handle))
-            .route("/get_thread_messages", get(get_thread_messages_handle))
-            .route("/email", get(get_email_handle))
-            .route("/create_folder", post(create_folder_handle))
-            .route("/rename_folder", post(rename_folder_handle))
-            .route("/delete_folder", post(delete_folder_handle))
-            .layer(cors);
-    
-        
-        
-        let rt = Runtime::new().unwrap();
-        // Use the runtime to block on the async task
-        rt.block_on(async {
-            let listener = tokio::net::TcpListener::bind(format!("{}:{}", Config::global().api_addr, Config::global().api_port)).await.unwrap();
-            println!("Server is running on {}", Config::global().api_port);
-            axum::serve(listener, app).await.unwrap();
-        });
-    });
-    
-    handle.join().unwrap();
-}
-
-#[derive(Deserialize)]
-#[derive(Serialize)]
-struct ErrorResponse {
-    error: String,
-}
-
-#[cfg(not(target_os = "wasi"))]
-#[derive(Deserialize)]
-struct GetSortedThreadsByDateQuery {
-    folder: String,
-}
-#[cfg(not(target_os = "wasi"))]
-async fn sorted_threads_by_date_handle(Query(params): Query<GetSortedThreadsByDateQuery>) -> Json<Vec<(String, Vec<u32>)>> {
-    let result: Vec<(String, Vec<u32>)> = serde_json::from_str(&*get_sorted_threads_by_date(params.folder).await).unwrap();
-    Json(result)
-}
-
-#[cfg(not(target_os = "wasi"))]
-async fn get_folders_handle() -> Json<Vec<String>> {
-    let result: Vec<String>  = serde_json::from_str(&*get_folders().await).unwrap();
-    Json(result)
-}
-
-#[cfg(not(target_os = "wasi"))]
-#[derive(Deserialize)]
-struct GetEmailQuery {
-    id: String,
-}
-#[cfg(not(target_os = "wasi"))]
-async fn get_email_handle(Query(params): Query<GetEmailQuery>) -> Response {
-    let result: String  = get_email(params.id).await;
-    result.into_response()
-}
-
-#[cfg(not(target_os = "wasi"))]
-#[derive(Deserialize)]
-struct GetThreadMessagesQuery {
-    id: u32,
-}
-#[cfg(not(target_os = "wasi"))]
-async fn get_thread_messages_handle(Query(params): Query<GetThreadMessagesQuery>) -> Json<Vec<SerializableMessage>> {
-    let result: Vec<SerializableMessage> = serde_json::from_str(&*get_thread_messages(params.id).await).unwrap();
-    Json(result)
-}
-
-#[cfg(not(target_os = "wasi"))]
-#[derive(Deserialize)]
-struct CreateFolderRequest {
-    name: String,
-}
-#[cfg(not(target_os = "wasi"))]
-async fn create_folder_handle(Json(data): Json<CreateFolderRequest>) -> impl IntoResponse {
-    match create_folder_lar(data.name.clone()).await {
-        Ok(_) => {
-            (StatusCode::OK, "OK").into_response()
-        }
-        Err(_) => {
-            let error_response = ErrorResponse {
-                error: format!("Cannot create folder {}", data.name),
-            };
-            (StatusCode::BAD_REQUEST, Json(error_response)).into_response()
-        }
-    }
-}
-
-#[cfg(not(target_os = "wasi"))]
-#[derive(Deserialize)]
-struct DeleteFolderRequest {
-    name: String,
-}
-#[cfg(not(target_os = "wasi"))]
-async fn delete_folder_handle(Json(data): Json<DeleteFolderRequest>) -> impl IntoResponse {
-    match delete_folder_lar(data.name.clone()).await {
-        Ok(_) => {
-            (StatusCode::OK, "OK").into_response()
-        }
-        Err(_) => {
-            let error_response = ErrorResponse {
-                error: format!("Cannot delete folder {}", data.name),
-            };
-            (StatusCode::BAD_REQUEST, Json(error_response)).into_response()
-        }
-    }
-}
-
-#[cfg(not(target_os = "wasi"))]
-#[derive(Deserialize)]
-struct RenameFolderRequest {
-    old_name: String,
-    new_name: String
-}
-#[cfg(not(target_os = "wasi"))]
-async fn rename_folder_handle(Json(data): Json<RenameFolderRequest>) -> impl IntoResponse {
-    match rename_folder_lar(data.old_name.clone(), data.new_name.clone()).await {
-        Ok(_) => {
-            (StatusCode::OK, "OK").into_response()
-        }
-        Err(_) => {
-            let error_response = ErrorResponse {
-                error: format!("Cannot rename folder from {} to {}", data.old_name, data.new_name),
-            };
-            (StatusCode::BAD_REQUEST, Json(error_response)).into_response()
-        }
-    }
+    run_api().await;
 }

+ 115 - 10
src/server.rs

@@ -3,9 +3,49 @@ use crate::indexes::{SerializableMessage, SerializableThread};
 use serde::{Deserialize, Serialize};
 use std::fs::File;
 use std::io::{BufReader};
+use warp::Filter;
+use warp::http::StatusCode;
 use crate::{create_folder_lar, delete_folder_lar, rename_folder_lar};
 use crate::util::read_and_decompress_file;
 
+pub async fn run_api() {
+    let cors = warp::cors()
+        .allow_any_origin()
+        .allow_methods(vec!["GET", "POST"])
+        .allow_headers(vec![warp::http::header::CONTENT_TYPE]);
+
+    let routes = warp::path("folders")
+        .and(warp::get())
+        .and_then(get_folders_handle)
+        .or(warp::path("sorted_threads_by_date")
+            .and(warp::get())
+            .and(warp::query::<GetSortedThreadsByDateQuery>())
+            .and_then(sorted_threads_by_date_handle))
+        .or(warp::path("get_thread_messages")
+            .and(warp::get())
+            .and(warp::query::<GetThreadMessagesQuery>())
+            .and_then(get_thread_messages_handle))
+        .or(warp::path("email")
+            .and(warp::get())
+            .and(warp::query::<GetEmailQuery>())
+            .and_then(get_email_handle))
+        .or(warp::path("create_folder")
+            .and(warp::post())
+            .and(warp::body::json())
+            .and_then(create_folder_handle))
+        .or(warp::path("rename_folder")
+            .and(warp::post())
+            .and(warp::body::json())
+            .and_then(rename_folder_handle))
+        .or(warp::path("delete_folder")
+            .and(warp::post())
+            .and(warp::body::json())
+            .and_then(delete_folder_handle))
+        .with(cors);
+
+    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; // TODO from config
+}
+
 pub async fn get_folders() -> String {
     let mut folders: Vec<String> = Vec::new();
     for entry in std::fs::read_dir(Config::global().out_dir.clone())
@@ -131,30 +171,95 @@ pub async fn get_sorted_threads_by_date(folder: String) -> String {
     get_threads(folder, "date.json".to_string()).await
 }
 
+// Handlers
+#[derive(Deserialize, Serialize)]
+struct ErrorResponse {
+    error: String,
+}
+
+#[derive(Deserialize)]
+struct GetSortedThreadsByDateQuery {
+    folder: String,
+}
+
+async fn sorted_threads_by_date_handle(query: GetSortedThreadsByDateQuery) -> Result<impl warp::Reply, warp::Rejection> {
+    let result: Vec<(String, Vec<u32>)> = serde_json::from_str(&*get_sorted_threads_by_date(query.folder).await).unwrap();
+    Ok(warp::reply::json(&result))
+}
+
+async fn get_folders_handle() -> Result<impl warp::Reply , warp::Rejection> {
+    let result: Vec<String> = serde_json::from_str(&*get_folders().await).unwrap();
+    Ok(warp::reply::json(&result))
+}
+
+#[derive(Deserialize)]
+struct GetEmailQuery {
+    id: String,
+}
+
+async fn get_email_handle(query: GetEmailQuery) -> Result<impl warp::Reply, warp::Rejection> {
+    let result: String = get_email(query.id).await;
+    Ok(warp::reply::html(result))
+}
+
+#[derive(Deserialize)]
+struct GetThreadMessagesQuery {
+    id: u32,
+}
+
+async fn get_thread_messages_handle(query: GetThreadMessagesQuery) -> Result<impl warp::Reply, warp::Rejection> {
+    let result: Vec<SerializableMessage> = serde_json::from_str(&*get_thread_messages(query.id).await).unwrap();
+    Ok(warp::reply::json(&result))
+}
+
 #[derive(Deserialize)]
 struct CreateFolderRequest {
     name: String,
 }
-pub async fn create_folder(body: String) -> anyhow::Result<()>{
-    let data: CreateFolderRequest = serde_json::from_str(&*body)?;
-    create_folder_lar(data.name.clone()).await
+
+async fn create_folder_handle(data: CreateFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
+    match create_folder_lar(data.name.clone()).await {
+        Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
+        Err(_) => {
+            let error_response = ErrorResponse {
+                error: format!("Cannot create folder {}", data.name),
+            };
+            Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
+        }
+    }
 }
 
 #[derive(Deserialize)]
 struct DeleteFolderRequest {
     name: String,
 }
-pub async fn delete_folder(body: String) -> anyhow::Result<()>{
-    let data: DeleteFolderRequest = serde_json::from_str(&*body)?;
-    delete_folder_lar(data.name.clone()).await
+
+async fn delete_folder_handle(data: DeleteFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
+    match delete_folder_lar(data.name.clone()).await {
+        Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
+        Err(_) => {
+            let error_response = ErrorResponse {
+                error: format!("Cannot delete folder {}", data.name),
+            };
+            Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
+        }
+    }
 }
 
 #[derive(Deserialize)]
 struct RenameFolderRequest {
     old_name: String,
-    new_name: String
+    new_name: String,
 }
-pub async fn rename_folder(body: String) -> anyhow::Result<()>{
-    let data: RenameFolderRequest = serde_json::from_str(&*body)?;
-    rename_folder_lar(data.old_name.clone(), data.new_name.clone()).await
+
+async fn rename_folder_handle(data: RenameFolderRequest) -> Result<impl warp::Reply, warp::Rejection> {
+    match rename_folder_lar(data.old_name.clone(), data.new_name.clone()).await {
+        Ok(_) => Ok(warp::reply::with_status(warp::reply::json(&"OK"), StatusCode::OK)),
+        Err(_) => {
+            let error_response = ErrorResponse {
+                error: format!("Cannot rename folder from {} to {}", data.old_name, data.new_name),
+            };
+            Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
+        }
+    }
 }

+ 0 - 129
src/server_wasi.rs

@@ -1,129 +0,0 @@
-#![cfg(target_os = "wasi")]
-
-use std::collections::HashMap;
-use std::io::{Read, Write};
-use anyhow::anyhow;
-use serde::de::Unexpected::Str;
-use bytecodec::DecodeExt;
-use httpcodec::{HttpVersion, ReasonPhrase, Request, RequestDecoder, Response, StatusCode, Header};
-use wasmedge_wasi_socket::{Shutdown, TcpListener, TcpStream};
-use crate::server::{get_email, get_folders, get_sorted_threads_by_date, get_thread_messages, create_folder, delete_folder, rename_folder};
-
-async fn handle_http(req: Request<String>, request_line: &str) -> anyhow::Result<String> {
-    let parts: Vec<&str> = request_line.split_whitespace().collect();
-    if parts.len() < 3 {
-        return Err(anyhow!("Invalid Input"))
-    }
-    let (path, query) = parse_url(parts[1]);
-
-    // Basic CORS headers
-    let cors_headers = "HTTP/1.1 200 OK\r\n\
-    Content-Type: application/json\r\n\
-    Access-Control-Allow-Origin: *\r\n\
-    Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS\r\n\
-    Access-Control-Allow-Headers: Content-Type, Authorization\r\n";
-
-    if req.method().to_string().eq("OPTIONS") {
-        return Ok(format!("{}Content-Length: {}\r\n", cors_headers, 0))
-    }
-
-    
-    let content = match path.as_str() {
-        "/folders" => format!("{}", get_folders().await),
-        "/sorted_threads_by_date" => {
-            if let Some(value) = query.get("folder"){
-                format!("{}", get_sorted_threads_by_date(value.to_owned()).await)
-            }else{ "404 Not Found".to_string() }
-        },
-        "/get_thread_messages" => {
-            if let Some(value) = query.get("id"){
-                match value.parse::<u32>() {
-                    Ok(value) => format!("{}", get_thread_messages(value).await),
-                    Err(_e) => "404 Not Found".to_string(),
-                }
-            }else{"404 Not Found".to_string()}
-        }
-        "/email" => {
-            if let Some(value) = query.get("id"){
-                format!("{}", get_email(value.to_owned()).await)
-            }else{ "404 Not Found".to_string() }
-        },
-        "/create_folder" => {
-            match create_folder(req.body().clone()).await {
-                Ok(_) => "OK".to_string(), // TODO return 200 OK
-                Err(_) => "400 Bad Request".to_string(),
-            }
-        },
-        "/delete_folder" => {
-            match delete_folder(req.body().clone()).await {
-                Ok(_) => "OK".to_string(), // TODO return 200 OK
-                Err(_) => "400 Bad Request".to_string(),
-            }
-        },
-        "/rename_folder" => {
-            match rename_folder(req.body().clone()).await {
-                Ok(_) => "OK".to_string(), // TODO return 200 OK
-                Err(_) => "400 Bad Request".to_string(),
-            }
-        },
-        _ => "404 Not Found".to_string(),
-    };
-
-    Ok(format!("{}Content-Length: {}\r\n\r\n{}", cors_headers, content.len(), content))
-}
-
-fn parse_url(url: &str) -> (String, HashMap<String, String>) {
-    // Split the string into path and query parts
-    let parts: Vec<&str> = url.splitn(2, '?').collect();
-
-    let path = parts[0].to_string();
-    let mut query_params = HashMap::new();
-
-    if parts.len() > 1 {
-        // Split the query part into key-value pairs
-        let query_part = parts[1];
-        for param in query_part.split('&') {
-            let kv: Vec<&str> = param.splitn(2, '=').collect();
-            if kv.len() == 2 {
-                query_params.insert(kv[0].to_string(), kv[1].to_string());
-            } else {
-                query_params.insert(kv[0].to_string(), String::new());
-            }
-        }
-    }
-
-    (path, query_params)
-}
-
-pub async fn handle_client(mut stream: TcpStream) -> std::io::Result<()> {
-    let mut buffer = [0u8; 1024];
-    let mut data = Vec::new();
-
-    while let Ok(n) = stream.read(&mut buffer) {
-        if n == 0 {
-            break; // Connection was closed
-        }
-        data.extend_from_slice(&buffer[..n]);
-        if n < buffer.len() { break; } // Less data means end of request
-    }
-
-    // Assuming the first line is the request line
-    let request_line = std::str::from_utf8(&data)
-        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?
-        .lines()
-        .next()
-        .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "No request line"))?;
-
-    let mut decoder = RequestDecoder::<httpcodec::BodyDecoder<bytecodec::bytes::Utf8Decoder>>::default();
-    let req = decoder.decode_from_bytes(&data)
-        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
-
-    let response = handle_http(req, request_line)
-        .await
-        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
-
-    let write_buf = response.to_string();
-    stream.write_all(write_buf.as_bytes())?;
-    stream.shutdown(Shutdown::Both)?;
-    Ok(())
-}