use std::fs::{File, OpenOptions}; use serde::{Deserialize, Serialize}; use crate::config::Config; use crate::models::StrMessage; use crate::sonic::IngestSonic; use std::io::{Read, Write}; use std::sync::{Arc, Mutex, RwLock}; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::{task, time}; pub struct SonicMonitor { is_running: AtomicBool, } impl SonicMonitor { pub fn new() -> Self { SonicMonitor { is_running: AtomicBool::new(false), } } pub fn run(&self) { // Check if the task is already running if !self.is_running.load(Ordering::SeqCst) { // Spawn the background task using the Arc struct let task_handle = SONIC_MONITOR.clone(); tokio::spawn(async move { task_handle.run_background().await; }); } } pub fn start(&self) { self.is_running.store(true, Ordering::SeqCst); } pub fn stop(&self) { self.is_running.store(false, Ordering::SeqCst); } async fn run_background(self: Arc) { loop { if self.is_running.load(Ordering::SeqCst) { check_server().await; } tokio::time::sleep(Duration::from_secs(10)).await; } } } // Static variable to hold the task pub static SONIC_MONITOR: once_cell::sync::Lazy> = once_cell::sync::Lazy::new(|| { Arc::new(SonicMonitor::new()) }); pub async fn check_server(){ match IngestSonic::new(){ Ok(_) => { SONIC_MONITOR.stop(); upload_from_queue().await.unwrap(); } Err(_) => {} }; } pub fn ingest_new_message(message: StrMessage) -> anyhow::Result<()>{ // ingest text from the message to the sonic server // TODO add attachments match IngestSonic::new(){ Ok(mut ingest_channel) => { let data = message.to_ingest(); match ingest_channel.ingest_document("emails", &*message.list.clone(), &*message.id.clone(), &*data) { Ok(_) => {} Err(_) => write_to_queue(message)? }; let _ = ingest_channel.stop_ingest(); } Err(_) => write_to_queue(message)? }; Ok(()) } pub fn write_to_queue(message: StrMessage) -> anyhow::Result<()>{ SONIC_MONITOR.start(); let mut records: Vec = Vec::new(); let file_path = Config::global().out_dir.clone().join(".sonic_search_queue.json"); // Open or create the file if it doesn't exist let mut file = OpenOptions::new() .read(true) .write(true) .create(true) .open(file_path.clone())?; // Read the existing file contents if it's not empty let mut file_contents = String::new(); file.read_to_string(&mut file_contents)?; // Deserialize existing data if the file is not empty if !file_contents.trim().is_empty() { records = serde_json::from_str(&file_contents)?; } // Add the new record to the list records.push(message.clone()); // Serialize the updated records back to JSON let updated_json = serde_json::to_string_pretty(&records)?; // Write the updated JSON back to the file let mut file = File::create(file_path.clone())?; file.write_all(updated_json.as_bytes())?; Ok(()) } pub async fn upload_from_queue() -> anyhow::Result<()>{ let mut records: Vec = Vec::new(); let file_path = Config::global().out_dir.clone().join(".sonic_search_queue.json"); // Open or create the file if it doesn't exist let mut file = OpenOptions::new() .read(true) .write(true) .create(true) .open(file_path.clone())?; // Read the existing file contents if it's not empty let mut file_contents = String::new(); file.read_to_string(&mut file_contents)?; // Deserialize existing data if the file is not empty if !file_contents.trim().is_empty() { records = serde_json::from_str(&file_contents)?; } for record in records { match ingest_new_message(record){ Ok(_) => {} Err(_) => {} } } let mut file = File::create(file_path.clone())?; file.write_all("".as_bytes())?; Ok(()) }