
comments added

Yurii Sokolovskyi 3 months ago
parent commit cf6dd8a8cf
2 changed files with 115 additions and 111 deletions
  1. src/main.rs +41 -32
  2. src/sync_dir.rs +74 -79

+ 41 - 32
src/main.rs

@@ -26,54 +26,57 @@ use crate::sync_dir::SyncDir;
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let watch_rate = 100;
-    let args: Vec<String> = env::args().skip(1).collect();
+    let watch_rate = 100; // the program checks the sync directory for changes every 100 ms
+    let args: Vec<String> = env::args().skip(1).collect(); // args: sync directory and, optionally, a ticket
 
-    let base_path = env::current_dir().unwrap().join(args[0].parse::<String>().unwrap());
+    let base_path = env::current_dir().unwrap().join(args[0].parse::<String>().unwrap()); // path to the sync directory
     println!("BASE_PATH: {:?}", base_path);
-    let store_path = "./iroh_data/".to_string();
+    let store_path = "./iroh_data/".to_string(); // directory where the doc's data is persisted
 
+    // create the SyncDir
     let sync_dir = if args.len() > 1 {
         SyncDir::new(base_path.clone(), store_path.clone(), Some(args[1].clone())).await?
     }else{
         SyncDir::new(base_path.clone(), store_path.clone(), None).await?
     };
-    let sync_dir = Arc::new(Mutex::new(sync_dir));
+    let sync_dir = Arc::new(Mutex::new(sync_dir)); // wrap it in Arc<Mutex<...>> so it can be shared and locked across tasks
 
     let sync_dir_clone = Arc::clone(&sync_dir);
     let sync_dir_lock = sync_dir_clone.lock().await;
-    (*sync_dir_lock).load_local_files().await;
+    (*sync_dir_lock).load_local_files().await; // load local files into the doc
+    // subscribe to doc events so we can track changes (e.g. another peer adding or modifying a file)
     let mut stream = (*sync_dir_lock).doc_subscribe().await?;
     drop(sync_dir_lock);
 
+    // watch for events in the doc
     let sync_dir_clone_stream = Arc::clone(&sync_dir);
-    let stream_task = tokio::spawn(async move {
+    let stream_task = tokio::spawn(async move { // spawn an async task with tokio::spawn
         let mut keys: Vec<Vec<u8>> = Vec::new();
-        while let Some(item) = stream.next().await {
+        while let Some(item) = stream.next().await { // wait for the next item in the stream
             match item {
                 Ok(value) => {
                     println!("{:?}", value);
                     match value {
-                        LiveEvent::InsertRemote { content_status, entry, .. } => {
-                            if content_status == ContentStatus::Complete {
-                                let sync_dir_lock = sync_dir_clone_stream.lock().await;
-                                let _ = (*sync_dir_lock).write_new_entry(entry.clone().key().to_vec()).await;
+                        LiveEvent::InsertRemote { content_status, entry, .. } => { // a remote peer made a change
+                            if content_status == ContentStatus::Complete { // the content is already downloaded; otherwise we wait for ContentReady
+                                let sync_dir_lock = sync_dir_clone_stream.lock().await; // lock the SyncDir
+                                let _ = (*sync_dir_lock).write_new_entry(entry.clone().key().to_vec()).await; // write the new entry to the local fs
                                 drop(sync_dir_lock);
                             }else{
-                                keys.push(entry.clone().key().to_vec());
+                                keys.push(entry.clone().key().to_vec()); // the content is not ready yet, so store the key and handle it on ContentReady
                             }
                         }
-                        LiveEvent::ContentReady { .. } => {
-                            if !keys.is_empty() {
+                        LiveEvent::ContentReady { .. } => { // content finished downloading (usually follows InsertRemote)
+                            if !keys.is_empty() { // check whether any keys are pending
                                 let sync_dir_lock = sync_dir_clone_stream.lock().await;
-                                let _ = (*sync_dir_lock).write_new_entry(keys[0].clone()).await;
+                                let _ = (*sync_dir_lock).write_new_entry(keys[0].clone()).await; // write the new entry to the local fs
                                 drop(sync_dir_lock);
-                                keys.remove(0);
+                                keys.remove(0); // remove the used key from the list
                             } else {
                                 println!("Got ContentReady, but the key is empty");
                             }
                         }
-                        LiveEvent::PendingContentReady => {
+                        LiveEvent::PendingContentReady => { // usually no keys are left by this point, but flush any remaining ones just in case
                             for key in &keys {
                                 let sync_dir_lock = sync_dir_clone_stream.lock().await;
                                 let _ = (*sync_dir_lock).write_new_entry(key.clone()).await;
@@ -89,38 +92,41 @@ async fn main() -> anyhow::Result<()> {
         }
     });
 
+    // create a watcher for changes in the local fs
     let sync_dir_clone_watcher = Arc::clone(&sync_dir);
-    let watcher_task = tokio::spawn(async move {
+    let watcher_task = tokio::spawn(async move { // spawn an async task with tokio::spawn
         println!("Start watching...");
+        // create the watcher and its channel
         let (tx, rx) = channel();
         let mut watcher = watcher(tx, Duration::from_secs(10)).unwrap();
         watcher.watch(base_path.clone(), RecursiveMode::Recursive).unwrap();
 
         loop {
-            match rx.try_recv() {
+            match rx.try_recv() { // poll for the next event from the channel
                 Ok(event) => {
                     println!("Event: {:?}", event);
                     let sync_dir_lock = sync_dir_clone_watcher.lock().await;
                     match event {
-                        DebouncedEvent::Create(path) => {
-                            (*sync_dir_lock).get_content_and_load_local_file(path.clone()).await;
+                        DebouncedEvent::Create(path) => { // a new entry (file or folder) was created
+                            (*sync_dir_lock).get_content_and_load_local_file(path.clone()).await; // write its content and path to the doc
                         }
-                        DebouncedEvent::Write(path) => {
-                            (*sync_dir_lock).get_content_and_load_local_file(path.clone()).await;
+                        DebouncedEvent::Write(path) => { // an existing entry (file or folder) was modified
+                            (*sync_dir_lock).get_content_and_load_local_file(path.clone()).await; // write its content and path to the doc
                         }
-                        DebouncedEvent::Remove(path) => {
-                            let _ = (*sync_dir_lock).remove_file(path).await;
+                        DebouncedEvent::Remove(path) => { // an entry was removed
+                            let _ = (*sync_dir_lock).remove_file(path).await; // mark the entry as deleted in the doc
                         }
-                        DebouncedEvent::Rename(src, dst) => {
-                            (*sync_dir_lock).rename_local_file(src, dst).await;
+                        DebouncedEvent::Rename(src, dst) => { // an entry was renamed or moved to another directory.
+                            // If an entry is moved outside the watched directory, a Remove event is emitted instead
+                            (*sync_dir_lock).rename_local_file(src, dst).await; // delete the old entry in the doc and re-add it under the new path
                         }
                         _ => {}
                     }
                     drop(sync_dir_lock);
                 }
                 Err(e) => match e {
-                    TryRecvError::Empty => {
-                        tokio::time::sleep(Duration::from_millis(watch_rate.clone())).await;
+                    TryRecvError::Empty => { // nothing new in the channel
+                        tokio::time::sleep(Duration::from_millis(watch_rate.clone())).await; // sleep for watch_rate ms so the loop does not busy-wait
                     }
                     TryRecvError::Disconnected => {
                         println!("Channel disconnected, stopping watcher.");
@@ -133,13 +139,16 @@ async fn main() -> anyhow::Result<()> {
         println!("Stop watching");
     });
 
-    
+    // wait for user input to stop the program
+    // It is CRUCIAL to stop the program this way, because (*sync_dir_lock).shutdown().await must be called to persist all the doc data to the local fs
     let mut input = String::new();
     while io::stdin().read_line(&mut input).expect("Failed to read line") > 0 {
-        if input.trim() == "q" {
+        if input.trim() == "q" { // the user entered "q"
+            // drop both task handles
             drop(stream_task);
             drop(watcher_task);
             
+            // shut down the node and persist the doc to the local fs
             let sync_dir_clone = Arc::clone(&sync_dir);
             let sync_dir_lock = sync_dir_clone.lock().await;
             (*sync_dir_lock).shutdown().await;

+ 74 - 79
src/sync_dir.rs

@@ -1,37 +1,33 @@
 use std::collections::HashMap;
-use std::{env, fs};
+use std::{fs};
 use std::fs::File;
-use std::future::Future;
 use std::io::{Read, Write};
-use std::path::{Path, PathBuf, StripPrefixError};
-use std::ptr::null;
-use std::sync::{Arc, MutexGuard};
-use std::sync::mpsc::channel;
-use std::time::Duration;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc};
 use futures_lite::{Stream, StreamExt};
 use iroh::base::node_addr::AddrInfoOptions;
 use iroh::blobs::store;
 use iroh::blobs::store::Store;
-use iroh::client::docs::{Client, Entry, LiveEvent, ShareMode};
-use iroh::client::{Doc, Iroh};
-use iroh::docs::{Author, AuthorId};
+use iroh::client::docs::{LiveEvent, ShareMode};
+use iroh::client::{Doc};
+use iroh::docs::{ AuthorId};
 use iroh::docs::store::Query;
 use iroh::node::Node;
-use notify::{DebouncedEvent, RecursiveMode, watcher, Watcher};
-use anyhow::{anyhow, Result};
+use notify::{Watcher};
+use anyhow::{Result};
 use bytes::Bytes;
 use iroh::base::ticket::Ticket;
 use serde::{Deserialize, Serialize};
-use tokio::fs::remove_file;
 use tokio::sync::Mutex;
-use tokio::task;
 
+/// Struct that represents an entity (file or folder) in the local fs
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Entity {
     pub content: Vec<u8>,
     pub is_deleted: bool,
 }
 
+// helper methods for Entity
 impl Entity {
     fn new(content: Vec<u8>, is_deleted: bool) -> Self{
         Entity{
@@ -51,11 +47,11 @@ impl Entity {
     }
 }
 
+/// Struct with all the functions needed to work with the Iroh doc and the local fs
 pub struct SyncDir {
-    base_path: PathBuf,
-    store_path: String,
+    base_path: PathBuf, // the sync directory
+    store_path: String, // directory where the doc's data is persisted
     node: Node<store::fs::Store>,
-    // node: Node<store::mem::Store>,
     doc: Arc<Mutex<Doc>>,
     author: AuthorId,
 }
@@ -63,11 +59,10 @@ pub struct SyncDir {
 impl SyncDir{
     pub async fn new(base_path: PathBuf, store_path: String, ticket: Option<String>) -> anyhow::Result<Self>{
         let mut node = iroh::node::Node::persistent(Path::new(&store_path.clone())).await?.spawn().await?;
-        // let mut node = iroh::node::Node::memory().spawn().await?;
         let client = node.client();
         
         let doc = match ticket {
-            None => {
+            None => { // no ticket was passed in the args: create a new doc and share a ticket for it
                 let d = Arc::new(Mutex::new(client.docs().create().await?));
                 let mut doc_lock = d.lock().await;
                 let ticket = (*doc_lock).share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses).await?;
@@ -75,12 +70,12 @@ impl SyncDir{
                 println!("{}", ticket);
                 d
             }
-            Some(ticket) => {
-                Arc::new(Mutex::new(client.docs().import(ticket.parse()?).await.unwrap()))
+            Some(ticket) => { // import the existing doc from the ticket and connect to the other peers
+                Arc::new(Mutex::new(client.docs().import(ticket.parse()?).await.unwrap()))
             }
         };
         
-        // let author = client.authors().create().await?;
+        // use the default author
         let author = client.authors().default().await?;
         
         Ok(SyncDir{
@@ -92,14 +87,18 @@ impl SyncDir{
         })
     }
     
+    /// Loads all the files from the sync dir into the doc
     pub async fn load_local_files(&self){
+        // read all the files/folders from the sync dir
         let directory = self.collect_files_in_dir(self.base_path.clone());
         
+        // load all entries to the doc
         for (key, content) in directory {
             let _ = self.load_local_file(key, content).await;
         }
     }
 
+    /// Collects all the files from the sync dir into a map of path -> content
     pub fn collect_files_in_dir<P: AsRef<Path>>(&self, dir: P) -> HashMap<String, Vec<u8>> {
         let mut file_map = HashMap::new();
         let dir = dir.as_ref();
@@ -127,38 +126,47 @@ impl SyncDir{
         file_map
     }
 
+    /// Writes a new entry to the doc if it differs from the existing one
     async fn write_to_doc_if_different(&self, key: String, content: Entity) -> anyhow::Result<()>{
-        let entity = self.get_last_from_doc(key.clone()).await;
+        let entity = self.get_last_from_doc(key.clone()).await; // get the latest entry for this key from the doc
 
         match entity {
-            Some(entry) => {
-                if content.content != entry.content{
+            Some(entry) => { // there is already an entry with this key in the doc
+                if content.content != entry.content{ // check whether the contents differ
+                    // if so, write the new version to the doc
                     let _ = self.set_to_doc(key.clone(), content.content.clone(), false).await;
                 }
             },
-            None => {
+            None => { // no entry with this key yet, so just add a new one to the doc
                 let _ = self.set_to_doc(key.clone(), content.content.clone(), false).await;
             }
         };
         Ok(())
     }
 
+    /// Loads a file/folder from the local fs into the doc
     pub async fn load_local_file(&self, key: String, content: Vec<u8>) -> anyhow::Result<()> {
+        // TODO a file might have empty content as well, so this check needs to account for that
+        // compute the path of the entry relative to the sync dir
         let relative_path = self.relative_path(PathBuf::from(key.clone()));
 
-        if content.is_empty() {
+        if content.is_empty() { // empty content means it is a folder
+            // prepend 'd' to the key to mark it as a folder; TODO this could be moved into the Entity struct
             let _ = self.write_to_doc_if_different("d".to_owned() + &*relative_path.display().to_string(), Entity::new(vec![0], false)).await;
         } else {
             let content_clone = content.clone();
             let temp = std::str::from_utf8(&content_clone)?;
             println!("load_local_file: {:?}", temp.clone());
+            // prepend 'f' to the key to mark it as a file; TODO this could be moved into the Entity struct
             let _ = self.write_to_doc_if_different("f".to_owned() + &*relative_path.display().to_string(), Entity::new(content.clone(), false)).await;
         }
         
         Ok(())
     }
     
+    /// Removes a file/folder from the local fs
     pub fn remove_local_file(&self, path: PathBuf, is_dir: bool){
+        // just remove the entry from the fs
         if is_dir{
             let _ = fs::remove_dir_all(path);
         }else{
@@ -166,10 +174,13 @@ impl SyncDir{
         }
     }
 
+    /// Called when an InsertRemote or ContentReady event arrives on the doc stream
     pub async fn write_new_entry(&self, key: Vec<u8>) -> anyhow::Result<()> {
+        // get the latest entry from the doc by key
         let entity = self.get_last_from_doc(String::from_utf8(key.clone()).unwrap()).await
             .unwrap_or_else(|| { Entity::new(Vec::new(), false) });
 
+        // strip the leading 'f' or 'd' (the file/folder marker) from the key
         let cloned_key = String::from_utf8(key)?;
         let mut char_indices = cloned_key.char_indices();
         let second_char_start_index = char_indices.nth(1)
@@ -178,6 +189,7 @@ impl SyncDir{
         let mut entry_name = self.base_path.clone();
         entry_name = entry_name.join(&cloned_key[second_char_start_index..]);
         
+        // if the entry is marked as deleted in the doc, delete it from the local fs as well
         if entity.is_deleted{
             match cloned_key.chars().next() {
                 Some(first_char) if first_char == 'f' => self.remove_local_file(entry_name, false),
@@ -185,23 +197,24 @@ impl SyncDir{
                 Some(_) => {}
                 None => {}
             }
-            return Ok(())
+            return Ok(()) // nothing else to do for a deleted entry
         }
         
+        // retrieve the content from the entry
         let content_bytes = Bytes::from(entity.content);        
 
         println!("write_new_entry: {:?}", entry_name);
 
         match cloned_key.chars().next() {
-            Some(first_char) if first_char == 'f' => {
-                if let Some(parent) = entry_name.parent() {
+            Some(first_char) if first_char == 'f' => { // it is a file
+                if let Some(parent) = entry_name.parent() { // create any missing parent directories
                     fs::create_dir_all(parent)?;
                 }
-                let mut file = File::create(entry_name)?;
-                file.write_all(&*content_bytes)?;
+                let mut file = File::create(entry_name)?; // create the file, or truncate it if it already exists
+                file.write_all(&*content_bytes)?; // write the new content to the file
             }
-            Some(first_char) if first_char == 'd' => {
-                fs::create_dir_all(entry_name)?;
+            Some(first_char) if first_char == 'd' => { // it is a folder
+                fs::create_dir_all(entry_name)?; // create the folder (and any missing parents)
             }
             Some(_) => {}
             None => {}
@@ -210,40 +223,7 @@ impl SyncDir{
         Ok(())
     }
 
-    pub async fn process_stream(&self, mut stream: impl Stream<Item =anyhow::Result<LiveEvent, anyhow::Error>> + Unpin) -> anyhow::Result<()> {
-        let mut keys: Vec<Vec<u8>> = Vec::new();
-        while let Some(item) = stream.next().await {
-            match item {
-                Ok(value) => {
-                    println!("{:?}", value);
-                    match value {
-                        LiveEvent::InsertRemote { entry, .. } => {
-                            keys.push(entry.clone().key().to_vec());
-                        }
-                        LiveEvent::ContentReady { .. } => {
-                            if !keys.is_empty() {
-                                let _ = self.write_new_entry(keys[0].clone()).await;
-                                keys.remove(0);
-                            } else {
-                                println!("Got ContentReady, but the key is empty");
-                            }
-                        }
-                        LiveEvent::PendingContentReady => {
-                            for key in &keys {
-                                let _ = self.write_new_entry(key.clone()).await;
-                            }
-                            keys.clear();
-                        }
-                        _ => {}
-                    }
-                }
-                Err(e) => eprintln!("Error processing stream: {}", e),
-            }
-        }
-
-        Ok(())
-    }
-
+    /// Subscribes to doc events and returns a stream of them
     pub async fn doc_subscribe(&self) -> Result<impl Stream<Item = Result<LiveEvent>>> {
         let doc_clone = Arc::clone(&self.doc);
         let doc_lock = doc_clone.lock().await;
@@ -253,15 +233,19 @@ impl SyncDir{
         event
     }
     
+    /// Shuts down the node and persists all the doc data to the local fs
     pub async fn shutdown(&self) {
         self.node.clone().shutdown().await.expect("Unable to shutdown node");
     }
     
+    /// Gets the latest entry from the doc by key; TODO this wrapper should be removed
     pub async fn get_by_key(&self, key: String) -> Option<Entity>{
         self.get_last_from_doc(key.clone()).await
     }
 
+    /// Reads the content of a local file and loads it into the doc
     pub async fn get_content_and_load_local_file(&self, path: PathBuf) {
+        // read the content
         let mut content: Vec<u8> = vec![];
         if path.is_file() {
             let mut file_content = Vec::new();
@@ -271,13 +255,17 @@ impl SyncDir{
             let str = String::from_utf8(content.clone()).unwrap();
             println!("local content: {str}");
         }
+        
+        // load it into the doc
         self.load_local_file(path.display().to_string(), content).await.expect("Unable to load local file");
     }
 
+    /// Checks whether the entry exists in the doc and marks it as deleted
     pub async fn remove_if_exists(&self, key: String) -> anyhow::Result<()> {
-        let entry = self.get_last_from_doc(key.clone()).await;
+        let entry = self.get_last_from_doc(key.clone()).await; // get the latest entry from the doc
         match entry {
             Some(entity) => {
+                // write an entity with the same content, but marked as deleted, to the doc
                 let _ = self.set_to_doc(key.clone(), entity.content, true).await;
             },
             None => {}
@@ -287,9 +275,11 @@ impl SyncDir{
         Ok(())
     }
     
+    /// Checks whether the path is a file or a folder and marks it as deleted in the doc
     pub async fn remove_file(&self, path: PathBuf) -> anyhow::Result<()>{
-        let relative_path = self.relative_path(path.clone());
+        let relative_path = self.relative_path(path.clone()); // calculate relative path
         
+        // consider both options (the entry might be a file or a folder)
         let file_path = "f".to_string() + &*relative_path.clone().display().to_string();
         let dir_path = "d".to_string() + &*relative_path.clone().display().to_string();
 
@@ -312,6 +302,7 @@ impl SyncDir{
         Ok(())
     }
     
+    /// Computes the path of a file/folder relative to the sync dir
     pub fn relative_path(&self, path: PathBuf) -> PathBuf{
         match path.strip_prefix(&self.base_path.clone()) {
             Ok(relative_path) => {PathBuf::from(relative_path)}
@@ -319,28 +310,31 @@ impl SyncDir{
         }
     }
     
+    /// Writes a new entry to the doc
     pub async fn set_to_doc(&self, key: String, content: Vec<u8>, is_deleted: bool) -> anyhow::Result<()>{
         let doc_clone = Arc::clone(&self.doc);
-        let mut doc_lock = doc_clone.lock().await;
+        let mut doc_lock = doc_clone.lock().await; // lock the doc
 
-        let str = String::from_utf8(content.clone()).unwrap();
+        let str = String::from_utf8(content.clone()).unwrap(); // for debugging
         println!("SET_TO_DOC: {:?}", str);
         
-        let entity = Entity::new(content.clone(), is_deleted.clone());
-        (*doc_lock).set_bytes(self.author, key, entity.as_bytes()?).await?;
+        let entity = Entity::new(content.clone(), is_deleted.clone()); // create a new Entity instance
+        (*doc_lock).set_bytes(self.author, key, entity.as_bytes()?).await?; // write the new entity to the doc
         drop(doc_lock);
         
         Ok(())
     }
     
+    /// Each author has its own entry in the doc for the same file, so this function retrieves the most recently added one
     pub async fn get_last_from_doc(&self, key: String) -> Option<Entity> {
         let doc_clone = Arc::clone(&self.doc);
         let mut doc_lock = doc_clone.lock().await;
-        let mut entries = (*doc_lock).get_many(Query::single_latest_per_key().key_exact(key.clone())).await.ok()?;
-        let entry = entries.next().await;
+        // retrieve the latest entry for this key from the doc
+        let mut entries = (*doc_lock).get_many(Query::single_latest_per_key().key_exact(key.clone())).await.ok()?; // (returns a stream)
+        let entry = entries.next().await; // take the first element from the stream
 
         
-        let content = match entry {
+        let content = match entry { // convert the result into an Entity and return it
             None => None,
             Some(entry) => {
                 match entry { 
@@ -358,8 +352,9 @@ impl SyncDir{
         content
     }
     
+    /// Renames a file/dir inside the doc
     pub async fn rename_local_file(&self, src: PathBuf, dst: PathBuf){
-        let _ = self.remove_file(src).await;
-        let _ = self.get_content_and_load_local_file(dst).await;
+        let _ = self.remove_file(src).await; // mark the old file/dir as deleted
+        let _ = self.get_content_and_load_local_file(dst).await; // re-add the same file/dir under the new path
     }
 }
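
Note on the key scheme used throughout SyncDir: each entry is stored in the doc under its path relative to the sync dir, prefixed with 'f' for files and 'd' for folders, and write_new_entry later strips that first character to rebuild the local path. Below is a minimal sketch of that convention; the encode_key/decode_key helpers are hypothetical (SyncDir builds and strips the prefix inline in load_local_file and write_new_entry) and serve only to illustrate the idea.

// Sketch of the 'f'/'d' + relative-path key convention.
use std::path::{Path, PathBuf};

fn encode_key(relative_path: &Path, is_dir: bool) -> String {
    // prefix the relative path with the entry-type marker
    let marker = if is_dir { 'd' } else { 'f' };
    format!("{marker}{}", relative_path.display())
}

fn decode_key(key: &str) -> Option<(PathBuf, bool)> {
    // split off the marker and rebuild the relative path
    let mut chars = key.chars();
    let marker = chars.next()?;
    let path = PathBuf::from(chars.as_str());
    match marker {
        'f' => Some((path, false)),
        'd' => Some((path, true)),
        _ => None,
    }
}

fn main() {
    let key = encode_key(Path::new("notes/todo.txt"), false);
    assert_eq!(key, "fnotes/todo.txt");
    assert_eq!(decode_key(&key), Some((PathBuf::from("notes/todo.txt"), false)));
}
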