Yurii Sokolovskyi 3 місяців тому
батько
коміт
0e9c2fe348
42 змінених файлів з 3282 додано та 11 видалено
  1. 3 1
      Cargo.toml
  2. 4 0
      crabmail.conf
  3. 12 0
      sonic-channel-wasi/.gitignore
  4. 5 0
      sonic-channel-wasi/CODE_OF_CONDUCT.md
  5. 35 0
      sonic-channel-wasi/Cargo.toml
  6. 373 0
      sonic-channel-wasi/LICENSE
  7. 112 0
      sonic-channel-wasi/README.md
  8. 13 0
      sonic-channel-wasi/docker-compose.test.yml
  9. 43 0
      sonic-channel-wasi/flake.lock
  10. 25 0
      sonic-channel-wasi/flake.nix
  11. 66 0
      sonic-channel-wasi/sonic.cfg
  12. 238 0
      sonic-channel-wasi/src/channels.rs
  13. 160 0
      sonic-channel-wasi/src/channels/control.rs
  14. 183 0
      sonic-channel-wasi/src/channels/ingest.rs
  15. 164 0
      sonic-channel-wasi/src/channels/search.rs
  16. 53 0
      sonic-channel-wasi/src/commands.rs
  17. 67 0
      sonic-channel-wasi/src/commands/count.rs
  18. 68 0
      sonic-channel-wasi/src/commands/flush.rs
  19. 69 0
      sonic-channel-wasi/src/commands/list.rs
  20. 22 0
      sonic-channel-wasi/src/commands/ping.rs
  21. 54 0
      sonic-channel-wasi/src/commands/pop.rs
  22. 73 0
      sonic-channel-wasi/src/commands/push.rs
  23. 100 0
      sonic-channel-wasi/src/commands/query.rs
  24. 22 0
      sonic-channel-wasi/src/commands/quit.rs
  25. 44 0
      sonic-channel-wasi/src/commands/start.rs
  26. 63 0
      sonic-channel-wasi/src/commands/suggest.rs
  27. 47 0
      sonic-channel-wasi/src/commands/trigger.rs
  28. 112 0
      sonic-channel-wasi/src/lib.rs
  29. 21 0
      sonic-channel-wasi/src/macroses.rs
  30. 169 0
      sonic-channel-wasi/src/misc.rs
  31. 340 0
      sonic-channel-wasi/src/protocol.rs
  32. 77 0
      sonic-channel-wasi/src/result.rs
  33. 28 0
      sonic-channel-wasi/tests/common.rs
  34. 27 0
      sonic-channel-wasi/tests/list_command.rs
  35. 60 0
      sonic-channel-wasi/tests/push_command.rs
  36. 138 0
      sonic-channel-wasi/tests/query_command.rs
  37. 36 0
      sonic-channel-wasi/tests/suggest_command.rs
  38. 7 1
      src/config.rs
  39. 19 9
      src/main.rs
  40. 20 0
      src/server.rs
  41. 82 0
      src/sonic.rs
  42. 28 0
      src/templates/xml.rs

+ 3 - 1
Cargo.toml

@@ -40,8 +40,9 @@ futures = "0.3.30"
 [target.'cfg(target_os = "wasi")'.dependencies]
 tokio_wasi = {version = "1.25.2", features = ["full", "rt", "rt-multi-thread", "macros", "time"] }
 tokio-rustls-wasi = "0.25.0-alpha"
-async-imap-wasi = {path = "./async-imap-wasi/", default-features = false, features = ["runtime-tokio"]}
 warp_wasi = "0.3.3"
+async-imap-wasi = {path = "./async-imap-wasi", default-features = false, features = ["runtime-tokio"]}
+sonic-channel-wasi = { version = "1.1.0", features = ["ingest"], path = "./sonic-channel-wasi" }
 
 [target.'cfg(not(target_os = "wasi"))'.dependencies]
 maildir = "0.6.4"
@@ -50,4 +51,5 @@ native-tls = "0.2.11"
 async-imap = {version =  "0.9.7" , default-features = false, features = ["runtime-tokio"]}
 tokio-rustls = "0.26.0"
 warp = "0.3.7"
+sonic-channel = { version = "1.1.0", features = ["ingest"] }
 

+ 4 - 0
crabmail.conf

@@ -15,4 +15,8 @@ api_port=3001
 smtp_domain=smtpDomain
 smtp_port=smtpPort
 
+sonic_search_addr=127.0.0.1
+sonic_search_port=5041
+sonic_search_password=SecretPassword
+
 reply_add_link=false

+ 12 - 0
sonic-channel-wasi/.gitignore

@@ -0,0 +1,12 @@
+# misc
+.DS_Store
+
+# result
+/target
+
+# test bin
+/src/main.rs
+
+# direnv
+.envrc
+.direnv/

+ 5 - 0
sonic-channel-wasi/CODE_OF_CONDUCT.md

@@ -0,0 +1,5 @@
+# Contributor Code of Conduct
+
+This project adheres to No Code of Conduct.  We are all adults.  We accept anyone's contributions.  Nothing else matters.
+
+For more information please visit the [No Code of Conduct](https://github.com/domgetter/NCoC) homepage.

+ 35 - 0
sonic-channel-wasi/Cargo.toml

@@ -0,0 +1,35 @@
+[package]
+name = "sonic-channel-wasi"
+version = "1.1.0"
+authors = ["Dmitriy Pleshevskiy <dmitriy@ideascup.me>"]
+description = "Rust client for sonic search backend"
+categories = ["api-bindings"]
+keywords = ["sonic", "search", "client", "elasticsearch", "api"]
+edition = "2021"
+license = "MPL-2.0"
+repository = "https://github.com/pleshevskiy/sonic-channel"
+homepage = "https://github.com/pleshevskiy/sonic-channel"
+documentation = "https://docs.rs/sonic-channel"
+readme = "README.md"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+log = "0.4.17"
+whatlang = "0.16.2"
+wasmedge_wasi_socket = "0.5.4"
+
+[features]
+default = ["search"]
+
+ingest = []
+search = []
+control = []
+
+
+[badges]
+maintenance = { status = "actively-developed" }
+
+# https://docs.rs/about
+[package.metadata.docs.rs]
+all-features = true

+ 373 - 0
sonic-channel-wasi/LICENSE

@@ -0,0 +1,373 @@
+Mozilla Public License Version 2.0
+==================================
+
+1. Definitions
+--------------
+
+1.1. "Contributor"
+    means each individual or legal entity that creates, contributes to
+    the creation of, or owns Covered Software.
+
+1.2. "Contributor Version"
+    means the combination of the Contributions of others (if any) used
+    by a Contributor and that particular Contributor's Contribution.
+
+1.3. "Contribution"
+    means Covered Software of a particular Contributor.
+
+1.4. "Covered Software"
+    means Source Code Form to which the initial Contributor has attached
+    the notice in Exhibit A, the Executable Form of such Source Code
+    Form, and Modifications of such Source Code Form, in each case
+    including portions thereof.
+
+1.5. "Incompatible With Secondary Licenses"
+    means
+
+    (a) that the initial Contributor has attached the notice described
+        in Exhibit B to the Covered Software; or
+
+    (b) that the Covered Software was made available under the terms of
+        version 1.1 or earlier of the License, but not also under the
+        terms of a Secondary License.
+
+1.6. "Executable Form"
+    means any form of the work other than Source Code Form.
+
+1.7. "Larger Work"
+    means a work that combines Covered Software with other material, in
+    a separate file or files, that is not Covered Software.
+
+1.8. "License"
+    means this document.
+
+1.9. "Licensable"
+    means having the right to grant, to the maximum extent possible,
+    whether at the time of the initial grant or subsequently, any and
+    all of the rights conveyed by this License.
+
+1.10. "Modifications"
+    means any of the following:
+
+    (a) any file in Source Code Form that results from an addition to,
+        deletion from, or modification of the contents of Covered
+        Software; or
+
+    (b) any new file in Source Code Form that contains any Covered
+        Software.
+
+1.11. "Patent Claims" of a Contributor
+    means any patent claim(s), including without limitation, method,
+    process, and apparatus claims, in any patent Licensable by such
+    Contributor that would be infringed, but for the grant of the
+    License, by the making, using, selling, offering for sale, having
+    made, import, or transfer of either its Contributions or its
+    Contributor Version.
+
+1.12. "Secondary License"
+    means either the GNU General Public License, Version 2.0, the GNU
+    Lesser General Public License, Version 2.1, the GNU Affero General
+    Public License, Version 3.0, or any later versions of those
+    licenses.
+
+1.13. "Source Code Form"
+    means the form of the work preferred for making modifications.
+
+1.14. "You" (or "Your")
+    means an individual or a legal entity exercising rights under this
+    License. For legal entities, "You" includes any entity that
+    controls, is controlled by, or is under common control with You. For
+    purposes of this definition, "control" means (a) the power, direct
+    or indirect, to cause the direction or management of such entity,
+    whether by contract or otherwise, or (b) ownership of more than
+    fifty percent (50%) of the outstanding shares or beneficial
+    ownership of such entity.
+
+2. License Grants and Conditions
+--------------------------------
+
+2.1. Grants
+
+Each Contributor hereby grants You a world-wide, royalty-free,
+non-exclusive license:
+
+(a) under intellectual property rights (other than patent or trademark)
+    Licensable by such Contributor to use, reproduce, make available,
+    modify, display, perform, distribute, and otherwise exploit its
+    Contributions, either on an unmodified basis, with Modifications, or
+    as part of a Larger Work; and
+
+(b) under Patent Claims of such Contributor to make, use, sell, offer
+    for sale, have made, import, and otherwise transfer either its
+    Contributions or its Contributor Version.
+
+2.2. Effective Date
+
+The licenses granted in Section 2.1 with respect to any Contribution
+become effective for each Contribution on the date the Contributor first
+distributes such Contribution.
+
+2.3. Limitations on Grant Scope
+
+The licenses granted in this Section 2 are the only rights granted under
+this License. No additional rights or licenses will be implied from the
+distribution or licensing of Covered Software under this License.
+Notwithstanding Section 2.1(b) above, no patent license is granted by a
+Contributor:
+
+(a) for any code that a Contributor has removed from Covered Software;
+    or
+
+(b) for infringements caused by: (i) Your and any other third party's
+    modifications of Covered Software, or (ii) the combination of its
+    Contributions with other software (except as part of its Contributor
+    Version); or
+
+(c) under Patent Claims infringed by Covered Software in the absence of
+    its Contributions.
+
+This License does not grant any rights in the trademarks, service marks,
+or logos of any Contributor (except as may be necessary to comply with
+the notice requirements in Section 3.4).
+
+2.4. Subsequent Licenses
+
+No Contributor makes additional grants as a result of Your choice to
+distribute the Covered Software under a subsequent version of this
+License (see Section 10.2) or under the terms of a Secondary License (if
+permitted under the terms of Section 3.3).
+
+2.5. Representation
+
+Each Contributor represents that the Contributor believes its
+Contributions are its original creation(s) or it has sufficient rights
+to grant the rights to its Contributions conveyed by this License.
+
+2.6. Fair Use
+
+This License is not intended to limit any rights You have under
+applicable copyright doctrines of fair use, fair dealing, or other
+equivalents.
+
+2.7. Conditions
+
+Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted
+in Section 2.1.
+
+3. Responsibilities
+-------------------
+
+3.1. Distribution of Source Form
+
+All distribution of Covered Software in Source Code Form, including any
+Modifications that You create or to which You contribute, must be under
+the terms of this License. You must inform recipients that the Source
+Code Form of the Covered Software is governed by the terms of this
+License, and how they can obtain a copy of this License. You may not
+attempt to alter or restrict the recipients' rights in the Source Code
+Form.
+
+3.2. Distribution of Executable Form
+
+If You distribute Covered Software in Executable Form then:
+
+(a) such Covered Software must also be made available in Source Code
+    Form, as described in Section 3.1, and You must inform recipients of
+    the Executable Form how they can obtain a copy of such Source Code
+    Form by reasonable means in a timely manner, at a charge no more
+    than the cost of distribution to the recipient; and
+
+(b) You may distribute such Executable Form under the terms of this
+    License, or sublicense it under different terms, provided that the
+    license for the Executable Form does not attempt to limit or alter
+    the recipients' rights in the Source Code Form under this License.
+
+3.3. Distribution of a Larger Work
+
+You may create and distribute a Larger Work under terms of Your choice,
+provided that You also comply with the requirements of this License for
+the Covered Software. If the Larger Work is a combination of Covered
+Software with a work governed by one or more Secondary Licenses, and the
+Covered Software is not Incompatible With Secondary Licenses, this
+License permits You to additionally distribute such Covered Software
+under the terms of such Secondary License(s), so that the recipient of
+the Larger Work may, at their option, further distribute the Covered
+Software under the terms of either this License or such Secondary
+License(s).
+
+3.4. Notices
+
+You may not remove or alter the substance of any license notices
+(including copyright notices, patent notices, disclaimers of warranty,
+or limitations of liability) contained within the Source Code Form of
+the Covered Software, except that You may alter any license notices to
+the extent required to remedy known factual inaccuracies.
+
+3.5. Application of Additional Terms
+
+You may choose to offer, and to charge a fee for, warranty, support,
+indemnity or liability obligations to one or more recipients of Covered
+Software. However, You may do so only on Your own behalf, and not on
+behalf of any Contributor. You must make it absolutely clear that any
+such warranty, support, indemnity, or liability obligation is offered by
+You alone, and You hereby agree to indemnify every Contributor for any
+liability incurred by such Contributor as a result of warranty, support,
+indemnity or liability terms You offer. You may include additional
+disclaimers of warranty and limitations of liability specific to any
+jurisdiction.
+
+4. Inability to Comply Due to Statute or Regulation
+---------------------------------------------------
+
+If it is impossible for You to comply with any of the terms of this
+License with respect to some or all of the Covered Software due to
+statute, judicial order, or regulation then You must: (a) comply with
+the terms of this License to the maximum extent possible; and (b)
+describe the limitations and the code they affect. Such description must
+be placed in a text file included with all distributions of the Covered
+Software under this License. Except to the extent prohibited by statute
+or regulation, such description must be sufficiently detailed for a
+recipient of ordinary skill to be able to understand it.
+
+5. Termination
+--------------
+
+5.1. The rights granted under this License will terminate automatically
+if You fail to comply with any of its terms. However, if You become
+compliant, then the rights granted under this License from a particular
+Contributor are reinstated (a) provisionally, unless and until such
+Contributor explicitly and finally terminates Your grants, and (b) on an
+ongoing basis, if such Contributor fails to notify You of the
+non-compliance by some reasonable means prior to 60 days after You have
+come back into compliance. Moreover, Your grants from a particular
+Contributor are reinstated on an ongoing basis if such Contributor
+notifies You of the non-compliance by some reasonable means, this is the
+first time You have received notice of non-compliance with this License
+from such Contributor, and You become compliant prior to 30 days after
+Your receipt of the notice.
+
+5.2. If You initiate litigation against any entity by asserting a patent
+infringement claim (excluding declaratory judgment actions,
+counter-claims, and cross-claims) alleging that a Contributor Version
+directly or indirectly infringes any patent, then the rights granted to
+You by any and all Contributors for the Covered Software under Section
+2.1 of this License shall terminate.
+
+5.3. In the event of termination under Sections 5.1 or 5.2 above, all
+end user license agreements (excluding distributors and resellers) which
+have been validly granted by You or Your distributors under this License
+prior to termination shall survive termination.
+
+************************************************************************
+*                                                                      *
+*  6. Disclaimer of Warranty                                           *
+*  -------------------------                                           *
+*                                                                      *
+*  Covered Software is provided under this License on an "as is"       *
+*  basis, without warranty of any kind, either expressed, implied, or  *
+*  statutory, including, without limitation, warranties that the       *
+*  Covered Software is free of defects, merchantable, fit for a        *
+*  particular purpose or non-infringing. The entire risk as to the     *
+*  quality and performance of the Covered Software is with You.        *
+*  Should any Covered Software prove defective in any respect, You     *
+*  (not any Contributor) assume the cost of any necessary servicing,   *
+*  repair, or correction. This disclaimer of warranty constitutes an   *
+*  essential part of this License. No use of any Covered Software is   *
+*  authorized under this License except under this disclaimer.         *
+*                                                                      *
+************************************************************************
+
+************************************************************************
+*                                                                      *
+*  7. Limitation of Liability                                          *
+*  --------------------------                                          *
+*                                                                      *
+*  Under no circumstances and under no legal theory, whether tort      *
+*  (including negligence), contract, or otherwise, shall any           *
+*  Contributor, or anyone who distributes Covered Software as          *
+*  permitted above, be liable to You for any direct, indirect,         *
+*  special, incidental, or consequential damages of any character      *
+*  including, without limitation, damages for lost profits, loss of    *
+*  goodwill, work stoppage, computer failure or malfunction, or any    *
+*  and all other commercial damages or losses, even if such party      *
+*  shall have been informed of the possibility of such damages. This   *
+*  limitation of liability shall not apply to liability for death or   *
+*  personal injury resulting from such party's negligence to the       *
+*  extent applicable law prohibits such limitation. Some               *
+*  jurisdictions do not allow the exclusion or limitation of           *
+*  incidental or consequential damages, so this exclusion and          *
+*  limitation may not apply to You.                                    *
+*                                                                      *
+************************************************************************
+
+8. Litigation
+-------------
+
+Any litigation relating to this License may be brought only in the
+courts of a jurisdiction where the defendant maintains its principal
+place of business and such litigation shall be governed by laws of that
+jurisdiction, without reference to its conflict-of-law provisions.
+Nothing in this Section shall prevent a party's ability to bring
+cross-claims or counter-claims.
+
+9. Miscellaneous
+----------------
+
+This License represents the complete agreement concerning the subject
+matter hereof. If any provision of this License is held to be
+unenforceable, such provision shall be reformed only to the extent
+necessary to make it enforceable. Any law or regulation which provides
+that the language of a contract shall be construed against the drafter
+shall not be used to construe this License against a Contributor.
+
+10. Versions of the License
+---------------------------
+
+10.1. New Versions
+
+Mozilla Foundation is the license steward. Except as provided in Section
+10.3, no one other than the license steward has the right to modify or
+publish new versions of this License. Each version will be given a
+distinguishing version number.
+
+10.2. Effect of New Versions
+
+You may distribute the Covered Software under the terms of the version
+of the License under which You originally received the Covered Software,
+or under the terms of any subsequent version published by the license
+steward.
+
+10.3. Modified Versions
+
+If you create software not governed by this License, and you want to
+create a new license for such software, you may create and use a
+modified version of this License if you rename the license and remove
+any references to the name of the license steward (except to note that
+such modified license differs from this License).
+
+10.4. Distributing Source Code Form that is Incompatible With Secondary
+Licenses
+
+If You choose to distribute Source Code Form that is Incompatible With
+Secondary Licenses under the terms of this version of the License, the
+notice described in Exhibit B of this License must be attached.
+
+Exhibit A - Source Code Form License Notice
+-------------------------------------------
+
+  This Source Code Form is subject to the terms of the Mozilla Public
+  License, v. 2.0. If a copy of the MPL was not distributed with this
+  file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+If it is not possible or desirable to put the notice in a particular
+file, then You may include the notice in a location (such as a LICENSE
+file in a relevant directory) where a recipient would be likely to look
+for such a notice.
+
+You may add additional accurate notices of copyright ownership.
+
+Exhibit B - "Incompatible With Secondary Licenses" Notice
+---------------------------------------------------------
+
+  This Source Code Form is "Incompatible With Secondary Licenses", as
+  defined by the Mozilla Public License, v. 2.0.

+ 112 - 0
sonic-channel-wasi/README.md

@@ -0,0 +1,112 @@
+# Sonic Channel
+
+[![Build](https://github.com/pleshevskiy/sonic-channel/actions/workflows/ci.yml/badge.svg)](https://github.com/pleshevskiy/sonic-channel/actions/workflows/ci.yml)
+[![unsafe forbidden](https://img.shields.io/badge/unsafe-forbidden-success.svg)](https://github.com/rust-secure-code/safety-dance/)
+[![Documentation](https://docs.rs/sonic-channel/badge.svg)](https://docs.rs/sonic-channel)
+[![Crates.io](https://img.shields.io/crates/v/sonic-channel)](https://crates.io/crates/sonic-channel)
+![Crates.io](https://img.shields.io/crates/l/sonic-channel)
+
+Rust client for [sonic] search backend.
+
+We recommend you start with the [documentation].
+
+## Installation
+
+**The MSRV is: 1.58.1**
+
+Add `sonic-channel = { version = "1.1" }` as a dependency in `Cargo.toml`.
+
+`Cargo.toml` example:
+
+```toml
+[package]
+name = "my-crate"
+version = "0.1.0"
+authors = ["Me <user@rust-lang.org>"]
+
+[dependencies]
+sonic-channel = { version = "1.1", features = ["ingest"] }
+```
+
+Add `default-features = false` to dependency, if you want to exclude default
+`search` channel.
+
+## Example usage
+
+### Search channel
+
+Note: This example requires enabling the `search` feature, enabled by default.
+
+```rust
+use sonic_channel::*;
+
+fn main() -> result::Result<()> {
+    let channel = SearchChannel::start(
+        "localhost:1491",
+        "SecretPassword",
+    )?;
+
+    let objects = channel.query(QueryRequest::new(
+        Dest::col_buc("collection", "bucket"),
+        "recipe",
+    ))?;
+    dbg!(objects);
+
+    Ok(())
+}
+```
+
+### Ingest channel
+
+Note: This example requires enabling the `ingest` feature.
+
+```rust
+use sonic_channel::*;
+
+fn main() -> result::Result<()> {
+    let channel = IngestChannel::start(
+        "localhost:1491",
+        "SecretPassword",
+    )?;
+
+    let dest = Dest::col_buc("collection", "bucket").obj("object:1");
+    let pushed = channel.push(PushRequest::new(dest, "my best recipe"))?;
+    // or
+    // let pushed = channel.push(
+    //     PushRequest::new(dest, "Мой лучший рецепт").lang(Lang::Rus)
+    // )?;
+    dbg!(pushed);
+
+    Ok(())
+}
+```
+
+### Control channel
+
+Note: This example requires enabling the `control` feature.
+
+```rust
+use sonic_channel::*;
+
+fn main() -> result::Result<()> {
+    let channel = ControlChannel::start(
+        "localhost:1491",
+        "SecretPassword",
+    )?;
+
+    let result = channel.consolidate()?;
+    assert_eq!(result, ());
+
+    Ok(())
+}
+```
+
+## Available features
+
+- **default** - ["search"]
+- **search** - Add sonic search mode with methods
+- **ingest** - Add sonic ingest mode with methods
+- **control** - Add sonic control mode with methods
+
+[sonic]: https://github.com/valeriansaliou/sonic
+[documentation]: https://docs.rs/sonic-channel

+ 13 - 0
sonic-channel-wasi/docker-compose.test.yml

@@ -0,0 +1,13 @@
+version: '3'
+
+services:
+  sonic:
+    image: valeriansaliou/sonic:v1.4.0
+    ports:
+      - 36999:1491
+    volumes:
+      - sonic_data:/var/lib/sonic/store/
+      - ./sonic.cfg:/etc/sonic.cfg
+
+volumes:
+  sonic_data:

+ 43 - 0
sonic-channel-wasi/flake.lock

@@ -0,0 +1,43 @@
+{
+  "nodes": {
+    "flake-utils": {
+      "locked": {
+        "lastModified": 1659877975,
+        "narHash": "sha256-zllb8aq3YO3h8B/U0/J1WBgAL8EX5yWf5pMj3G0NAmc=",
+        "owner": "numtide",
+        "repo": "flake-utils",
+        "rev": "c0e246b9b83f637f4681389ecabcb2681b4f3af0",
+        "type": "github"
+      },
+      "original": {
+        "owner": "numtide",
+        "repo": "flake-utils",
+        "type": "github"
+      }
+    },
+    "nixpkgs": {
+      "locked": {
+        "lastModified": 1662096612,
+        "narHash": "sha256-R+Q8l5JuyJryRPdiIaYpO5O3A55rT+/pItBrKcy7LM4=",
+        "owner": "NixOS",
+        "repo": "nixpkgs",
+        "rev": "21de2b973f9fee595a7a1ac4693efff791245c34",
+        "type": "github"
+      },
+      "original": {
+        "owner": "NixOS",
+        "ref": "nixpkgs-unstable",
+        "repo": "nixpkgs",
+        "type": "github"
+      }
+    },
+    "root": {
+      "inputs": {
+        "flake-utils": "flake-utils",
+        "nixpkgs": "nixpkgs"
+      }
+    }
+  },
+  "root": "root",
+  "version": 7
+}

+ 25 - 0
sonic-channel-wasi/flake.nix

@@ -0,0 +1,25 @@
+{
+  inputs = {
+    nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
+    flake-utils.url = "github:numtide/flake-utils";
+  };
+
+  outputs = { self, nixpkgs, flake-utils, ... }:
+    flake-utils.lib.eachDefaultSystem (system:
+      let
+        pkgs = import nixpkgs { inherit system; };
+      in
+      {
+        devShell = pkgs.mkShell {
+          packages = with pkgs; [
+            cargo
+            cargo-watch
+            rustc
+            rustfmt
+            clippy
+            rust-analyzer
+          ];
+          RUST_SRC_PATH = pkgs.rustPlatform.rustLibSrc;
+        };
+      });
+}

+ 66 - 0
sonic-channel-wasi/sonic.cfg

@@ -0,0 +1,66 @@
+# Sonic
+# Fast, lightweight and schema-less search backend
+# Configuration file
+# Example: https://github.com/valeriansaliou/sonic/blob/master/config.cfg
+
+
+[server]
+
+log_level = "error"
+
+
+[channel]
+
+inet = "0.0.0.0:1491"
+tcp_timeout = 300
+
+auth_password = "SecretPassword1234"
+
+[channel.search]
+
+query_limit_default = 10
+query_limit_maximum = 100
+query_alternates_try = 4
+
+suggest_limit_default = 5
+suggest_limit_maximum = 20
+
+
+[store]
+
+[store.kv]
+
+path = "/var/lib/sonic/store/kv/"
+
+retain_word_objects = 1000
+
+[store.kv.pool]
+
+inactive_after = 1800
+
+[store.kv.database]
+
+flush_after = 900
+
+compress = true
+parallelism = 2
+max_files = 100
+max_compactions = 1
+max_flushes = 1
+write_buffer = 16384
+write_ahead_log = true
+
+[store.fst]
+
+path = "/var/lib/sonic/store/fst/"
+
+[store.fst.pool]
+
+inactive_after = 300
+
+[store.fst.graph]
+
+consolidate_after = 180
+
+max_size = 2048
+max_words = 250000

+ 238 - 0
sonic-channel-wasi/src/channels.rs

@@ -0,0 +1,238 @@
+#[cfg(feature = "search")]
+mod search;
+#[cfg(feature = "search")]
+pub use search::*;
+
+#[cfg(feature = "ingest")]
+mod ingest;
+#[cfg(feature = "ingest")]
+pub use ingest::*;
+
+#[cfg(feature = "control")]
+mod control;
+#[cfg(feature = "control")]
+pub use control::*;
+
+use std::cell::RefCell;
+use std::io::{Read, Write};
+use wasmedge_wasi_socket::{TcpStream, ToSocketAddrs};
+
+use crate::commands::{StartCommand, StreamCommand};
+use crate::protocol::{self, Protocol};
+use crate::result::*;
+
+const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
+
+/// Channel modes supported by sonic search backend.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ChannelMode {
+    /// Sonic server search channel mode.
+    ///
+    /// In this mode you can use `query`, `pag_query`, `suggest`, `lim_suggest`, `ping`
+    /// and `quit` commands.
+    ///
+    /// Note: This mode requires enabling the `search` feature.
+    #[cfg(feature = "search")]
+    Search,
+
+    /// Sonic server ingest channel mode.
+    ///
+    /// In this mode you can use `push`, `pop`, `flush`, `count` `ping` and `quit` commands.
+    ///
+    /// Note: This mode requires enabling the `ingest` feature.
+    #[cfg(feature = "ingest")]
+    Ingest,
+
+    /// Sonic server control channel mode.
+    ///
+    /// In this mode you can use `trigger`, `consolidate`, `backup`, `restore`,
+    /// `ping` and `quit` commands.
+    ///
+    /// Note: This mode requires enabling the `control` feature.
+    #[cfg(feature = "control")]
+    Control,
+}
+
+impl ChannelMode {
+    /// Converts enum to &str
+    pub fn as_str(&self) -> &str {
+        match self {
+            #[cfg(feature = "search")]
+            ChannelMode::Search => "search",
+
+            #[cfg(feature = "ingest")]
+            ChannelMode::Ingest => "ingest",
+
+            #[cfg(feature = "control")]
+            ChannelMode::Control => "control",
+        }
+    }
+}
+
+impl std::fmt::Display for ChannelMode {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.as_str())
+    }
+}
+
+/// Root and Heart of this library.
+///
+/// You can connect to the sonic search backend and run all supported protocol methods.
+///
+#[derive(Debug)]
+pub struct SonicStream {
+    stream: RefCell<TcpStream>,
+    mode: Option<ChannelMode>, // None – Uninitialized mode
+    max_buffer_size: usize,
+    protocol: Protocol,
+}
+
+impl SonicStream {
+    fn send<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
+        let buf = self
+            .protocol
+            .format_request(command.request())
+            .map_err(|_| Error::WriteToStream)?;
+        self.stream
+            .borrow_mut()
+            .write_all(&buf)
+            .map_err(|_| Error::WriteToStream)?;
+        Ok(())
+    }
+
+
+    fn read_line(&self) -> Result<protocol::Response> {
+        let mut buffer = Vec::with_capacity(self.max_buffer_size);
+        let mut stream = self.stream.borrow_mut();
+
+        loop {
+            let mut byte = [0u8; 1];
+            match stream.read(&mut byte) {
+                Ok(0) => {
+                    break;
+                }
+                Ok(1) => {
+                    buffer.push(byte[0]);
+                    if byte[0] == b'\n' {
+                        break;
+                    }
+                }
+                Ok(_) => {
+                    return Err(Error::ReadStream);
+                }
+                Err(_) => {
+                    return Err(Error::ReadStream);
+                }
+            }
+        }
+
+        let line_string = String::from_utf8(buffer.to_vec()).map_err(|e| {
+            println!("Invalid UTF-8 sequence: {:?}", e);
+            Error::ReadStream
+        })?;
+        
+        log::debug!("[channel] {}", &line_string);
+        self.protocol.parse_response(&line_string)
+    }
+
+    pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
+        self.send(&command)?;
+        let res = loop {
+            let res = self.read_line()?;
+            if !matches!(&res, protocol::Response::Pending(_)) {
+                break res;
+            }
+        };
+        command.receive(res)
+    }
+
+    fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
+        let stream = TcpStream::connect(addr).map_err(|_| Error::ConnectToServer)?;
+
+        let channel = SonicStream {
+            stream: RefCell::new(stream),
+            mode: None,
+            max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
+            protocol: Default::default(),
+        };
+
+        let res = channel.read_line()?;
+        if matches!(res, protocol::Response::Connected) {
+            Ok(channel)
+        } else {
+            Err(Error::ConnectToServer)
+        }
+    }
+
+    fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
+        if self.mode.is_some() {
+            return Err(Error::RunCommand);
+        }
+
+        let res = self.run_command(StartCommand {
+            mode,
+            password: password.to_string(),
+        })?;
+
+        self.max_buffer_size = res.max_buffer_size;
+        self.protocol = Protocol::from(res.protocol_version);
+        self.mode = Some(res.mode);
+
+        Ok(())
+    }
+
+    /// Connect to the search backend in chosen mode.
+    ///
+    /// I think we shouldn't separate commands connect and start because we haven't
+    /// possibility to change channel in sonic server, if we already chosen one of them. 🤔
+    pub(crate) fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
+        where
+            A: ToSocketAddrs,
+            S: ToString,
+    {
+        let mut channel = Self::connect(addr)?;
+        channel.start(mode, password)?;
+        Ok(channel)
+    }
+}
+
+/// This trait should be implemented for all supported sonic channels
+pub trait SonicChannel {
+    /// Sonic channel struct
+    type Channel;
+
+    /// Returns reference for sonic stream of connection
+    fn stream(&self) -> &SonicStream;
+
+    /// Connects to sonic backend and run start command.
+    ///
+    /// ```rust,no_run
+    /// # use sonic_channel::*;
+    /// # fn main() -> result::Result<()> {
+    /// let search_channel = SearchChannel::start(
+    ///     "localhost:1491",
+    ///     "SecretPassword",
+    /// )?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
+        where
+            A: ToSocketAddrs,
+            S: ToString;
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn format_channel_enums() {
+        #[cfg(feature = "search")]
+        assert_eq!(format!("{}", ChannelMode::Search), String::from("search"));
+        #[cfg(feature = "ingest")]
+        assert_eq!(format!("{}", ChannelMode::Ingest), String::from("ingest"));
+        #[cfg(feature = "control")]
+        assert_eq!(format!("{}", ChannelMode::Control), String::from("control"));
+    }
+}

+ 160 - 0
sonic-channel-wasi/src/channels/control.rs

@@ -0,0 +1,160 @@
+use super::{ChannelMode, SonicChannel, SonicStream};
+use crate::commands::*;
+use crate::result::Result;
+use wasmedge_wasi_socket::{ToSocketAddrs};
+
+/// The Sonic Channel Control mode is used for administration purposes.
+/// Once in this mode, you cannot switch to other modes or gain access
+/// to commands from other modes.
+///
+/// ### Available commands
+///
+/// In this mode you can use `consolidate`, `backup`, `restore`,
+/// `ping` and `quit` commands.
+///
+/// **Note:** This mode requires enabling the `control` feature.
+#[derive(Debug)]
+pub struct ControlChannel(SonicStream);
+
+impl SonicChannel for ControlChannel {
+    type Channel = ControlChannel;
+
+    fn stream(&self) -> &SonicStream {
+        &self.0
+    }
+
+    fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
+    where
+        A: ToSocketAddrs,
+        S: ToString,
+    {
+        SonicStream::connect_with_start(ChannelMode::Control, addr, password).map(Self)
+    }
+}
+
+impl ControlChannel {
+    init_command!(
+        /// Stop connection.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let channel = ControlChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// channel.quit()?;
+        /// # Ok(())
+        /// # }
+        use QuitCommand for fn quit();
+    );
+
+    init_command!(
+        /// Ping server.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let channel = ControlChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// channel.ping()?;
+        /// # Ok(())
+        /// # }
+        use PingCommand for fn ping();
+    );
+}
+
+impl ControlChannel {
+    init_command!(
+        /// Trigger control action.
+        ///
+        /// Note: This method requires enabling the `control` feature and start connection in
+        /// Control mode
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let control_channel = ControlChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// control_channel.trigger(TriggerRequest::Consolidate)?;
+        /// # Ok(())
+        /// # }
+        use TriggerCommand for fn trigger(
+            req: TriggerRequest,
+        )
+    );
+
+    /// Consolidate indexed search data instead of waiting for the next automated
+    /// consolidation tick.
+    ///
+    /// Note: This method requires enabling the `control` feature and start
+    /// connection in Control mode.
+    ///
+    /// ```rust,no_run
+    /// # use sonic_channel::*;
+    /// # fn main() -> result::Result<()> {
+    /// let control_channel = ControlChannel::start(
+    ///     "localhost:1491",
+    ///     "SecretPassword",
+    /// )?;
+    ///
+    /// control_channel.consolidate()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn consolidate(&self) -> Result<()> {
+        self.trigger(TriggerRequest::Consolidate)
+    }
+
+    /// Backup KV + FST to <path>/<BACKUP_{KV/FST}_PATH>
+    /// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808)
+    /// for more information.
+    ///
+    /// Note: This method requires enabling the `control` feature and start
+    /// connection in Control mode.
+    ///
+    /// ```rust,no_run
+    /// # use sonic_channel::*;
+    /// # fn main() -> result::Result<()> {
+    /// let control_channel = ControlChannel::start(
+    ///     "localhost:1491",
+    ///     "SecretPassword",
+    /// )?;
+    ///
+    /// control_channel.backup("2020-08-07T23-48")?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn backup(&self, path: &str) -> Result<()> {
+        self.trigger(TriggerRequest::Backup(path))
+    }
+
+    /// Restore KV + FST from <path> if you already have backup with the same name.
+    ///
+    /// Note: This method requires enabling the `control` feature and start
+    /// connection in Control mode.
+    ///
+    /// ```rust,no_run
+    /// # use sonic_channel::*;
+    /// # fn main() -> result::Result<()> {
+    /// let control_channel = ControlChannel::start(
+    ///     "localhost:1491",
+    ///     "SecretPassword",
+    /// )?;
+    ///
+    /// let result = control_channel.restore("2020-08-07T23-48")?;
+    /// assert_eq!(result, ());
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn restore(&self, path: &str) -> Result<()> {
+        self.trigger(TriggerRequest::Restore(path))
+    }
+}

+ 183 - 0
sonic-channel-wasi/src/channels/ingest.rs

@@ -0,0 +1,183 @@
+use super::{ChannelMode, SonicChannel, SonicStream};
+use crate::commands::*;
+use crate::result::Result;
+use wasmedge_wasi_socket::{ToSocketAddrs};
+
+/// The Sonic Channel Ingest mode is used for altering the search index
+/// (push, pop and flush). Once in this mode, you cannot switch to other
+/// modes or gain access to commands from other modes.
+///
+/// ### Available commands
+///
+/// In this mode you can use `push`, `pop`, `flushc`, `flushb`, `flusho`,
+/// `bucket_count`, `object_count`, `word_count`, `ping` and `quit` commands.
+///
+/// **Note:** This mode requires enabling the `ingest` feature.
+#[derive(Debug)]
+pub struct IngestChannel(SonicStream);
+
+impl SonicChannel for IngestChannel {
+    type Channel = IngestChannel;
+
+    fn stream(&self) -> &SonicStream {
+        &self.0
+    }
+
+    fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
+    where
+        A: ToSocketAddrs,
+        S: ToString,
+    {
+        SonicStream::connect_with_start(ChannelMode::Ingest, addr, password).map(Self)
+    }
+}
+
+impl IngestChannel {
+    init_command!(
+        /// Stop connection.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let channel = IngestChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// channel.quit()?;
+        /// # Ok(())
+        /// # }
+        use QuitCommand for fn quit();
+    );
+
+    init_command!(
+        /// Ping server.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let channel = IngestChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// channel.ping()?;
+        /// # Ok(())
+        /// # }
+        use PingCommand for fn ping();
+    );
+}
+
+impl IngestChannel {
+    init_command!(
+        /// Push search data in the index.
+        ///
+        /// Note: This method requires enabling the `ingest` feature and start
+        /// connection in Ingest mode.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let ingest_channel = IngestChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// let result = ingest_channel.push(PushRequest::new(
+        ///     Dest::col("search").obj("recipe:295"),
+        ///     "Sweet Teriyaki Beef Skewers"
+        /// ))?;
+        /// assert_eq!(result, ());
+        /// # Ok(())
+        /// # }
+        /// ```
+        use PushCommand for fn push<'a>(
+            req: PushRequest,
+        );
+    );
+
+    init_command!(
+        /// Pop search data from the index. Returns removed words count as usize type.
+        ///
+        /// Note: This method requires enabling the `ingest` feature and start
+        /// connection in Ingest mode.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let ingest_channel = IngestChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// let dest = Dest::col("search").obj("recipe:295");
+        /// let result = ingest_channel.pop(PopRequest::new(dest, "beef"))?;
+        /// assert_eq!(result, 1);
+        /// # Ok(())
+        /// # }
+        /// ```
+        use PopCommand for fn pop(
+            req: PopRequest,
+        );
+    );
+
+    init_command!(
+        /// Flush all indexed data from collections.
+        ///
+        /// Note: This method requires enabling the `ingest` feature and start
+        /// connection in Ingest mode.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let ingest_channel = IngestChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// let flushc_count = ingest_channel.flush(FlushRequest::collection("search"))?;
+        /// dbg!(flushc_count);
+        /// let flushb_count = ingest_channel.flush(FlushRequest::bucket("search", "default"))?;
+        /// dbg!(flushb_count);
+        /// let flusho_count = ingest_channel.flush(
+        ///     FlushRequest::object("search", "default", "recipe:295")
+        /// )?;
+        /// dbg!(flusho_count);
+        /// # Ok(())
+        /// # }
+        /// ```
+        use FlushCommand for fn flush(
+            req: FlushRequest,
+        );
+    );
+
+    init_command!(
+        /// Count indexed search data of your collection.
+        ///
+        /// Note: This method requires enabling the `ingest` feature and start
+        /// connection in Ingest mode.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let ingest_channel = IngestChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// let bucket_count = ingest_channel.count(CountRequest::buckets("search"))?;
+        /// dbg!(bucket_count);
+        /// let object_count = ingest_channel.count(CountRequest::objects("search", "default"))?;
+        /// dbg!(object_count);
+        /// let word_count = ingest_channel.count(
+        ///     CountRequest::words("search", "default", "recipe:256")
+        /// )?;
+        /// dbg!(object_count);
+        /// # Ok(())
+        /// # }
+        /// ```
+        use CountCommand for fn count(
+            req: CountRequest,
+        );
+    );
+}

+ 164 - 0
sonic-channel-wasi/src/channels/search.rs

@@ -0,0 +1,164 @@
+use super::{ChannelMode, SonicChannel, SonicStream};
+use crate::commands::*;
+use crate::result::Result;
+use wasmedge_wasi_socket::{ToSocketAddrs};
+
+/// The Sonic Channel Search mode is used for querying the search index.
+/// Once in this mode, you cannot switch to other modes or gain access
+/// to commands from other modes.
+///
+/// ### Available commands
+///
+/// In this mode you can use `query`, `suggest`, `ping` and `quit` commands.
+///
+/// **Note:** This mode requires enabling the `search` feature.
+#[derive(Debug)]
+pub struct SearchChannel(SonicStream);
+
+impl SonicChannel for SearchChannel {
+    type Channel = SearchChannel;
+
+    fn stream(&self) -> &SonicStream {
+        &self.0
+    }
+
+    fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
+    where
+        A: ToSocketAddrs,
+        S: ToString,
+    {
+        SonicStream::connect_with_start(ChannelMode::Search, addr, password).map(Self)
+    }
+}
+
+impl SearchChannel {
+    init_command!(
+        /// Stop connection.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let channel = SearchChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// channel.quit()?;
+        /// # Ok(())
+        /// # }
+        use QuitCommand for fn quit();
+    );
+
+    init_command!(
+        /// Ping server.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let channel = SearchChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// channel.ping()?;
+        /// # Ok(())
+        /// # }
+        use PingCommand for fn ping();
+    );
+}
+
+impl SearchChannel {
+    init_command!(
+        /// Query objects in database.
+        ///
+        /// Note: This method requires enabling the `search` feature and start
+        /// connection in Search mode.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let search_channel = SearchChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// let result = search_channel.query(QueryRequest::new(
+        ///     Dest::col("search"),
+        ///     "Beef",
+        /// ))?;
+        /// dbg!(result);
+        ///
+        /// let result = search_channel.query(
+        ///     QueryRequest::new(Dest::col("search"), "Beef").limit(10)
+        /// )?;
+        /// dbg!(result);
+        /// # Ok(())
+        /// # }
+        /// ```
+        use QueryCommand for fn query(
+            req: QueryRequest,
+        );
+    );
+
+    init_command!(
+        /// Suggest auto-completes words.
+        ///
+        /// Note: This method requires enabling the `search` feature and start
+        /// connection in Search mode.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let search_channel = SearchChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// let result = search_channel.suggest(
+        ///     SuggestRequest::new(Dest::col("search"), "Beef")
+        /// )?;
+        /// dbg!(result);
+        ///
+        /// let result = search_channel.suggest(
+        ///     SuggestRequest::new(Dest::col("search"), "Beef").limit(2)
+        /// )?;
+        /// dbg!(result);
+        /// # Ok(())
+        /// # }
+        /// ```
+        use SuggestCommand for fn suggest(
+            req: SuggestRequest,
+        );
+    );
+
+    init_command!(
+        /// Enumerates all words in an index.
+        ///
+        /// Note: This method requires enabling the `search` feature and start
+        /// connection in Search mode.
+        ///
+        /// ```rust,no_run
+        /// # use sonic_channel::*;
+        /// # fn main() -> result::Result<()> {
+        /// let search_channel = SearchChannel::start(
+        ///     "localhost:1491",
+        ///     "SecretPassword",
+        /// )?;
+        ///
+        /// let result = search_channel.list(
+        ///     ListRequest::new(Dest::col("search"))
+        /// )?;
+        /// dbg!(result);
+        ///
+        /// let result = search_channel.list(
+        ///     ListRequest::new(Dest::col("search")).limit(2)
+        /// )?;
+        /// dbg!(result);
+        /// # Ok(())
+        /// # }
+        /// ```
+        use ListCommand for fn list(
+            req: ListRequest,
+        );
+    );
+}

+ 53 - 0
sonic-channel-wasi/src/commands.rs

@@ -0,0 +1,53 @@
+mod ping;
+mod quit;
+mod start;
+
+#[cfg(feature = "ingest")]
+mod count;
+#[cfg(feature = "ingest")]
+mod flush;
+#[cfg(feature = "ingest")]
+mod pop;
+#[cfg(feature = "ingest")]
+mod push;
+
+#[cfg(feature = "search")]
+mod list;
+#[cfg(feature = "search")]
+mod query;
+#[cfg(feature = "search")]
+mod suggest;
+
+#[cfg(feature = "control")]
+mod trigger;
+
+pub(crate) use self::{ping::PingCommand, quit::QuitCommand, start::StartCommand};
+
+#[cfg(feature = "ingest")]
+pub(crate) use self::{
+    count::CountCommand, flush::FlushCommand, pop::PopCommand, push::PushCommand,
+};
+#[cfg(feature = "ingest")]
+pub use self::{count::CountRequest, flush::FlushRequest, pop::PopRequest, push::PushRequest};
+
+#[cfg(feature = "search")]
+pub(crate) use self::{list::ListCommand, query::QueryCommand, suggest::SuggestCommand};
+#[cfg(feature = "search")]
+pub use self::{list::ListRequest, query::QueryRequest, suggest::SuggestRequest};
+
+#[cfg(feature = "control")]
+pub(crate) use trigger::TriggerCommand;
+#[cfg(feature = "control")]
+pub use trigger::TriggerRequest;
+
+use crate::protocol;
+use crate::result::Result;
+
+#[doc(hidden)]
+pub trait StreamCommand {
+    type Response;
+
+    fn request(&self) -> protocol::Request;
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response>;
+}

+ 67 - 0
sonic-channel-wasi/src/commands/count.rs

@@ -0,0 +1,67 @@
+use super::StreamCommand;
+use crate::misc::*;
+use crate::protocol;
+use crate::result::*;
+
+/// Parameters for the `count` command.
+#[derive(Debug)]
+pub struct CountRequest(OptDest);
+
+impl CountRequest {
+    /// Creates a new request to get the number of buckets in the collection.
+    pub fn buckets(collection: impl ToString) -> CountRequest {
+        Self(OptDest::col(collection))
+    }
+
+    /// Creates a new request to get the number of objects in the collection bucket.
+    pub fn objects(collection: impl ToString, bucket: impl ToString) -> CountRequest {
+        Self(OptDest::col_buc(collection, bucket))
+    }
+
+    /// Creates a new request to get the number of words in the collection bucket object.
+    pub fn words(
+        collection: impl ToString,
+        bucket: impl ToString,
+        object: impl ToString,
+    ) -> CountRequest {
+        Self(OptDest::col_buc_obj(collection, bucket, object))
+    }
+}
+
+impl From<Dest> for CountRequest {
+    fn from(d: Dest) -> Self {
+        Self(OptDest::from(d))
+    }
+}
+
+impl From<ObjDest> for CountRequest {
+    fn from(d: ObjDest) -> Self {
+        Self(OptDest::from(d))
+    }
+}
+
+#[derive(Debug)]
+pub struct CountCommand {
+    pub(crate) req: CountRequest,
+}
+
+impl StreamCommand for CountCommand {
+    type Response = usize;
+
+    fn request(&self) -> protocol::Request {
+        let dest = &self.req.0;
+        protocol::Request::Count {
+            collection: dest.collection.clone(),
+            bucket: dest.bucket.clone(),
+            object: dest.object.clone(),
+        }
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if let protocol::Response::Result(count) = res {
+            Ok(count)
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 68 - 0
sonic-channel-wasi/src/commands/flush.rs

@@ -0,0 +1,68 @@
+use super::StreamCommand;
+use crate::misc::*;
+use crate::protocol;
+use crate::result::*;
+
+/// Parameters for the `flush` command.
+#[derive(Debug)]
+pub struct FlushRequest(OptDest);
+
+impl FlushRequest {
+    /// Creates a new request to flush all data in the collection.
+    pub fn collection(collection: impl ToString) -> FlushRequest {
+        Self(OptDest::col(collection))
+    }
+
+    /// Creates a new request to flush all data in the collection bucket.
+    pub fn bucket(collection: impl ToString, bucket: impl ToString) -> FlushRequest {
+        Self(OptDest::col_buc(collection, bucket))
+    }
+
+    /// Creates a new request to flush all data in the collection bucket object.
+    pub fn object(
+        collection: impl ToString,
+        bucket: impl ToString,
+        object: impl ToString,
+    ) -> FlushRequest {
+        Self(OptDest::col_buc_obj(collection, bucket, object))
+    }
+}
+
+impl From<Dest> for FlushRequest {
+    fn from(d: Dest) -> Self {
+        Self(OptDest::from(d))
+    }
+}
+
+impl From<ObjDest> for FlushRequest {
+    fn from(d: ObjDest) -> Self {
+        Self(OptDest::from(d))
+    }
+}
+
+#[derive(Debug)]
+pub struct FlushCommand {
+    pub(crate) req: FlushRequest,
+}
+
+impl StreamCommand for FlushCommand {
+    type Response = usize;
+
+    fn request(&self) -> protocol::Request {
+        let dest = &self.req.0;
+
+        protocol::Request::Flush {
+            collection: dest.collection.clone(),
+            bucket: dest.bucket.clone(),
+            object: dest.object.clone(),
+        }
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if let protocol::Response::Result(count) = res {
+            Ok(count)
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 69 - 0
sonic-channel-wasi/src/commands/list.rs

@@ -0,0 +1,69 @@
+use super::StreamCommand;
+use crate::misc::Dest;
+use crate::protocol;
+use crate::result::*;
+
+/// Parameters for the `suggest` command.
+#[derive(Debug)]
+pub struct ListRequest {
+    /// Collection and bucket where we should enumerate all words in index.
+    pub dest: Dest,
+    /// Limit of result words.
+    pub limit: Option<usize>,
+    /// Offset of result words.
+    pub offset: Option<usize>,
+}
+
+impl ListRequest {
+    /// Creates a base suggest request.
+    pub fn new(dest: Dest) -> Self {
+        Self {
+            dest,
+            limit: None,
+            offset: None,
+        }
+    }
+
+    /// Set a limit for the request.
+    pub fn limit(mut self, limit: usize) -> Self {
+        self.limit = Some(limit);
+        self
+    }
+
+    /// Set an offset for the request.
+    pub fn offset(mut self, offset: usize) -> Self {
+        self.offset = Some(offset);
+        self
+    }
+}
+
+#[derive(Debug)]
+pub struct ListCommand {
+    pub(crate) req: ListRequest,
+}
+
+impl StreamCommand for ListCommand {
+    type Response = Vec<String>;
+
+    fn request(&self) -> protocol::Request {
+        let dest = &self.req.dest;
+
+        protocol::Request::List {
+            collection: dest.collection().clone(),
+            bucket: dest
+                .bucket_opt()
+                .cloned()
+                .unwrap_or_else(|| String::from("default")),
+            limit: self.req.limit,
+            offset: self.req.offset,
+        }
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if let protocol::Response::Event(protocol::EventKind::List, _id, words) = res {
+            Ok(words)
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 22 - 0
sonic-channel-wasi/src/commands/ping.rs

@@ -0,0 +1,22 @@
+use super::StreamCommand;
+use crate::protocol;
+use crate::result::*;
+
+#[derive(Debug)]
+pub struct PingCommand;
+
+impl StreamCommand for PingCommand {
+    type Response = ();
+
+    fn request(&self) -> protocol::Request {
+        protocol::Request::Ping
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if matches!(res, protocol::Response::Pong) {
+            Ok(())
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 54 - 0
sonic-channel-wasi/src/commands/pop.rs

@@ -0,0 +1,54 @@
+use super::StreamCommand;
+use crate::misc::ObjDest;
+use crate::protocol;
+use crate::result::*;
+
+/// Parameters for the `pop` command.
+#[derive(Debug)]
+pub struct PopRequest {
+    /// Collection, bucket and object where we should pop search data from index.
+    pub dest: ObjDest,
+    /// Search data to be deleted
+    pub text: String,
+}
+
+impl PopRequest {
+    /// Creates a base pop request.
+    pub fn new(dest: ObjDest, text: impl ToString) -> Self {
+        Self {
+            dest,
+            text: text.to_string(),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct PopCommand {
+    pub(crate) req: PopRequest,
+}
+
+impl StreamCommand for PopCommand {
+    type Response = usize;
+
+    fn request(&self) -> protocol::Request {
+        let dest = &self.req.dest;
+        protocol::Request::Pop {
+            collection: dest.collection().clone(),
+            bucket: dest
+                .bucket_opt()
+                .cloned()
+                // TODO: use a global context for default bucket value
+                .unwrap_or_else(|| String::from("default")),
+            object: dest.object().clone(),
+            terms: self.req.text.to_string(),
+        }
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if let protocol::Response::Result(count) = res {
+            Ok(count)
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 73 - 0
sonic-channel-wasi/src/commands/push.rs

@@ -0,0 +1,73 @@
+use super::StreamCommand;
+use crate::misc::ObjDest;
+use crate::protocol;
+use crate::result::*;
+
+/// Parameters for the `push` command.
+#[derive(Debug)]
+pub struct PushRequest {
+    /// Collection, bucket and object where we should push search data in the index.
+    pub dest: ObjDest,
+    /// Search data to be added
+    pub text: String,
+    /// Language of the search data. If None, the client will try to determine based on the `text`.
+    pub lang: Option<whatlang::Lang>,
+}
+
+impl PushRequest {
+    /// Creates a base push request
+    pub fn new(dest: ObjDest, text: impl ToString) -> Self {
+        Self {
+            dest,
+            text: text.to_string(),
+            lang: None,
+        }
+    }
+
+    /// Set a language for the request.
+    pub fn lang(mut self, lang: whatlang::Lang) -> Self {
+        self.lang = Some(lang);
+        self
+    }
+}
+
+#[derive(Debug)]
+pub struct PushCommand {
+    pub(crate) req: PushRequest,
+}
+
+impl StreamCommand for PushCommand {
+    type Response = ();
+
+    fn request(&self) -> protocol::Request {
+        let req = &self.req;
+
+        let lang = req
+            .lang
+            .or_else(|| {
+                whatlang::detect(&req.text).and_then(|i| (i.confidence() == 1.0).then(|| i.lang()))
+            })
+            .map(|l| l.code());
+
+        protocol::Request::Push {
+            collection: req.dest.collection().clone(),
+            bucket: req
+                .dest
+                .bucket_opt()
+                .cloned()
+                // TODO: use a global context for default bucket value
+                .unwrap_or_else(|| String::from("default")),
+            object: req.dest.object().clone(),
+            terms: req.text.to_string(),
+            lang,
+        }
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if matches!(res, protocol::Response::Ok) {
+            Ok(())
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 100 - 0
sonic-channel-wasi/src/commands/query.rs

@@ -0,0 +1,100 @@
+use super::StreamCommand;
+use crate::misc::Dest;
+use crate::protocol;
+use crate::result::*;
+
+/// Parameters for the `query` command
+#[derive(Debug, Clone)]
+pub struct QueryRequest {
+    /// Collection and bucket where we should search for objects.
+    pub dest: Dest,
+    /// Searchable terms.
+    pub terms: String,
+    /// Language of the search data. If None, the client will try to determine based on the `terms`.
+    pub lang: Option<whatlang::Lang>,
+    /// Limit of result objects.
+    pub limit: Option<usize>,
+    /// The number of result objects we want to skip.
+    pub offset: Option<usize>,
+}
+
+impl QueryRequest {
+    /// Creates base query request.
+    pub fn new(dest: Dest, terms: impl ToString) -> Self {
+        Self {
+            dest,
+            terms: terms.to_string(),
+            lang: None,
+            limit: None,
+            offset: None,
+        }
+    }
+
+    /// Set a language for the request.
+    pub fn lang(mut self, lang: whatlang::Lang) -> Self {
+        self.lang = Some(lang);
+        self
+    }
+
+    /// Set a limit for the request.
+    pub fn limit(mut self, limit: usize) -> Self {
+        self.limit = Some(limit);
+        self
+    }
+
+    /// Set an offset for the request.
+    pub fn offset(mut self, offset: usize) -> Self {
+        self.offset = Some(offset);
+        self
+    }
+
+    /// Set the pagination for the request. Automatic offset calculation based on provided
+    /// limit and page.
+    ///
+    /// Note: the first page is 0;
+    pub fn pag(self, page: usize, limit: usize) -> Self {
+        let offset = page * limit;
+        self.offset(offset).limit(limit)
+    }
+}
+
+#[derive(Debug)]
+pub struct QueryCommand {
+    pub(crate) req: QueryRequest,
+}
+
+impl StreamCommand for QueryCommand {
+    type Response = Vec<String>;
+
+    fn request(&self) -> protocol::Request {
+        let dest = &self.req.dest;
+        let lang = self
+            .req
+            .lang
+            .or_else(|| {
+                whatlang::detect(&self.req.terms)
+                    .and_then(|i| (i.confidence() == 1.0).then(|| i.lang()))
+            })
+            .map(|l| l.code());
+
+        protocol::Request::Query {
+            collection: dest.collection().clone(),
+            bucket: dest
+                .bucket_opt()
+                .cloned()
+                .unwrap_or_else(|| String::from("default")),
+            terms: self.req.terms.clone(),
+            offset: self.req.offset,
+            limit: self.req.limit,
+            lang,
+        }
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if let protocol::Response::Event(protocol::EventKind::Query, _id, objects) = res {
+            Ok(objects)
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 22 - 0
sonic-channel-wasi/src/commands/quit.rs

@@ -0,0 +1,22 @@
+use super::StreamCommand;
+use crate::protocol;
+use crate::result::*;
+
+#[derive(Debug)]
+pub struct QuitCommand;
+
+impl StreamCommand for QuitCommand {
+    type Response = ();
+
+    fn request(&self) -> protocol::Request {
+        protocol::Request::Quit
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if matches!(res, protocol::Response::Ended) {
+            Ok(())
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 44 - 0
sonic-channel-wasi/src/commands/start.rs

@@ -0,0 +1,44 @@
+use super::StreamCommand;
+use crate::channels::ChannelMode;
+use crate::protocol;
+use crate::result::*;
+
+#[derive(Debug)]
+pub struct StartCommand {
+    pub(crate) mode: ChannelMode,
+    pub(crate) password: String,
+}
+
+#[derive(Debug)]
+pub struct StartCommandResponse {
+    pub protocol_version: protocol::Version,
+    pub max_buffer_size: usize,
+    pub mode: ChannelMode,
+}
+
+impl StreamCommand for StartCommand {
+    type Response = StartCommandResponse;
+
+    fn request(&self) -> protocol::Request {
+        protocol::Request::Start {
+            mode: self.mode,
+            password: self.password.to_string(),
+        }
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if let protocol::Response::Started(payload) = res {
+            Ok(StartCommandResponse {
+                protocol_version: payload
+                    .protocol_version
+                    .try_into()
+                    // TODO: better error
+                    .map_err(|_| Error::SwitchMode)?,
+                max_buffer_size: payload.max_buffer_size,
+                mode: self.mode,
+            })
+        } else {
+            Err(Error::SwitchMode)
+        }
+    }
+}

+ 63 - 0
sonic-channel-wasi/src/commands/suggest.rs

@@ -0,0 +1,63 @@
+use super::StreamCommand;
+use crate::misc::Dest;
+use crate::protocol;
+use crate::result::*;
+
+/// Parameters for the `suggest` command.
+#[derive(Debug)]
+pub struct SuggestRequest {
+    /// Collection and bucket where we should search for suggested words.
+    pub dest: Dest,
+    /// Base word.
+    pub word: String,
+    /// Limit of result words.
+    pub limit: Option<usize>,
+}
+
+impl SuggestRequest {
+    /// Creates a base suggest request.
+    pub fn new(dest: Dest, word: impl ToString) -> Self {
+        Self {
+            dest,
+            word: word.to_string(),
+            limit: None,
+        }
+    }
+
+    /// Set a limit for the request.
+    pub fn limit(mut self, limit: usize) -> Self {
+        self.limit = Some(limit);
+        self
+    }
+}
+
+#[derive(Debug)]
+pub struct SuggestCommand {
+    pub(crate) req: SuggestRequest,
+}
+
+impl StreamCommand for SuggestCommand {
+    type Response = Vec<String>;
+
+    fn request(&self) -> protocol::Request {
+        let dest = &self.req.dest;
+
+        protocol::Request::Suggest {
+            collection: dest.collection().clone(),
+            bucket: dest
+                .bucket_opt()
+                .cloned()
+                .unwrap_or_else(|| String::from("default")),
+            word: self.req.word.to_string(),
+            limit: self.req.limit,
+        }
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if let protocol::Response::Event(protocol::EventKind::Suggest, _id, words) = res {
+            Ok(words)
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 47 - 0
sonic-channel-wasi/src/commands/trigger.rs

@@ -0,0 +1,47 @@
+use super::StreamCommand;
+use crate::protocol;
+use crate::result::*;
+use std::path::PathBuf;
+
+/// Parameters for the `trigger` command.
+#[derive(Debug)]
+pub enum TriggerRequest<'a> {
+    /// Consolidate indexed search data instead of waiting for the next automated
+    /// consolidation tick.
+    Consolidate,
+
+    /// Backup KV + FST to <path>/<BACKUP_{KV/FST}_PATH>
+    /// See [sonic backend source code](https://github.com/valeriansaliou/sonic/blob/master/src/channel/command.rs#L808)
+    /// for more information.
+    Backup(&'a str),
+
+    /// Restore KV + FST from <path> if you already have backup with the same name.
+    Restore(&'a str),
+}
+
+#[derive(Debug)]
+pub struct TriggerCommand<'a> {
+    pub(crate) req: TriggerRequest<'a>,
+}
+
+impl StreamCommand for TriggerCommand<'_> {
+    type Response = ();
+
+    fn request(&self) -> protocol::Request {
+        let req = match self.req {
+            TriggerRequest::Consolidate => protocol::TriggerRequest::Consolidate,
+            TriggerRequest::Backup(path) => protocol::TriggerRequest::Backup(PathBuf::from(path)),
+            TriggerRequest::Restore(path) => protocol::TriggerRequest::Restore(PathBuf::from(path)),
+        };
+
+        protocol::Request::Trigger(req)
+    }
+
+    fn receive(&self, res: protocol::Response) -> Result<Self::Response> {
+        if matches!(res, protocol::Response::Ok) {
+            Ok(())
+        } else {
+            Err(Error::WrongResponse)
+        }
+    }
+}

+ 112 - 0
sonic-channel-wasi/src/lib.rs

@@ -0,0 +1,112 @@
+//! # Sonic Channel
+//! Rust client for [sonic] search backend.
+//!
+//!
+//! ## Example usage
+//!
+//! ### Search channel
+//!
+//! Note: This example requires enabling the `search` feature, enabled by default.
+//!
+//! ```rust,no_run
+//! use sonic_channel::*;
+//!
+//! fn main() -> result::Result<()> {
+//!     let channel = SearchChannel::start(
+//!         "localhost:1491",
+//!         "SecretPassword",
+//!     )?;
+//!
+//!     let objects = channel.query(QueryRequest::new(
+//!         Dest::col_buc("collection", "bucket"),
+//!         "recipe",
+//!     ))?;
+//!     dbg!(objects);
+//!
+//!     Ok(())
+//! }
+//! ```
+//!
+//! ### Ingest channel
+//!
+//! Note: This example requires enabling the `ingest` feature.
+//!
+//! ```rust,no_run
+//! use sonic_channel::*;
+//!
+//! fn main() -> result::Result<()> {
+//!     let channel = IngestChannel::start(
+//!         "localhost:1491",
+//!         "SecretPassword",
+//!     )?;
+//!
+//!     let dest = Dest::col_buc("collection", "bucket").obj("object:1");
+//!     let pushed = channel.push(PushRequest::new(dest, "my best recipe"))?;
+//!     // or
+//!     // let pushed = channel.push(
+//!     //     PushRequest::new(dest, "Мой лучший рецепт").lang(Lang::Rus)
+//!     // )?;
+//!     dbg!(pushed);
+//!
+//!     Ok(())
+//! }
+//! ```
+//!
+//! ### Control channel
+//!
+//! Note: This example requires enabling the `control` feature.
+//!
+//! ```rust,no_run
+//! use sonic_channel::*;
+//!
+//! fn main() -> result::Result<()> {
+//!     let channel = ControlChannel::start(
+//!         "localhost:1491",
+//!         "SecretPassword",
+//!     )?;
+//!
+//!     let result = channel.consolidate()?;
+//!     assert_eq!(result, ());
+//!
+//!     Ok(())
+//! }
+//! ```
+//!
+//! [sonic]: https://github.com/valeriansaliou/sonic
+
+// Rustc lints.
+#![deny(
+    missing_debug_implementations,
+    unsafe_code,
+    unstable_features,
+    unused_imports,
+    unused_qualifications
+)]
+#![warn(missing_docs)]
+// Clippy lints
+#![deny(clippy::all)]
+
+#[cfg(not(any(feature = "ingest", feature = "search", feature = "control")))]
+compile_error!(
+    r#"Either features "ingest" or "search" or "control" must be enabled for "sonic-channel" crate"#
+);
+
+#[macro_use]
+mod macroses;
+mod misc;
+
+pub(crate) mod protocol;
+
+mod channels;
+
+/// Contains the request parameters for each command to the sonic server.
+pub mod commands;
+
+/// Contains sonic channel error type and custom Result type for easy configure your functions.
+pub mod result;
+
+pub use channels::*;
+pub use commands::*;
+pub use misc::*;
+
+pub use whatlang::Lang;

+ 21 - 0
sonic-channel-wasi/src/macroses.rs

@@ -0,0 +1,21 @@
+macro_rules! init_command {
+    (
+        $(#[$outer:meta])*
+        use $cmd_name:ident
+        for fn $fn_name:ident $(<$($lt:lifetime)+>)? (
+            $($arg_name:ident : $arg_type:ty $( => $arg_value:expr)?,)*
+        )
+        $(;)?
+    ) => {
+        $(#[$outer])*
+        pub fn $fn_name $(<$($lt)+>)? (
+            &self,
+            $($arg_name: $arg_type),*
+        ) -> $crate::result::Result<
+            <$cmd_name as $crate::commands::StreamCommand>::Response,
+        > {
+            let command = $cmd_name { $($arg_name $(: $arg_value)?,)* };
+            self.stream().run_command(command)
+        }
+    };
+}

+ 169 - 0
sonic-channel-wasi/src/misc.rs

@@ -0,0 +1,169 @@
+/// Search data destination. Contains collection, bucket and object.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ObjDest(Dest, String);
+
+impl ObjDest {
+    /// Creates a new object destination from base destination (`Dest`) and object id.
+    ///
+    /// ```rust
+    /// # use sonic_channel::{Dest, ObjDest};
+    /// let base_dest = Dest::col_buc("wiki", "user:1");
+    /// let dest = ObjDest::new(base_dest, "article:1");
+    /// assert_eq!(dest.collection(), "wiki");
+    /// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1")));
+    /// assert_eq!(dest.object(), "article:1");
+    /// ```
+    pub fn new(cb: Dest, o: impl ToString) -> Self {
+        Self(cb, o.to_string())
+    }
+
+    /// Returns the collection.
+    #[inline]
+    pub fn collection(&self) -> &String {
+        self.0.collection()
+    }
+
+    /// Returns the optional bucket.
+    #[inline]
+    pub fn bucket_opt(&self) -> Option<&String> {
+        self.0.bucket_opt()
+    }
+
+    /// Returns the object id.
+    #[inline]
+    pub fn object(&self) -> &String {
+        &self.1
+    }
+}
+
+/// Search objects destination. Contains collection and bucket.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Dest {
+    collection: String,
+    bucket: Option<String>,
+}
+
+impl Dest {
+    /// Creates a new destination with collection and bucket.
+    ///
+    /// ```rust
+    /// # use sonic_channel::Dest;
+    /// let dest = Dest::col_buc("wiki", "user:1");
+    /// assert_eq!(dest.collection(), "wiki");
+    /// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1")));
+    /// ```
+    pub fn col_buc(c: impl ToString, b: impl ToString) -> Self {
+        Self::col(c).buc(b)
+    }
+
+    /// Creates a new destination with collection.
+    ///
+    /// ```rust
+    /// # use sonic_channel::Dest;
+    /// let dest = Dest::col("wiki");
+    /// assert_eq!(dest.collection(), "wiki");
+    /// ```
+    pub fn col(c: impl ToString) -> Self {
+        Self {
+            collection: c.to_string(),
+            bucket: None,
+        }
+    }
+
+    /// Set bucket for the destination.
+    ///
+    /// ```rust
+    /// # use sonic_channel::Dest;
+    /// let dest = Dest::col("wiki").buc("user:1");
+    /// assert_eq!(dest.collection(), "wiki");
+    /// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1")));
+    /// ```
+    pub fn buc(mut self, b: impl ToString) -> Self {
+        self.bucket = Some(b.to_string());
+        self
+    }
+
+    /// Set object id to the destination and transform to object destination (`ObjDest`).
+    ///
+    /// Short for `ObjDest::new(dest, object_id)`
+    ///
+    /// ```rust
+    /// # use sonic_channel::Dest;
+    /// let dest = Dest::col_buc("wiki", "user:1").obj("article:1");
+    /// assert_eq!(dest.collection(), "wiki");
+    /// assert_eq!(dest.bucket_opt(), Some(&String::from("user:1")));
+    /// assert_eq!(dest.object(), "article:1");
+    /// ```
+    pub fn obj(self, o: impl ToString) -> ObjDest {
+        ObjDest::new(self, o)
+    }
+
+    /// Returns the collection.
+    #[inline]
+    pub fn collection(&self) -> &String {
+        &self.collection
+    }
+
+    /// Returns the optional bucket.
+    #[inline]
+    pub fn bucket_opt(&self) -> Option<&String> {
+        self.bucket.as_ref()
+    }
+}
+
+#[cfg(feature = "ingest")]
+#[derive(Debug)]
+pub(crate) struct OptDest {
+    pub(crate) collection: String,
+    pub(crate) bucket: Option<String>,
+    pub(crate) object: Option<String>,
+}
+
+#[cfg(feature = "ingest")]
+impl OptDest {
+    pub(crate) fn col(c: impl ToString) -> Self {
+        Self {
+            collection: c.to_string(),
+            bucket: None,
+            object: None,
+        }
+    }
+
+    pub(crate) fn col_buc(c: impl ToString, b: impl ToString) -> Self {
+        Self {
+            collection: c.to_string(),
+            bucket: Some(b.to_string()),
+            object: None,
+        }
+    }
+
+    pub(crate) fn col_buc_obj(c: impl ToString, b: impl ToString, o: impl ToString) -> Self {
+        Self {
+            collection: c.to_string(),
+            bucket: Some(b.to_string()),
+            object: Some(o.to_string()),
+        }
+    }
+}
+
+#[cfg(feature = "ingest")]
+impl From<Dest> for OptDest {
+    fn from(d: Dest) -> Self {
+        Self {
+            collection: d.collection,
+            bucket: d.bucket,
+            object: None,
+        }
+    }
+}
+
+#[cfg(feature = "ingest")]
+impl From<ObjDest> for OptDest {
+    fn from(ObjDest(dest, obj): ObjDest) -> Self {
+        Self {
+            collection: dest.collection,
+            bucket: dest.bucket,
+            object: Some(obj),
+        }
+    }
+}

+ 340 - 0
sonic-channel-wasi/src/protocol.rs

@@ -0,0 +1,340 @@
+use std::io::{self, BufWriter, Write};
+use std::{path::PathBuf, str::FromStr};
+
+use crate::{result::*, ChannelMode};
+
+#[derive(Debug, Default)]
+pub struct Protocol {
+    #[allow(dead_code)]
+    version: Version,
+}
+
+impl From<Version> for Protocol {
+    fn from(version: Version) -> Self {
+        Self { version }
+    }
+}
+
+impl Protocol {
+    pub fn format_request(&self, req: Request) -> io::Result<Vec<u8>> {
+        let mut res = BufWriter::new(Vec::new());
+
+        match req {
+            Request::Quit => write!(res, "QUIT")?,
+
+            Request::Ping => write!(res, "PING")?,
+
+            Request::Start { mode, password } => write!(res, "START {} {}", mode, password)?,
+
+            #[rustfmt::skip]
+            Request::Count { collection, bucket, object } => match (bucket, object) {
+                (Some(b), Some(o)) => write!(res, "COUNT {} {} {}", collection, b, o)?,
+                (Some(b), None) => write!(res, "COUNT {} {}", collection, b)?,
+                (None, None) => write!(res, "COUNT {}", collection)?,
+                _ => panic!("Wrong protocol format"),
+            },
+
+            #[rustfmt::skip]
+            Request::Flush { collection, bucket, object } => match (bucket, object) {
+                (Some(b), Some(o)) => write!(res, "FLUSHO {} {} {}", collection, b, o)?,
+                (Some(b), None) => write!(res, "FLUSHB {} {}", collection, b)?,
+                (None, None) => write!(res, "FLUSHC {}", collection)?,
+                _ => panic!("Wrong protocol format"),
+            },
+
+            #[rustfmt::skip]
+            Request::Pop { collection, bucket, object, terms } => {
+                write!(res, "POP {} {} {} \"{}\"", collection, bucket, object, terms)?
+            },
+            #[rustfmt::skip]
+            Request::Push { collection, bucket, object, terms, lang } => {
+                let oneline_terms = remove_multiline(&terms);
+                write!(res, "PUSH {} {} {} \"{}\"", collection, bucket, object, oneline_terms)?;
+                if let Some(lang) = lang {
+                    write!(res, " LANG({})", lang)?
+                }
+            }
+
+            #[rustfmt::skip]
+            Request::Query { collection, bucket, terms, offset, limit, lang } => {
+                write!(res, "QUERY {} {} \"{}\"", collection, bucket, terms)?;
+                if let Some(limit) = limit {
+                    write!(res, " LIMIT({})", limit)?;
+                }
+                if let Some(offset) = offset {
+                    write!(res, " OFFSET({})", offset)?;
+                }
+                if let Some(lang) = lang {
+                    write!(res, " LANG({})", lang)?;
+                }
+            }
+            #[rustfmt::skip]
+            Request::Suggest { collection, bucket, word, limit } => {
+                write!(res, "SUGGEST {} {} \"{}\"", collection, bucket, word)?;
+                if let Some(limit) = limit {
+                    write!(res, " LIMIT({})", limit)?;
+                }
+            }
+
+            #[rustfmt::skip]
+            Request::List { collection, bucket, limit, offset } => {
+                write!(res, "LIST {} {}", collection, bucket)?;
+                if let Some(limit) = limit {
+                    write!(res, " LIMIT({})", limit)?;
+                }
+                if let Some(offset) = offset {
+                    write!(res, " OFFSET({})", offset)?;
+                }
+            }
+
+            Request::Trigger(triger_req) => match triger_req {
+                TriggerRequest::Consolidate => write!(res, "TRIGGER consolidate")?,
+                TriggerRequest::Backup(path) => {
+                    write!(res, "TRIGGER backup {}", path.to_str().unwrap())?
+                }
+                TriggerRequest::Restore(path) => {
+                    write!(res, "TRIGGER restore {}", path.to_str().unwrap())?
+                }
+            },
+        }
+
+        write!(res, "\r\n")?;
+        res.flush()?;
+
+        Ok(res.into_inner()?)
+    }
+
+    pub fn parse_response(&self, line: &str) -> Result<Response> {
+        let mut segments = line.split_whitespace();
+        match segments.next() {
+            Some("STARTED") => match (segments.next(), segments.next(), segments.next()) {
+                (Some(_raw_mode), Some(raw_protocol), Some(raw_buffer_size)) => {
+                    Ok(Response::Started(StartedPayload {
+                        protocol_version: parse_server_config(raw_protocol)?,
+                        max_buffer_size: parse_server_config(raw_buffer_size)?,
+                    }))
+                }
+                _ => Err(Error::WrongResponse),
+            },
+            Some("PENDING") => {
+                let event_id = segments
+                    .next()
+                    .map(String::from)
+                    .ok_or(Error::WrongResponse)?;
+                Ok(Response::Pending(event_id))
+            }
+            Some("RESULT") => match segments.next() {
+                Some(num) => num
+                    .parse()
+                    .map(Response::Result)
+                    .map_err(|_| Error::WrongResponse),
+                _ => Err(Error::WrongResponse),
+            },
+            Some("EVENT") => {
+                let event_kind = match segments.next() {
+                    Some("SUGGEST") => Ok(EventKind::Suggest),
+                    Some("QUERY") => Ok(EventKind::Query),
+                    Some("LIST") => Ok(EventKind::List),
+                    _ => Err(Error::WrongResponse),
+                }?;
+
+                let event_id = segments
+                    .next()
+                    .map(String::from)
+                    .ok_or(Error::WrongResponse)?;
+
+                let objects = segments.map(String::from).collect();
+
+                Ok(Response::Event(event_kind, event_id, objects))
+            }
+            Some("OK") => Ok(Response::Ok),
+            Some("ENDED") => Ok(Response::Ended),
+            Some("CONNECTED") => Ok(Response::Connected),
+            Some("ERR") => match segments.next() {
+                Some(message) => Err(Error::SonicServer(String::from(message))),
+                _ => Err(Error::WrongResponse),
+            },
+            _ => Err(Error::WrongResponse),
+        }
+    }
+}
+
+//===========================================================================//
+// Primitives                                                                //
+//===========================================================================//
+
+#[derive(Debug, PartialEq, Eq)]
+#[repr(u8)]
+pub enum Version {
+    V1 = 1,
+}
+
+impl Default for Version {
+    fn default() -> Self {
+        Self::V1
+    }
+}
+
+impl TryFrom<u8> for Version {
+    type Error = ();
+
+    fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
+        match value {
+            1 => Ok(Self::V1),
+            _ => Err(()),
+        }
+    }
+}
+
+//===========================================================================//
+// Response                                                                  //
+//===========================================================================//
+
+pub type EventId = String;
+
+#[derive(Debug)]
+pub enum Response {
+    Ok,
+    Ended,
+    Connected,
+    Pending(EventId),
+    Pong,
+    Started(StartedPayload),
+    Result(usize),
+    Event(EventKind, EventId, Vec<String>),
+}
+
+#[derive(Debug)]
+pub struct StartedPayload {
+    pub protocol_version: u8,
+    pub max_buffer_size: usize,
+}
+
+#[derive(Debug)]
+pub enum EventKind {
+    Suggest,
+    Query,
+    List,
+}
+
+//===========================================================================//
+// Request                                                                   //
+//===========================================================================//
+
+#[derive(Debug)]
+pub enum Request {
+    Start {
+        mode: ChannelMode,
+        password: String,
+    },
+    Quit,
+    Ping,
+    Trigger(TriggerRequest),
+    Suggest {
+        collection: String,
+        bucket: String,
+        word: String,
+        limit: Option<usize>,
+    },
+    List {
+        collection: String,
+        bucket: String,
+        limit: Option<usize>,
+        offset: Option<usize>,
+    },
+    Query {
+        collection: String,
+        bucket: String,
+        terms: String,
+        offset: Option<usize>,
+        limit: Option<usize>,
+        lang: Option<&'static str>,
+    },
+    Push {
+        collection: String,
+        bucket: String,
+        object: String,
+        terms: String,
+        lang: Option<&'static str>,
+    },
+    Pop {
+        collection: String,
+        bucket: String,
+        object: String,
+        terms: String,
+    },
+    Flush {
+        collection: String,
+        bucket: Option<String>,
+        object: Option<String>,
+    },
+    Count {
+        collection: String,
+        bucket: Option<String>,
+        object: Option<String>,
+    },
+}
+
+#[derive(Debug)]
+pub enum TriggerRequest {
+    Consolidate,
+    Backup(PathBuf),
+    Restore(PathBuf),
+}
+
+//===========================================================================//
+// Utils                                                                     //
+//===========================================================================//
+
+fn parse_server_config<T: FromStr>(raw: &str) -> Result<T> {
+    raw.split_terminator(&['(', ')'])
+        .nth(1)
+        .ok_or(Error::WrongResponse)?
+        .parse()
+        .map_err(|_| Error::WrongResponse)
+}
+
+fn remove_multiline(text: &str) -> String {
+    text.lines()
+        .enumerate()
+        .fold(String::new(), |mut acc, (i, line)| {
+            if i != 0 && !line.is_empty() && !acc.is_empty() && !acc.ends_with(' ') {
+                acc.push(' ');
+            }
+
+            acc.push_str(line);
+            acc
+        })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn should_parse_protocol() {
+        match parse_server_config::<u8>("protocol(1)") {
+            Ok(protocol) => assert_eq!(protocol, 1),
+            _ => unreachable!(),
+        }
+    }
+
+    #[test]
+    fn should_parse_buffer_size() {
+        match parse_server_config::<usize>("buffer_size(20000)") {
+            Ok(buffer_size) => assert_eq!(buffer_size, 20000),
+            _ => unreachable!(),
+        }
+    }
+
+    #[test]
+    fn should_make_single_line() {
+        let text = "
+Hello
+World
+";
+
+        let expected_text = "Hello World";
+        assert_eq!(remove_multiline(text), expected_text);
+    }
+}

+ 77 - 0
sonic-channel-wasi/src/result.rs

@@ -0,0 +1,77 @@
+use crate::channels::ChannelMode;
+
+/// Sugar if you expect only sonic-channel error type in result
+pub type Result<T> = std::result::Result<T, Error>;
+
+/// Wrap for sonic channel error kind. This type has std::error::Error
+/// implementation and you can use boxed trait for catch other errors
+/// like this.
+
+/// All error kinds that you can see in sonic-channel crate.
+#[derive(Debug)]
+pub enum Error {
+    /// Cannot connect to the sonic search backend.
+    ConnectToServer,
+
+    /// Cannot write message to stream.
+    WriteToStream,
+
+    /// Cannot read message in stream.
+    ReadStream,
+
+    /// Cannot switch channel mode from uninitialized.
+    SwitchMode,
+
+    /// Cannot run command in current mode.
+    RunCommand,
+
+    /// Error in query response with additional message.
+    QueryResponse(&'static str),
+
+    /// Response from sonic server are wrong! Actually it may happen if you use
+    /// unsupported sonic backend version. Please write issue to the github repo.
+    WrongResponse,
+
+    /// You cannot run the command in current channel.
+    UnsupportedCommand((&'static str, Option<ChannelMode>)),
+
+    /// This error appears if the error occurred on the server side
+    SonicServer(String),
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use Error::*;
+        match self {
+            ConnectToServer => f.write_str("Cannot connect to server"),
+            WriteToStream => f.write_str("Cannot write data to stream"),
+            ReadStream => f.write_str("Cannot read sonic response from stream"),
+            SwitchMode => f.write_str("Cannot switch channel mode"),
+            RunCommand => f.write_str("Cannot run command in current mode"),
+            QueryResponse(message) => {
+                write!(f, "Error in query response: {}", message)
+            }
+            WrongResponse => {
+                write!(f, "Client cannot parse response from sonic server. Please write an issue to github (https://github.com/pleshevskiy/sonic-channel).")
+            }
+            UnsupportedCommand((command_name, channel_mode)) => {
+                if let Some(channel_mode) = channel_mode {
+                    write!(
+                        f,
+                        "You cannot use `{}` command in {} sonic channel mode",
+                        command_name, channel_mode
+                    )
+                } else {
+                    write!(
+                        f,
+                        "You need to connect to sonic channel before use {} command",
+                        command_name
+                    )
+                }
+            }
+            SonicServer(message) => write!(f, "Sonic Server-side error: {}", message),
+        }
+    }
+}
+
+impl std::error::Error for Error {}

+ 28 - 0
sonic-channel-wasi/tests/common.rs

@@ -0,0 +1,28 @@
+#![allow(dead_code)]
+
+pub use sonic_channel::*;
+
+pub const HOST: &str = "localhost:36999";
+pub const PASS: &str = "SecretPassword1234";
+
+pub fn ingest_start() -> IngestChannel {
+    IngestChannel::start(HOST, PASS).expect("The Sonic server must be running")
+}
+
+pub fn search_start() -> SearchChannel {
+    SearchChannel::start(HOST, PASS).expect("The Sonic server must be running")
+}
+
+pub fn control_start() -> ControlChannel {
+    ControlChannel::start(HOST, PASS).expect("The Sonic server must be running")
+}
+
+pub fn consolidate() {
+    control_start().consolidate().unwrap();
+}
+
+pub fn flush_bucket(collection: &str, bucket: &str) {
+    ingest_start()
+        .flush(FlushRequest::bucket(collection, bucket))
+        .unwrap();
+}

+ 27 - 0
sonic-channel-wasi/tests/list_command.rs

@@ -0,0 +1,27 @@
+mod common;
+use common::*;
+
+const COLLECTION: &str = "Search";
+
+#[test]
+fn should_list_all_words() {
+    let bucket = "suggest_nearest";
+    let title = "Sweet Teriyaki Beef Skewers";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    ingest_channel
+        .push(PushRequest::new(dest.clone().obj("1"), title))
+        .unwrap();
+
+    consolidate();
+
+    let search_channel = search_start();
+    match search_channel.list(ListRequest::new(dest.clone())) {
+        Ok(object_ids) => assert_eq!(object_ids, vec!["beef", "skewers", "sweet", "teriyaki"]),
+        Err(_) => unreachable!(),
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}

+ 60 - 0
sonic-channel-wasi/tests/push_command.rs

@@ -0,0 +1,60 @@
+mod common;
+use common::*;
+
+const COLLECTION: &str = "Ingest";
+
+#[test]
+fn should_push_new_object_to_sonic() {
+    let bucket = "push_simple";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    match ingest_channel.push(PushRequest::new(
+        dest.obj("1"),
+        "Sweet Teriyaki Beef Skewers",
+    )) {
+        Ok(()) => {}
+        _ => unreachable!(),
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}
+
+#[test]
+fn should_push_new_object_to_sonic_with_russian_locale() {
+    let bucket = "push_locale";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    match ingest_channel.push(
+        PushRequest::new(dest.obj("1"), "Открытый пирог с орехами и сгущенкой").lang(Lang::Rus),
+    ) {
+        Ok(()) => {}
+        _ => unreachable!(),
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}
+
+#[test]
+fn should_push_multiline_text() {
+    let bucket = "push_multiline";
+    let multiline_text = "
+Sweet
+Teriyaki
+Beef
+Skewers
+";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    match ingest_channel.push(PushRequest::new(dest.obj("1"), multiline_text)) {
+        Ok(()) => {}
+        _ => unreachable!(),
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}

+ 138 - 0
sonic-channel-wasi/tests/query_command.rs

@@ -0,0 +1,138 @@
+mod common;
+use common::*;
+
+const COLLECTION: &str = "Search";
+
+#[test]
+fn should_find_object_by_exact_match() {
+    let bucket = "query_by_exact_match";
+    let title = "Sweet Teriyaki Beef Skewers";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    ingest_channel
+        .push(PushRequest::new(dest.clone().obj("1"), title))
+        .unwrap();
+
+    consolidate();
+
+    let search_channel = search_start();
+    match search_channel.query(QueryRequest::new(dest, title)) {
+        Ok(object_ids) => assert_eq!(object_ids, vec![String::from("1")]),
+        Err(_) => unreachable!(),
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}
+
+#[test]
+fn should_find_multiline_object_by_partial_match() {
+    let bucket = "query_multiline";
+    let multiline_text = "
+Sweet
+Teriyaki
+Beef
+Skewers
+None";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    ingest_channel
+        .push(PushRequest::new(dest.clone().obj("1"), multiline_text))
+        .unwrap();
+
+    consolidate();
+
+    let words = ["Sweet", "Teriyaki", "Beef", "Skewers"];
+    let search_channel = search_start();
+    for word in words {
+        match search_channel.query(QueryRequest::new(dest.clone(), word)) {
+            Ok(object_ids) => assert_eq!(object_ids, vec![String::from("1")]),
+            Err(_) => unreachable!(),
+        }
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}
+
+#[test]
+fn should_find_many_objects() {
+    let bucket = "query_many_objects";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    ingest_channel
+        .push(PushRequest::new(
+            dest.clone().obj("1"),
+            "Sweet Teriyaki Beef Skewers",
+        ))
+        .unwrap();
+    ingest_channel
+        .push(PushRequest::new(
+            dest.clone().obj("2"),
+            "Slow Cooker Beef Stew I",
+        ))
+        .unwrap();
+    ingest_channel
+        .push(PushRequest::new(
+            dest.clone().obj("3"),
+            "Christmas Prime Rib",
+        ))
+        .unwrap();
+
+    consolidate();
+
+    let search_channel = search_start();
+    match search_channel.query(QueryRequest::new(dest, "Beef")) {
+        Ok(object_ids) => assert_eq!(object_ids, vec!["2", "1"]),
+        Err(_) => unreachable!(),
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}
+
+#[test]
+fn should_find_limited_objects() {
+    let bucket = "query_limited_objects";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    ingest_channel
+        .push(PushRequest::new(
+            dest.clone().obj("1"),
+            "Sweet Teriyaki Beef Skewers",
+        ))
+        .unwrap();
+    ingest_channel
+        .push(PushRequest::new(
+            dest.clone().obj("2"),
+            "Slow Cooker Beef Stew I",
+        ))
+        .unwrap();
+    ingest_channel
+        .push(PushRequest::new(
+            dest.clone().obj("3"),
+            "Christmas Prime Rib",
+        ))
+        .unwrap();
+
+    consolidate();
+
+    let search_channel = search_start();
+    match search_channel.query(QueryRequest::new(dest.clone(), "Beef").limit(1)) {
+        Ok(object_ids) => assert_eq!(object_ids, vec!["2"]),
+        Err(_) => unreachable!(),
+    }
+
+    let search_channel = search_start();
+    match search_channel.query(QueryRequest::new(dest, "Beef").pag(1, 1)) {
+        Ok(object_ids) => assert_eq!(object_ids, vec!["1"]),
+        Err(_) => unreachable!(),
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}

+ 36 - 0
sonic-channel-wasi/tests/suggest_command.rs

@@ -0,0 +1,36 @@
+mod common;
+use common::*;
+
+const COLLECTION: &str = "Search";
+
+#[test]
+fn should_suggest_nearest_word() {
+    let bucket = "suggest_nearest";
+    let title = "Sweet Teriyaki Beef Skewers";
+
+    let dest = Dest::col_buc(COLLECTION, bucket);
+
+    let ingest_channel = ingest_start();
+    ingest_channel
+        .push(PushRequest::new(dest.clone().obj("1"), title))
+        .unwrap();
+
+    consolidate();
+
+    let pairs = [
+        ("Sweat", "sweet"),
+        ("teriaki", "teriyaki"),
+        ("Beff", "beef"),
+        ("skwers", "skewers"),
+    ];
+
+    let search_channel = search_start();
+    for (input, expected) in pairs {
+        match search_channel.suggest(SuggestRequest::new(dest.clone(), input)) {
+            Ok(object_ids) => assert_eq!(object_ids, vec![expected]),
+            Err(_) => unreachable!(),
+        }
+    }
+
+    flush_bucket(COLLECTION, bucket);
+}

+ 7 - 1
src/config.rs

@@ -36,9 +36,12 @@ pub struct Config {
     pub username: String,
     pub password: String,
     pub api_addr: String,
-    pub api_port: String,
+    pub api_port: String, // TODO make int instead of string?
     pub smtp_domain: String,
     pub smtp_port: u32,
+    pub sonic_search_addr: String,
+    pub sonic_search_port: u32,
+    pub sonic_search_password: String,
 }
 
 impl Config {
@@ -58,6 +61,9 @@ impl Config {
             "api_port" => self.api_port = value.to_string(),
             "smtp_domain" => self.smtp_domain = value.to_string(),
             "smtp_port" => self.smtp_port = value.parse().unwrap(),
+            "sonic_search_addr" => self.sonic_search_addr = value.to_string(),
+            "sonic_search_port" => self.sonic_search_port = value.parse().unwrap(),
+            "sonic_search_password" => self.sonic_search_password = value.to_string(),
             _ => {}
         }
     }

+ 19 - 9
src/main.rs

@@ -20,6 +20,7 @@ use crate::util::{compress_and_save_file};
 use crate::imap::{delete_folder, rename_folder, create_folder, check_for_updates};
 use crate::js::email_scripts;
 use warp::Filter;
+use crate::sonic::{IngestSonic, SearchSonic};
 
 mod smtp_client;
 
@@ -32,6 +33,7 @@ mod indexes;
 mod imap;
 mod server;
 mod js;
+mod sonic;
 
 pub fn append_ext(ext: impl AsRef<OsStr>, path: &PathBuf) -> PathBuf {
     let mut os_string: OsString = path.into();
@@ -161,12 +163,22 @@ fn persist_email(msg: &Message, uid: u32, list: String, original_path: PathBuf)
         };
     }
 
-    // xml
-    let xml = append_ext("xml", &message_xml_dir.join(message.hash.clone()));
-    let mut file = File::create(xml)?;
-    let data = message.to_xml();
-    file.write_all(data.as_bytes())?;
-    file.flush()?;
+    // // xml
+    // let xml = append_ext("xml", &message_xml_dir.join(message.hash.clone()));
+    // let mut file = File::create(xml)?;
+    // let data = message.to_xml();
+    // file.write_all(data.as_bytes())?;
+    // file.flush()?;
+    
+    // sonic
+    let mut ingest_channel = IngestSonic::new()?;
+    let data = message.to_ingest();
+    match ingest_channel.ingest_document("emails", &*list, &*message.hash.clone(), &*data) { // message.hash to id
+        Ok(_) => {}
+        Err(_) => {println!("Unable to ingest {} email to sonic search", message.hash.clone())}
+    };
+    let _ = ingest_channel.stop_ingest();
+    
     
     // indexing
     let _ = Indexes::add(message.clone());
@@ -343,8 +355,7 @@ async fn main() -> anyhow::Result<()> {
     let start = Instant::now();
     
     run().await;
-
-
+    
     // let email = LettreMessage::builder()
     //     .from("sokolovskiiyura@outlook.com".parse().unwrap())
     //     .to("sokolovskiiyura@gmail.com".parse().unwrap())
@@ -393,7 +404,6 @@ async fn main() {
     
     run().await;
     
-    
     // let email = LettreMessage::builder()
     //     .from("sokolovskiiyura@outlook.com".parse().unwrap())
     //     .to("sokolovskiiyura@gmail.com".parse().unwrap())

+ 20 - 0
src/server.rs

@@ -6,6 +6,7 @@ use std::io::{BufReader};
 use warp::Filter;
 use warp::http::StatusCode;
 use crate::{create_folder_lar, delete_folder_lar, rename_folder_lar};
+use crate::sonic::SearchSonic;
 use crate::util::read_and_decompress_file;
 
 pub async fn run_api() {
@@ -29,6 +30,10 @@ pub async fn run_api() {
             .and(warp::get())
             .and(warp::query::<GetEmailQuery>())
             .and_then(get_email_handle))
+        .or(warp::path("search")
+            .and(warp::get())
+            .and(warp::query::<SearchQuery>())
+            .and_then(search_handle))
         .or(warp::path("create_folder")
             .and(warp::post())
             .and(warp::body::json())
@@ -262,4 +267,19 @@ async fn rename_folder_handle(data: RenameFolderRequest) -> Result<impl warp::Re
             Ok(warp::reply::with_status(warp::reply::json(&error_response), StatusCode::BAD_REQUEST))
         }
     }
+}
+
+#[derive(Deserialize)]
+struct SearchQuery {
+    list: String,
+    limit: i32,
+    offset: i32,
+    query: String
+}
+
+async fn search_handle(query: SearchQuery) -> Result<impl warp::Reply, warp::Rejection> {
+    match SearchSonic::search_document("emails", &*query.list, &*query.query, query.limit, query.offset){
+        Ok(result) => Ok(warp::reply::json(&result)),
+        Err(_) => Ok(warp::reply::json(&Vec::<String>::new()))
+    }
 }

+ 82 - 0
src/sonic.rs

@@ -0,0 +1,82 @@
+use anyhow::anyhow;
+
+#[cfg(target_os = "wasi")]
+use sonic_channel_wasi::{Dest, IngestChannel, PushRequest, QueryRequest, SearchChannel, SonicChannel};
+#[cfg(not(target_os = "wasi"))]
+use sonic_channel::{Dest, IngestChannel, PushRequest, QueryRequest, SearchChannel, SonicChannel};
+use crate::config::Config;
+
+pub struct IngestSonic{
+    ingest_channel: IngestChannel
+}
+
+impl IngestSonic{
+    pub fn new() -> anyhow::Result<IngestSonic> {
+        let ingest_channel = IngestChannel::start(
+            format!("{}:{}", Config::global().sonic_search_addr, Config::global().sonic_search_port),
+            Config::global().sonic_search_password.clone()
+        )?;
+        Ok(IngestSonic {
+            ingest_channel,
+        })
+    }
+    
+    pub fn stop_ingest(&mut self) -> anyhow::Result<()>{
+        self.ingest_channel.quit().map_err(|e| anyhow!("Unable to quit the sonic ingest channel"))?;
+        Ok(())
+    }
+
+    pub fn ingest_document(&self, collection: &str, bucket: &str, object: &str, text: &str) -> anyhow::Result<()> {
+        // TODO may be we need to store ingested email names and make a queue  
+        let contents = Self::slice_string_into_chunks(&*text);
+        for c in contents {
+            let dest = Dest::col_buc(collection, bucket).obj(object);
+            let p = PushRequest::new(dest, c.to_string());
+            self.ingest_channel.push(p)?;        
+        }
+        
+        Ok(())
+    }
+
+    fn slice_string_into_chunks(s: &str) -> Vec<&str> {
+        let mut chunks = Vec::new();
+        let mut start = 0;
+        let len = s.len();
+
+        while start < len {
+            let end = if start + 15000 > len {
+                len
+            } else {
+                let mut end = start + 15000;
+                while !s.is_char_boundary(end) && end > start {
+                    end -= 1;
+                }
+                end
+            };
+
+            if start != end {
+                chunks.push(&s[start..end]);
+            }
+            start = end;
+        }
+
+        chunks
+    }
+}
+
+pub struct SearchSonic{}
+
+impl SearchSonic{
+    pub fn search_document(collection: &str, bucket: &str, query: &str, limit: i32, offset: i32) -> anyhow::Result<Vec<String>> {
+        let search = SearchChannel::start(
+            format!("{}:{}", Config::global().sonic_search_addr, Config::global().sonic_search_port), 
+            Config::global().sonic_search_password.clone()
+        )?;
+        let results: Vec<String> = search.query(QueryRequest::new(
+            Dest::col_buc(collection, bucket),
+            query
+        ).limit(limit as usize).offset(offset as usize)).unwrap_or_else(|_| { vec![] });
+        search.quit()?;
+        Ok(results)
+    }
+}

+ 28 - 0
src/templates/xml.rs

@@ -70,4 +70,32 @@ impl StrMessage {
             &x(&text),
         )
     }
+    
+    pub fn to_ingest(&self) -> String{
+        let msg = self;
+
+        let data = match fs::read_to_string(self.original_path.clone()) {
+            Ok(content) => content,
+            Err(e) => {
+                eprintln!("Error reading file: {}", e);
+                return "Error loading content".to_string();
+            }
+        };
+
+        let parsed_mail = match parse_mail(&data.as_bytes()) {
+            Ok(mail) => mail,
+            Err(e) => return format!("Error parsing email: {}", e),
+        };
+        let html = parse_email(&parsed_mail).unwrap_or_else(|_| msg.body.clone());
+
+        let document = scraper_html::parse_document(&*html);
+        let body = Selector::parse("body").unwrap();
+        let text = document.select(&body)
+            .next().unwrap()
+            .text()
+            .collect::<Vec<_>>()
+            .join(" ");
+        
+        text
+    }
 }