search.rs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. use std::fs::{File, OpenOptions};
  2. use serde::{Deserialize, Serialize};
  3. use crate::config::Config;
  4. use crate::models::StrMessage;
  5. use crate::sonic::IngestSonic;
  6. use std::io::{Read, Write};
  7. use std::sync::{Arc, Mutex, RwLock};
  8. use std::sync::atomic::{AtomicBool, Ordering};
  9. use std::time::Duration;
  10. use tokio::{task, time};
  11. pub struct SonicMonitor {
  12. is_running: AtomicBool,
  13. }
  14. impl SonicMonitor {
  15. pub fn new() -> Self {
  16. SonicMonitor {
  17. is_running: AtomicBool::new(false),
  18. }
  19. }
  20. pub fn run(&self) {
  21. // Check if the task is already running
  22. if !self.is_running.load(Ordering::SeqCst) {
  23. // Spawn the background task using the Arc struct
  24. let task_handle = SONIC_MONITOR.clone();
  25. tokio::spawn(async move {
  26. task_handle.run_background().await;
  27. });
  28. }
  29. }
  30. pub fn start(&self) {
  31. self.is_running.store(true, Ordering::SeqCst);
  32. }
  33. pub fn stop(&self) {
  34. self.is_running.store(false, Ordering::SeqCst);
  35. }
  36. async fn run_background(self: Arc<Self>) {
  37. loop {
  38. if self.is_running.load(Ordering::SeqCst) {
  39. check_server().await;
  40. }
  41. tokio::time::sleep(Duration::from_secs(10)).await;
  42. }
  43. }
  44. }
  45. // Static variable to hold the task
  46. pub static SONIC_MONITOR: once_cell::sync::Lazy<Arc<SonicMonitor>> = once_cell::sync::Lazy::new(|| {
  47. Arc::new(SonicMonitor::new())
  48. });
  49. pub async fn check_server(){
  50. match IngestSonic::new(){
  51. Ok(_) => {
  52. SONIC_MONITOR.stop();
  53. upload_from_queue().await.unwrap();
  54. }
  55. Err(_) => {}
  56. };
  57. }
  58. pub fn ingest_new_message(message: StrMessage) -> anyhow::Result<()>{
  59. // ingest text from the message to the sonic server
  60. // TODO add attachments
  61. match IngestSonic::new(){
  62. Ok(mut ingest_channel) => {
  63. let data = message.to_ingest();
  64. match ingest_channel.ingest_document("emails", &*message.list.clone(), &*message.id.clone(), &*data) {
  65. Ok(_) => {}
  66. Err(_) => write_to_queue(message)?
  67. };
  68. let _ = ingest_channel.stop_ingest();
  69. }
  70. Err(_) => write_to_queue(message)?
  71. };
  72. Ok(())
  73. }
  74. pub fn write_to_queue(message: StrMessage) -> anyhow::Result<()>{
  75. SONIC_MONITOR.start();
  76. let mut records: Vec<StrMessage> = Vec::new();
  77. let file_path = Config::global().out_dir.clone().join(".sonic_search_queue.json");
  78. // Open or create the file if it doesn't exist
  79. let mut file = OpenOptions::new()
  80. .read(true)
  81. .write(true)
  82. .create(true)
  83. .open(file_path.clone())?;
  84. // Read the existing file contents if it's not empty
  85. let mut file_contents = String::new();
  86. file.read_to_string(&mut file_contents)?;
  87. // Deserialize existing data if the file is not empty
  88. if !file_contents.trim().is_empty() {
  89. records = serde_json::from_str(&file_contents)?;
  90. }
  91. // Add the new record to the list
  92. records.push(message.clone());
  93. // Serialize the updated records back to JSON
  94. let updated_json = serde_json::to_string_pretty(&records)?;
  95. // Write the updated JSON back to the file
  96. let mut file = File::create(file_path.clone())?;
  97. file.write_all(updated_json.as_bytes())?;
  98. Ok(())
  99. }
  100. pub async fn upload_from_queue() -> anyhow::Result<()>{
  101. let mut records: Vec<StrMessage> = Vec::new();
  102. let file_path = Config::global().out_dir.clone().join(".sonic_search_queue.json");
  103. // Open or create the file if it doesn't exist
  104. let mut file = OpenOptions::new()
  105. .read(true)
  106. .write(true)
  107. .create(true)
  108. .open(file_path.clone())?;
  109. // Read the existing file contents if it's not empty
  110. let mut file_contents = String::new();
  111. file.read_to_string(&mut file_contents)?;
  112. // Deserialize existing data if the file is not empty
  113. if !file_contents.trim().is_empty() {
  114. records = serde_json::from_str(&file_contents)?;
  115. }
  116. for record in records {
  117. match ingest_new_message(record){
  118. Ok(_) => {}
  119. Err(_) => {}
  120. }
  121. }
  122. let mut file = File::create(file_path.clone())?;
  123. file.write_all("".as_bytes())?;
  124. Ok(())
  125. }