123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- use std::fs::{File, OpenOptions};
- use serde::{Deserialize, Serialize};
- use crate::config::Config;
- use crate::models::StrMessage;
- use crate::sonic::IngestSonic;
- use std::io::{Read, Write};
- use std::sync::{Arc, Mutex, RwLock};
- use std::sync::atomic::{AtomicBool, Ordering};
- use std::time::Duration;
- use tokio::{task, time};
- pub struct SonicMonitor {
- is_running: AtomicBool,
- }
- impl SonicMonitor {
- pub fn new() -> Self {
- SonicMonitor {
- is_running: AtomicBool::new(false),
- }
- }
- pub fn run(&self) {
- // Check if the task is already running
- if !self.is_running.load(Ordering::SeqCst) {
- // Spawn the background task using the Arc struct
- let task_handle = SONIC_MONITOR.clone();
- tokio::spawn(async move {
- task_handle.run_background().await;
- });
- }
- }
- pub fn start(&self) {
- self.is_running.store(true, Ordering::SeqCst);
- }
-
- pub fn stop(&self) {
- self.is_running.store(false, Ordering::SeqCst);
- }
- async fn run_background(self: Arc<Self>) {
- loop {
- if self.is_running.load(Ordering::SeqCst) {
- check_server().await;
- }
- tokio::time::sleep(Duration::from_secs(10)).await;
- }
- }
- }
- // Static variable to hold the task
- pub static SONIC_MONITOR: once_cell::sync::Lazy<Arc<SonicMonitor>> = once_cell::sync::Lazy::new(|| {
- Arc::new(SonicMonitor::new())
- });
- pub async fn check_server(){
- match IngestSonic::new(){
- Ok(_) => {
- SONIC_MONITOR.stop();
- upload_from_queue().await.unwrap();
- }
- Err(_) => {}
- };
- }
- pub fn ingest_new_message(message: StrMessage) -> anyhow::Result<()>{
- // ingest text from the message to the sonic server
- // TODO add attachments
- match IngestSonic::new(){
- Ok(mut ingest_channel) => {
- let data = message.to_ingest();
- match ingest_channel.ingest_document("emails", &*message.list.clone(), &*message.id.clone(), &*data) {
- Ok(_) => {}
- Err(_) => write_to_queue(message)?
- };
- let _ = ingest_channel.stop_ingest();
- }
- Err(_) => write_to_queue(message)?
- };
-
- Ok(())
- }
- pub fn write_to_queue(message: StrMessage) -> anyhow::Result<()>{
- SONIC_MONITOR.start();
-
- let mut records: Vec<StrMessage> = Vec::new();
- let file_path = Config::global().out_dir.clone().join(".sonic_search_queue.json");
-
- // Open or create the file if it doesn't exist
- let mut file = OpenOptions::new()
- .read(true)
- .write(true)
- .create(true)
- .open(file_path.clone())?;
- // Read the existing file contents if it's not empty
- let mut file_contents = String::new();
- file.read_to_string(&mut file_contents)?;
- // Deserialize existing data if the file is not empty
- if !file_contents.trim().is_empty() {
- records = serde_json::from_str(&file_contents)?;
- }
- // Add the new record to the list
- records.push(message.clone());
- // Serialize the updated records back to JSON
- let updated_json = serde_json::to_string_pretty(&records)?;
- // Write the updated JSON back to the file
- let mut file = File::create(file_path.clone())?;
- file.write_all(updated_json.as_bytes())?;
-
- Ok(())
- }
- pub async fn upload_from_queue() -> anyhow::Result<()>{
- let mut records: Vec<StrMessage> = Vec::new();
- let file_path = Config::global().out_dir.clone().join(".sonic_search_queue.json");
- // Open or create the file if it doesn't exist
- let mut file = OpenOptions::new()
- .read(true)
- .write(true)
- .create(true)
- .open(file_path.clone())?;
- // Read the existing file contents if it's not empty
- let mut file_contents = String::new();
- file.read_to_string(&mut file_contents)?;
- // Deserialize existing data if the file is not empty
- if !file_contents.trim().is_empty() {
- records = serde_json::from_str(&file_contents)?;
- }
- for record in records {
- match ingest_new_message(record){
- Ok(_) => {}
- Err(_) => {}
- }
- }
- let mut file = File::create(file_path.clone())?;
- file.write_all("".as_bytes())?;
-
- Ok(())
- }
|