Rust Learning Notes

Table of Contents

1. Rust

1.1. Rust Basics:

1.1.1. std::fmt:

  1. format! usage: positional params, named params, formatting params
  2. Examples
    format!("Hello, {}!", "world");   // => "Hello, world!"
    format!("{1} {} {0} {}", 1, 2); // => "2 1 1 2"
    format!("{argument}", argument = "test");
    println!("Hello {:1$}!", "x", 5);
    
  3. Formatting traits: in practice, for an external type, `{}` requires impl Display and `{:?}` requires impl Debug
    1. {} => Display
    2. {:?} => Debug
    3. {:o} => Octal
    4. {:p} => Pointer
  4. fmt::Display vs fmt::Debug
    1. Display: asserts that the implementer always renders as a UTF-8 string; not every type implements Display
    2. Debug: should be implemented for all pub types; it outputs internal state. The trait exists to make debugging Rust easier, and #[derive(Debug)] supplies a default implementation
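To make the Display/Debug contrast concrete, here is a minimal sketch using a hypothetical `Point2D` type (the name is illustrative, not from the notes above):

```rust
use std::fmt;

// Debug comes for free via derive; Display must be written by hand.
#[derive(Debug)]
struct Point2D {
    x: f64,
    y: f64,
}

impl fmt::Display for Point2D {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `{}` on the fields uses their own Display impls.
        write!(f, "({}, {})", self.x, self.y)
    }
}

fn main() {
    let p = Point2D { x: 1.0, y: 2.0 };
    println!("{}", p);   // uses Display
    println!("{:?}", p); // uses Debug
}
```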

1.1.2. array & Slice

  1. An array's type is [T; len], e.g. let mut array: [i32; 3] = [0; 3];
  2. A slice's type is [T]
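A small sketch of the array/slice distinction (the sizes and values are arbitrary):

```rust
// Summing over a slice works for any borrowed region of an array or Vec.
fn sum(s: &[i32]) -> i32 {
    s.iter().sum()
}

fn main() {
    // Fixed-size array: the length is part of the type, [i32; 5].
    let arr: [i32; 5] = [1, 2, 3, 4, 5];
    // Borrowing a sub-range yields a slice of type &[i32].
    let middle: &[i32] = &arr[1..4];
    assert_eq!(middle, &[2, 3, 4][..]);
    assert_eq!(sum(middle), 9);
    assert_eq!(sum(&arr), 15);
}
```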

1.1.3. structures: there are three kinds: 1) tuple structs, 2) classic structs, 3) unit structs

  1. struct Pair(i32, f32);
  2. struct Person { name: String, age: u8}
  3. struct Unit; a unit struct has no fields

1.1.4. Enums: a combination of multiple variants; a value of any one variant is a valid value of the enum type

enum WebEvent {
    // An `enum` may either be `unit-like`,
    PageLoad,
    PageUnload,
    // like tuple structs,
    KeyPress(char),
    Paste(String),
    // or c-like structures.
    Click { x: i64, y: i64 },
}

// A function which takes a `WebEvent` enum as an argument and
// returns nothing.
fn inspect(event: WebEvent) {
    match event {
        WebEvent::PageLoad => println!("page loaded"),
        WebEvent::PageUnload => println!("page unloaded"),
        // Destructure `c` from inside the `enum`.
        WebEvent::KeyPress(c) => println!("pressed '{}'.", c),
        WebEvent::Paste(s) => println!("pasted \"{}\".", s),
        // Destructure `Click` into `x` and `y`.
        WebEvent::Click { x, y } => {
            println!("clicked at x={}, y={}.", x, y);
        },
    }
}

1.1.5. Type Alias: the type keyword introduces a new name via the syntax type Name = ExistingType;, letting Name be used in place of ExistingType. Self is one kind of type alias

1.1.6. const, static
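This section has no notes yet; a minimal sketch of the difference (the names are illustrative): `const` items are inlined compile-time constants, while `static` items occupy a single memory location with a `'static` lifetime.

```rust
// A compile-time constant, inlined at every use site; a type annotation is required.
const MAX_POINTS: u32 = 100_000;

// A value with one fixed address for the whole program run.
static GREETING: &str = "hello";

fn main() {
    println!("{} {}", GREETING, MAX_POINTS);
}
```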

1.1.7. Variable Bindings: 1) variables are immutable by default; use mut to allow mutation 2) an inner scope can reuse a name to shadow an outer variable 3) you may declare first and assign a value later, but Rust rejects any use of an uninitialized variable, preventing the bugs that would otherwise arise

1.1.8. Types: 1) casting with the as keyword 2) type aliases: type NanoSecond = u64; 3) numeric literals can carry the type as a suffix, e.g. 42i32

1.1.9. Conversion: type conversion for Rust's custom types such as structs and enums

  1. From & Into:
    1. From defines, for a type, how to create Self from another type
    2. Into is the caller side of From; From<T> for U automatically yields Into<U> for T (a blanket implementation)
  2. TryFrom & TryInto: like From & Into, except the conversion may fail and therefore returns a Result
  3. ToString & FromStr:
    1. ToString: a separate ToString trait exists just for producing Strings, but you should not implement ToString directly; implementing fmt::Display automatically provides the to_string method via a blanket impl:
              #[stable(feature = "rust1", since = "1.0.0")]
      impl<T: fmt::Display + ?Sized> ToString for T {
          // A common guideline is to not inline generic functions. However,
          // removing `#[inline]` from this method causes non-negligible regressions.
          // See <https://github.com/rust-lang/rust/pull/74852>, the last attempt
          // to try to remove it.
          #[inline]
          default fn to_string(&self) -> String {
              use fmt::Write;
              let mut buf = String::new();
              buf.write_fmt(format_args!("{}", self))
                  .expect("a Display implementation returned an error unexpectedly");
              buf
          }
      }
      
    2. FromStr & parse: to convert a String into another type, just implement FromStr for your struct; String's parse method is merely a call to FromStr::from_str(&string)
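A sketch tying the mechanisms together, using a hypothetical `Celsius` newtype: implementing From yields Into through the blanket impl, and implementing FromStr makes parse work.

```rust
use std::str::FromStr;

#[derive(Debug, PartialEq)]
struct Celsius(f64);

// From defines how to build Celsius from an f64...
impl From<f64> for Celsius {
    fn from(v: f64) -> Self {
        Celsius(v)
    }
}

// ...and FromStr makes `"21.5".parse::<Celsius>()` work.
impl FromStr for Celsius {
    type Err = std::num::ParseFloatError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Celsius(s.parse::<f64>()?))
    }
}

fn main() {
    let a = Celsius::from(21.5);
    let b: Celsius = 21.5_f64.into(); // provided by the blanket Into impl
    let c: Celsius = "21.5".parse().unwrap(); // parse delegates to FromStr
    assert_eq!(a, b);
    assert_eq!(b, c);
}
```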

1.1.10. Expression: a program is a series of expressions. 1) assignment expressions end with ; 2) a {} block is itself an expression: if its last expression ends with ;, the block evaluates to (), otherwise to the value of that last expression
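A minimal illustration of blocks as expressions (`square_plus_one` is a made-up helper):

```rust
// The body is a block whose last expression (no `;`) is the return value.
fn square_plus_one(n: i32) -> i32 {
    let sq = n * n;
    sq + 1 // no semicolon: this is the block's value
}

fn main() {
    let y = {
        let x = 3;
        x * x // block evaluates to 9
    };
    assert_eq!(y, 9);

    // With a trailing semicolon the block evaluates to `()`.
    let unit: () = { let _x = 3; };
    assert_eq!(unit, ());
    assert_eq!(square_plus_one(4), 17);
}
```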

1.1.11. Flow of control:

  1. if-else is also an expression; every branch must return the same type
  2. loop: loop/break/continue. break exits the loop at any point; continue skips the rest of the body and starts the next iteration
    1. loops can be nested and labeled; break and continue can take a label to break out of, or continue, a specific loop
      #![allow(unreachable_code)]
      
      fn main() {
          'outer: loop {
              println!("Entered the outer loop");
      
              'inner: loop {
                  println!("Entered the inner loop");
      
                  // This would break only the inner loop
                  //break;
      
                  // This breaks the outer loop
                  break 'outer;
              }
      
              println!("This point will never be reached");
          }
      
          println!("Exited the outer loop");
      }
      
      fn main() {
          let mut counter = 0;
      
          let result = loop {
              counter += 1;
      
              if counter == 10 {
                  break counter * 2;
              }
          };
      
          assert_eq!(result, 20);
      }
      
      
    2. a loop can also return a value, written after break
  3. while
  4. for: the for-in construct iterates over anything that implements IntoIterator, e.g. the simple range forms a..b and a..=b
    1. a for loop automatically calls into_iter on its argument; we can also explicitly produce these kinds of iterators, all of which implement IntoIterator:
      1. iter: yields an iterator of references; ownership is unaffected
      2. into_iter: hands ownership to the iterator; the source object is unusable afterwards
      3. iter_mut: yields an iterator of mutable references, allowing modification
  5. match:
    1. C-like form: i.e. match number
    2. destructuring:
      1. Tuples: use .. to ignore the rest of the tuple
      2. Enums:
      3. Pointers: * & ref ref mut, see the examples below
      4. Structs: structs can be matched as well
    3. Guards: a match arm can also carry an if condition, known as a guard
    4. Bindings: within an arm, besides destructuring, the whole value can be bound to a variable
      fn main() {
          let triple = (0, -2, 3);
          // TODO ^ Try different values for `triple`
      
          println!("Tell me about {:?}", triple);
          // Match can be used to destructure a tuple
          match triple {
              // Destructure the second and third elements
              (0, y, z) => println!("First is `0`, `y` is {:?}, and `z` is {:?}", y, z),
              (1, ..)  => println!("First is `1` and the rest doesn't matter"),
              // `..` can be used to ignore the rest of the tuple
              _      => println!("It doesn't matter what they are"),
              // `_` means don't bind the value to a variable
          }
      }
      
      // pointers & ref
      
      fn main() {
          // Assign a reference of type `i32`. The `&` signifies there
          // is a reference being assigned.
          let reference = &4;
      
          match reference {
              // If `reference` is pattern matched against `&val`, it results
              // in a comparison like:
              // `&i32`
              // `&val`
              // ^ We see that if the matching `&`s are dropped, then the `i32`
              // should be assigned to `val`.
              &val => println!("Got a value via destructuring: {:?}", val),
          }
      
          // To avoid the `&`, you dereference before matching.
          match *reference {
              val => println!("Got a value via dereferencing: {:?}", val),
          }
      
          // What if you don't start with a reference? `reference` was a `&`
          // because the right side was already a reference. This is not
          // a reference because the right side is not one.
          let _not_a_reference = 3;
      
          // Rust provides `ref` for exactly this purpose. It modifies the
          // assignment so that a reference is created for the element; this
          // reference is assigned.
          let ref _is_a_reference = 3;
      
          // Accordingly, by defining 2 values without references, references
          // can be retrieved via `ref` and `ref mut`.
          let value = 5;
          let mut mut_value = 6;
      
          // Use `ref` keyword to create a reference.
          match value {
              ref r => println!("Got a reference to a value: {:?}", r),
          }
      
          // Use `ref mut` similarly.
          match mut_value {
              ref mut m => {
                  // Got a reference. Gotta dereference it before we can
                  // add anything to it.
                  *m += 10;
                  println!("We added 10. `mut_value`: {:?}", m);
              },
          }
      }
      
      fn main() {
          struct Foo {
              x: (u32, u32),
              y: u32,
          }
      
          // Try changing the values in the struct to see what happens
          let foo = Foo { x: (1, 2), y: 3 };
      
          match foo {
              Foo { x: (1, b), y } => println!("First of x is 1, b = {},  y = {} ", b, y),
      
              // you can destructure structs and rename the variables,
              // the order is not important
              Foo { y: 2, x: i } => println!("y is 2, i = {:?}", i),
      
              // and you can also ignore some variables:
              Foo { y, .. } => println!("y = {}, we don't care about x", y),
              // this will give an error: pattern does not mention field `x`
              //Foo { y } => println!("y = {}", y),
          }
      }
      
      
      fn main() {
          let pair = (2, -2);
          // TODO ^ Try different values for `pair`
      
          println!("Tell me about {:?}", pair);
          match pair {
              (x, y) if x == y => println!("These are twins"),
              // The ^ `if condition` part is a guard
              (x, y) if x + y == 0 => println!("Antimatter, kaboom!"),
              (x, _) if x % 2 == 1 => println!("The first one is odd"),
              _ => println!("No correlation..."),
          }
      }
      
      
      // A function `age` which returns a `u32`.
      fn age() -> u32 {
          15
      }
      
      fn main() {
          println!("Tell me what type of person you are");
      
          match age() {
              0             => println!("I haven't celebrated my first birthday yet"),
              // Could `match` 1 ..= 12 directly but then what age
              // would the child be? Instead, bind to `n` for the
              // sequence of 1 ..= 12. Now the age can be reported.
              n @ 1  ..= 12 => println!("I'm a child of age {:?}", n),
              n @ 13 ..= 19 => println!("I'm a teen of age {:?}", n),
              // Nothing bound. Return the result.
              n             => println!("I'm an old person of age {:?}", n),
          }
      }
      
      
  6. if let: test and match in a single step
  7. while let
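A small sketch of both forms; `drain_sum` is a made-up helper for illustration:

```rust
// `while let` keeps looping as long as the pattern matches.
fn drain_sum(mut v: Vec<i32>) -> i32 {
    let mut total = 0;
    while let Some(top) = v.pop() {
        total += top;
    }
    total
}

fn main() {
    // `if let` tests and destructures in one step.
    let maybe = Some(7);
    if let Some(n) = maybe {
        println!("got {}", n);
    }
    assert_eq!(drain_sum(vec![1, 2, 3]), 6);
}
```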

1.1.12. Functions:

  1. methods: functions attached to an object; inside a method body, the object's data is reachable through self
  2. closures: |val| {val + x}
    1. Capturing: a closure can capture variables from its environment as &T, &mut T, or T (by value)
    2. As parameters: three traits: Fn, FnMut, FnOnce, corresponding to &T, &mut T, and T in ownership terms. Fn can be called any number of times, FnMut requires a mutable borrow of its captures, FnOnce can be called only once
      1. Open questions:
        1. how do you distinguish Fn(i64, i64) -> i64 from Fn(&String) -> i64?
        2. how is FnOnce determined? Some closures move variables into their body yet can still be called many times; how is that decided? By the Fn-ness of the calls inside? e.g. would a closure calling mem::drop(p) be FnOnce?
        3. how do you decide whether a closure is Fn or FnMut?
          struct Point {
              x: f64,
              y: f64,
          }
          
          // Implementation block, all `Point` methods go in here
          impl Point {
              // This is a static method
              // Static methods don't need to be called by an instance
              // These methods are generally used as constructors
              fn origin() -> Point {
                  Point { x: 0.0, y: 0.0 }
              }
          
              // Another static method, taking two arguments:
              fn new(x: f64, y: f64) -> Point {
                  Point { x: x, y: y }
              }
          }
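Regarding the open questions above on Fn vs FnMut vs FnOnce: the compiler infers the least restrictive trait from how the closure actually uses its captures (read-only borrow implies Fn, mutation implies FnMut, moving a capture out implies FnOnce). A sketch; the helper fns are illustrative:

```rust
fn call_fn<F: Fn() -> i32>(f: F) -> i32 { f() }
fn call_fn_mut<F: FnMut()>(mut f: F) { f(); f(); }
fn call_fn_once<F: FnOnce() -> String>(f: F) -> String { f() }

fn main() {
    let x = 10;
    // Reads `x` by shared reference -> implements Fn (and FnMut, FnOnce).
    assert_eq!(call_fn(|| x + 1), 11);

    let mut count = 0;
    // Mutates a capture -> FnMut (and FnOnce), but not Fn.
    call_fn_mut(|| count += 1);
    assert_eq!(count, 2);

    let s = String::from("hi");
    // Moves `s` out of the closure body -> FnOnce only.
    assert_eq!(call_fn_once(move || s), "hi");
}
```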
          

1.1.13. Modules:

  1. mod visibility: a mod is only visible within its own mod by default; pub makes it visible externally; the fns and structs defined inside follow the same rule. Since mods can nest, there are also pub(self) (equivalent to private), pub(super) (visible to the parent mod), and pub(crate) (visible to the whole crate)
  2. struct visibility: a struct's fns and fields default to being visible in the defining mod; pub opens them to outside mods
  3. mod vs struct: struct-level control is weak while mod-level control is relatively complex; structs probably don't need rules that elaborate
  4. the use keyword: use mod::struct_name as another_struct shortens path spelling; as also enables aliasing
  5. mod paths work like Unix directories: super stands for .. and self for the current mod
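A sketch of the visibility rules above (the module and fn names are made up):

```rust
mod outer {
    // Private to `outer` by default.
    fn private_helper() -> i32 { 1 }

    // Visible outside `outer`.
    pub fn public_api() -> i32 { private_helper() + 1 }

    pub mod inner {
        // Visible to the parent module only.
        pub(super) fn for_parent() -> i32 { 10 }
        // Visible anywhere in this crate.
        pub(crate) fn for_crate() -> i32 { 20 }
    }

    pub fn use_inner() -> i32 { inner::for_parent() + inner::for_crate() }
}

use outer::public_api as api; // `as` creates an alias

fn main() {
    assert_eq!(api(), 2);
    assert_eq!(outer::use_inner(), 30);
}
```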

1.1.14. Attributes: what are they for? 1) conditional compilation 2) setting crate attributes 3) silencing warnings 4) enabling compiler features (macros etc.) 5) linking to a foreign library 6) marking unit tests

  1. Form: applied to the whole crate: #![crate_attribute]; applied to a module or item: #[item_attribute]
  2. Attributes can also take arguments: 1) #[attribute = "value"] 2) #[attribute(key = "value")] 3) #[attribute(value)]
  3. Examples:
    1. #[allow(dead_code)]: silences Rust's warning about uncalled functions
    2. #![crate_name = "rary"]
    3. cfg (configuration): 1) #[cfg(...)] for conditional compilation 2) cfg!(...), which evaluates to a bool usable in ordinary code
      // This function only gets compiled if the target OS is linux
      #[cfg(target_os = "linux")]
      fn are_you_on_linux() {
          println!("You are running linux!");
      }
      
      // And this function only gets compiled if the target OS is *not* linux
      #[cfg(not(target_os = "linux"))]
      fn are_you_on_linux() {
          println!("You are *not* running linux!");
      }
      
      fn main() {
          are_you_on_linux();
      
          println!("Are you sure?");
          if cfg!(target_os = "linux") {
              println!("Yes. It's definitely linux!");
          } else {
              println!("Yes. It's definitely *not* linux!");
          }
      }
      

1.1.15. Smart Pointer:

  1. Box: heap storage
  2. Rc<T>: reference counting
    1. Rc::clone(&rc) || rc.clone()
    2. Rc::strong_count
    3. Rc::weak_count
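The counters can be observed directly; a minimal sketch:

```rust
use std::rc::Rc;

fn main() {
    let a = Rc::new(5);
    assert_eq!(Rc::strong_count(&a), 1);

    let b = Rc::clone(&a); // bumps the count; no deep copy of the data
    assert_eq!(Rc::strong_count(&a), 2);

    drop(b);
    assert_eq!(Rc::strong_count(&a), 1);
    assert_eq!(Rc::weak_count(&a), 0);
}
```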

1.1.16. OIBITs: short for "opt-in builtin trait", e.g. Send, Sync. Such traits get automatic implementations for every struct and enum: if all fields of struct A impl the trait, then struct A impls it as well.

1.1.19. impl Trait: currently usable in return position: fn xxx() -> impl Trait

1.1.20. Box<Trait> vs Box<T> where T: Trait vs &Trait:

  • Box<Trait>: not a generic type but a fat pointer to the trait; Stack Overflow has a clear comparison, and the Rust book explains that a trait object uses dynamic dispatch through a fat pointer, which may include a destructor pointer
  • Box<T> where T: Trait is a generic type
  • code below

    // Box<T> where T: Trait
    use std::fmt::Debug;
    
    struct Wrapper<T> {
        contents: Option<Box<T>>,
    }
    
    impl<T: Debug> Wrapper<T> {
        fn new() -> Wrapper<T> {
            Wrapper { contents: None }
        }
    
        fn insert(&mut self, val: Box<T>) {
        }
    }
    
    fn main() {
        let mut w = Wrapper::new();
    
        // makes T for w be an integer type, e.g. Box<i64>
        w.insert(Box::new(5));
    
        // type error, &str is not an integer type
        // w.insert(Box::new("hello"));
    }
    
    
    // Box<Trait>
    use std::fmt::Debug;
    
    struct Wrapper {
        contents: Option<Box<dyn Debug>>,
    }
    
    impl Wrapper {
        fn new() -> Wrapper {
            Wrapper { contents: None }
        }
    
        fn insert(&mut self, val: Box<dyn Debug>) {
        }
    }
    
    fn main() {
        let mut w = Wrapper::new();
        w.insert(Box::new(5));
        w.insert(Box::new("hello"));
    }
    
    

1.1.21. dyn trait:
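This section is empty; a short sketch of what `dyn Trait` means: since Rust 2018, bare `Box<Trait>` is written `Box<dyn Trait>`, and `dyn` marks dynamic dispatch through a trait object, as opposed to the static dispatch of generics (function names below are made up):

```rust
use std::fmt::Debug;

// Static dispatch: monomorphized per concrete type.
fn show_static<T: Debug>(v: T) -> String { format!("{:?}", v) }

// Dynamic dispatch: one function, vtable lookup at runtime.
fn show_dyn(v: &dyn Debug) -> String { format!("{:?}", v) }

fn main() {
    assert_eq!(show_static(5), "5");
    assert_eq!(show_dyn(&"hi"), "\"hi\"");

    // A heterogeneous collection needs `dyn`.
    let items: Vec<Box<dyn Debug>> = vec![Box::new(1), Box::new("two")];
    assert_eq!(format!("{:?}", items[1]), "\"two\"");
}
```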

1.1.22. Rust stack overflow issues: run lldb target/debug/deps/smoke-910c8a3208aec5c7 and, just as when debugging with gdb, navigate to the relevant call-stack information

1.2. The meaning of mut var & mut ref in Rust

Does let mut a = xx; mean that a can be rebound to another value of the same type, or only that mut methods can be called via a.xx()?

  mut x: &mut i32  =>  both x = &mut y and *x = 123 are allowed
  mut x: &i32      =>  x = &y is allowed; *x = 123 is not
  x: &mut i32      =>  *x = 123 is allowed; x cannot be reassigned
  x: &i32          =>  neither is allowed

The C analogues, in the same order: int *x; const int *x; int * const x; const int * const x;

let mut ix = 10;
let mut iy = 20;
{
    let mut x = &mut ix;
    *x = 100;
    x = &mut iy;
    *x = 200;
}
println!("ix: {}, iy: {}", ix, iy);

1.3. const & static: see references

1.4. Rust tooling

1.4.1. rustup: the Rust compiler management tool; makes it easy to switch among stable, beta, and nightly

1.4.2. cargo: Rust's package manager, used to download Rust dependencies, compile packages, and distribute them to crates.io

  1. cargo is installed automatically along with Rust
  2. some useful cargo commands:
    1. cargo new package # --bin (the default) generates an executable program; pass --lib to generate a library
    2. cargo build
    3. why cargo exists:
      1. it strips away rustc's complexity, much as make does for C
      2. cargo ultimately invokes rustc to compile the project; you could call rustc directly, but you would have to pass complex arguments for project dependencies, files to compile, and so on, in a carefully arranged order
      3. hence cargo, the make-style tool; its features:
        1. 1. two files hold the package's metadata
        2. 2. it fetches and builds package dependencies
        3. 3. it invokes rustc (or other tools) with the correct arguments to build the project
        4. 4. it introduces conventions that make packages easier to build
      4. cargo usage notes:
        1. cargo new hello_world --bin

          1. $ cd hello_world
            $ tree .
            .
            ├── Cargo.toml
            └── src
                └── main.rs
            
            1 directory, 2 files
            
        2. Cargo.toml is called the manifest and contains the package's metadata

          1. fn main() {
                println!("Hello, world!");
            }
            
            
            $ cargo build
               Compiling hello_world v0.1.0 (file:///path/to/package/hello_world)
            
            $ ./target/debug/hello_world
            Hello, world!
            
            
            $ cargo run
               Compiling hello_world v0.1.0 (file:///path/to/package/hello_world)
                 Running `target/debug/hello_world`
            Hello, world!
            
            
            
            $ cargo build --release
               Compiling hello_world v0.1.0 (file:///path/to/package/hello_world)
            
            
        3. cargo build builds the package
        4. cargo run builds and runs it
        5. cargo build --release builds optimized code
        6. cargo builds at the debug optimization level by default, with output in target/debug; building optimized code requires explicitly passing --release, with output in target/release
        7. Dependencies:
          1. crates.io is Rust's central package registry, used to discover, download, and update packages
          2. adding a dependency: add an entry under [dependencies] in Cargo.toml

            1. [package]
              name = "hello_world"
              version = "0.1.0"
              authors = ["Your Name <you@example.com>"]
              edition = "2018"
              
              [dependencies]
              time = "0.1.12"
              regex = "0.1.41"
              
        8. the subsequent cargo build then proceeds like this:

          1. $ cargo build
                  Updating crates.io index
               Downloading memchr v0.1.5
               Downloading libc v0.1.10
               Downloading regex-syntax v0.2.1
               Downloading memchr v0.1.5
               Downloading aho-corasick v0.3.0
               Downloading regex v0.1.41
                 Compiling memchr v0.1.5
                 Compiling libc v0.1.10
                 Compiling regex-syntax v0.2.1
                 Compiling memchr v0.1.5
                 Compiling aho-corasick v0.3.0
                 Compiling regex v0.1.41
                 Compiling hello_world v0.1.0 (file:///path/to/package/hello_world)
            
          9. package layout:

          1. .
            ├── Cargo.lock
            ├── Cargo.toml
            ├── src/
            │   ├── lib.rs
            │   ├── main.rs
            │   └── bin/
            │       ├── named-executable.rs
            │       ├── another-executable.rs
            │       └── multi-file-executable/
            │           ├── main.rs
            │           └── some_module.rs
            ├── benches/
            │   ├── large-input.rs
            │   └── multi-file-bench/
            │       ├── main.rs
            │       └── bench_module.rs
            ├── examples/
            │   ├── simple.rs
            │   └── multi-file-example/
            │       ├── main.rs
            │       └── ex_module.rs
            └── tests/
                ├── some-integration-tests.rs
                └── multi-file-test/
                    ├── main.rs
                    └── test_module.rs
            
          2. Cargo.toml and Cargo.lock live in the project root
          3. source code goes under src
          4. the default library file is src/lib.rs
          5. the default executable file is src/main.rs; other executables go in src/bin/
          6. benchmarks go in the benches directory
          7. example code goes in the examples directory
          8. integration tests go in the tests directory
          9. for further details see https://doc.rust-lang.org/book/ch07-00-managing-growing-projects-with-packages-crates-and-modules.html

        10. Cargo.toml vs Cargo.lock: two different purposes:
          1. Cargo.toml describes dependencies approximately, not exactly; it is written by humans
          2. Cargo.lock records the exact dependency graph and is maintained by cargo
          3. https://doc.rust-lang.org/cargo/faq.html#why-do-binaries-have-cargolock-in-version-control-but-not-libraries
        11. example:

          1. Cargo.toml
            
            [package]
            name = "hello_world"
            version = "0.1.0"
            authors = ["Your Name <you@example.com>"]
            
            [dependencies]
            rand = { git = "https://github.com/rust-lang-nursery/rand.git", rev = "9f35b8e" }
            
            
            Cargo.lock
            
            
            [[package]]
            name = "hello_world"
            version = "0.1.0"
            dependencies = [
             "rand 0.1.0 (git+https://github.com/rust-lang-nursery/rand.git#9f35b8e439eeedd60b9414c58f389bdc6a3284f9)",
            ]
            
            [[package]]
            name = "rand"
            version = "0.1.0"
            source = "git+https://github.com/rust-lang-nursery/rand.git#9f35b8e439eeedd60b9414c58f389bdc6a3284f9"
            
        12. Cargo.lock pins the exact versions of the dependencies; when others build the project, they will use the same sha, even though it never appears in Cargo.toml
        13. cargo update updates all dependencies; cargo update -p rand updates only the rand dependency
        14. Test:
          1. cargo test runs every test in the package. Tests come in two main kinds: 1) tests inside the files in the src directory, 2) files under the tests/ directory. The former are unit tests, the latter integration tests

          2. $ cargo test
               Compiling rand v0.1.0 (https://github.com/rust-lang-nursery/rand.git#9f35b8e)
               Compiling hello_world v0.1.0 (file:///path/to/package/hello_world)
                 Running target/test/hello_world-9c2b65bbb79eabce
            
            running 0 tests
            
            test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out
            
          3. cargo test foo runs only the tests named foo
          4. cargo test also runs some additional tests, including the doc tests embedded in src (a supplement, not the main part)
          5. Cargo home: when building a package, cargo stores the downloaded dependency packages under the cargo home, which serves as a cache
          6. its location can be changed via the CARGO_HOME environment variable; the default is $HOME/.cargo/
          7. contents of the cargo home directory:
            1. bin: executable crates, including those installed by cargo install or rustup
            2. git/db: when a crate depends on a git project, cargo clones it into this directory
            3. git/checkouts: checkouts of the projects in git/db, e.g. for a dependency pinned to a specific commit
            4. registry: crates the project pulls from crates.io live here
              1. registry/index: crate metadata: versions, dependencies, etc.
              2. registry/cache: downloaded crates, stored as gzipped .crate files
              3. registry/src: the unpacked form of the cache
        15. Specifying dependencies:
          1. dependency versions: the meaning of each position in the version number follows SemVer, major.minor.patch, as described in the link
            1. Caret requirements: allow updates that keep the major version number unchanged, but 0 is special and is treated as compatible with nothing, i.e. 0.0.1 is incompatible with 0.1.x
              1. compatibility examples:

              2. ^1.2.3  :=  >=1.2.3, <2.0.0
                ^1.2    :=  >=1.2.0, <2.0.0
                ^1      :=  >=1.0.0, <2.0.0
                ^0.2.3  :=  >=0.2.3, <0.3.0
                ^0.2    :=  >=0.2.0, <0.3.0
                ^0.0.3  :=  >=0.0.3, <0.0.4
                ^0.0    :=  >=0.0.0, <0.1.0
                ^0      :=  >=0.0.0, <1.0.0
                
            2. Tilde requirements, by cases:
              1. with major.minor.patch or major.minor specified, only patch-level upgrades are allowed
              2. with only the major version specified, minor and patch upgrades are allowed
              3. for example

              4. ~1.2.3  := >=1.2.3, <1.3.0
                ~1.2    := >=1.2.0, <1.3.0
                ~1      := >=1.0.0, <2.0.0
                
            3. Wildcard requirements:
              1. allow any version in the starred position
              2. for example

              3. \*     := >=0.0.0
                1.*   := >=1.0.0, <2.0.0
                1.2.* := >=1.2.0, <1.3.0
                
            4. Workspaces: the packages in a workspace share one Cargo.lock, one output dir, and various settings (e.g. profiles); the packages in a workspace are called workspace members
              1. two forms exist: 1) [package] and [workspace] coexist in one Cargo.toml 2) only [workspace] is present in Cargo.toml, which is called a virtual manifest
              2. main effects:
                1. 1. a shared Cargo.lock
                2. 2. a shared output dir ([target] in Cargo.toml)
                3. 3. the [patch], [replace] and [profile.*] sections in Cargo.toml are honored only in the workspace root manifest; those in member packages are ignored
              3. settings in the workspace section

              4. [workspace]
                members = ["member1", "path/to/member2", "crates/*"]
                exclude = ["crates/foo", "path/to/other"]
                
                1. members lists the member packages; exclude excludes members
              5. workspace discovery: cargo automatically searches upward through parent directories for a Cargo.toml containing [workspace]; a member can set package.workspace to point at the workspace directly, bypassing the search, which is useful for member packages that do not live under the workspace directory
              6. member packages: cargo command -p package runs a command against a specific package; if no package is given, the package of the current directory is chosen; default-members = ["path/to/member2", "path/to/member3/"] sets the default member packages to operate on

1.4.3. rustup: downloads rustc from official channels, enables switching freely among stable, beta, and nightly, and makes cross-compiling simpler

  1. How it works: rustup operates through the tools under ~/.cargo/bin; the rustc and cargo installed there are merely proxies to the real tools
    1. rustup provides a convenient mechanism for controlling these proxies, e.g. running rustup default nightly switches to the nightly tools
  2. Concepts:
    1. channel: rustc is released on three channels: stable, beta, nightly; a channel is just a concept, nothing more
    2. toolchain: rustc, cargo, and related tools; controlling rustc is the practical application of the channel concept
    3. target: rustc can generate code for multiple platforms; by default it targets the host; to generate code for other targets, install them with rustup target
    4. component: every Rust release ships with components, including rustc, clippy, etc.
    5. profile: for convenient work with components, a profile defines a group of components
  3. toolchains: rustup can install not only the stable, beta, and nightly channels, but also other official historical versions
    1. channel naming rules:
      <channel>[-<date>][-<host>]
      
      <channel>       = stable|beta|nightly|<major.minor>|<major.minor.patch>
      <date>          = YYYY-MM-DD
      <host>          = <target-triple>
      
      
  4. other commands: keeping Rust up to date:
    $ rustup update
    info: syncing channel updates for 'stable'
    info: downloading component 'rustc'
    info: downloading component 'rust-std'
    info: downloading component 'rust-docs'
    info: downloading component 'cargo'
    info: installing component 'rustc'
    info: installing component 'rust-std'
    info: installing component 'rust-docs'
    info: installing component 'cargo'
    info: checking for self-updates
    info: downloading self-updates
    
      stable updated: rustc 1.7.0 (a5d1e7a59 2016-02-29)
    
  5. as shown above, rustup update updates stable, its components, and rustup itself; rustup self update updates rustup manually

1.5. Utility traits:

1.5.1. Drop

  1. when it runs: when the value is dropped, before the struct's fields are dropped; inside the drop fn the fields are still usable; after the struct's drop, each field's drop fn runs in turn
  2. when you need it: when the struct holds resources Rust cannot understand on its own

1.5.2. Sized: a marker trait Rust uses to mark types whose size can be determined at compile time

  1. unsized types include str and [T] (variable length, though the pointers &str and &[T] are still Sized)
  2. an unsized type cannot be stored in a variable, so it cannot be used directly as a function argument; a pointer or reference is required
  3. T: Sized and T: ?Sized constrain the kind of type; ?Sized means the type may be unsized

1.5.3. Clone:

fn clone(&self) -> Self;
fn clone_from(&mut self, source: &Self);  // refills self with source's data; can be faster than clone
  1. #[derive(Clone)]
  2. std::fs::File and std::sync::Mutex do not support clone

1.5.4. Copy: primitive types i32, i64, etc.

  1. A = B is then not a transfer of ownership but a copy of B into A
  2. Copy: Clone, i.e. Clone is Copy's supertrait
  3. #[derive(Copy, Clone)]
  4. Clone provides the clone method for duplicating complex data structures, a potentially expensive operation: #[derive(Clone)]
  5. Copy is special: it cannot be implemented unless every member is itself a simple (Copy) type
  6. Copy is a marker with no methods of its own; it is implemented via #[derive(Copy)]

1.5.5. Deref & DerefMut: when types don't match, Rust automatically tries to insert deref() calls, possibly chained several times

trait Deref {
  type Target: ?Sized;

  fn deref(&self) -> &Self::Target;
}
  1. 1. designed for smart pointers: Box, Rc, Arc
  2. 2. for generic type parameters, no automatic deref insertion is performed
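A minimal sketch of implementing Deref for a wrapper (MyBox is a made-up type, not std's Box):

```rust
use std::ops::Deref;

// A minimal smart-pointer-like wrapper, for illustration only.
struct MyBox<T>(T);

impl<T> Deref for MyBox<T> {
    type Target = T;
    fn deref(&self) -> &T { &self.0 }
}

fn hello(name: &str) -> String { format!("Hello, {}!", name) }

fn main() {
    let b = MyBox(String::from("Rust"));
    // Deref coercion: &MyBox<String> -> &String -> &str (two steps).
    assert_eq!(hello(&b), "Hello, Rust!");
    assert_eq!(b.len(), 4); // method calls auto-deref too
}
```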

1.5.6. Default

trait Default {
  fn default() -> Self;
}
  1. String and the collections all implement Default
  2. #[derive(Default)] works if every field of the struct implements Default

1.5.7. AsRef & AsMut:

  1. definition
    trait AsRef<T: ?Sized> {
        fn as_ref(&self) -> &T;
    }
    
  2. compared with Deref, Rust inserts no code automatically
  3. as_ref borrows a B out of an A, widening the range of accepted arguments
    fn open<P: AsRef<Path>>(path: P) -> Result<File> {
       let path: &Path = path.as_ref();
       //....
    }
    let fs = std::fs::File::open("xxxxxx.txt");
    
  4. it reduces the need for concrete type definitions; before writing a trait AsFoo, one should try AsRef<Foo>

1.5.8. Borrow & BorrowMut

  1. definition:
    trait Borrow<Borrowed: ?Sized> {
        fn borrow(&self) -> &Borrowed;
    }

    impl<K, V> HashMap<K, V> where K: Eq + Hash {
        fn get<Q: ?Sized>(&self, key: &Q) -> Option<&V>
            where K: Borrow<Q>,
                  Q: Eq + Hash
        {
            // ...
        }
    }
    
    
  2. similar to AsRef, but with the added constraint involving the Hash impl
  3. the bound requires that HashMap's K can be borrowed out as a Q without touching K's ownership, so the matching key can be looked up; this widens the range of arguments get(&Q) accepts
  4. e.g. with a HashMap<String, String> you can call hashmap.get("xx")

1.5.9. From & Into:

  1. definition:
    trait Into<T>: Sized {
       fn into(self) -> T;
    }
    
  2. consumes type A's ownership and returns a type B
  3. if A impls From<B>, Rust automatically impls Into<A> for B
  4. use cases: Into as a fn argument makes the argument more flexible; From serves as a constructor from several types -> Self
  5. because Into consumes ownership it is "heavy", since it may allocate memory

1.5.10. ToOwned:

  1. definition:
    trait ToOwned {
      type Owned: Borrow<Self>; // note this bound
      fn to_owned(&self) -> Self::Owned;
    }
    
  2. works around a limitation of clone: since (&A).clone() returns A, clone cannot turn a &str into a String (it would have to return str, an unsized type)
  3. so &str offers .to_owned() -> String instead

1.5.11. Send, Sync

  1. Send: 1. safe to pass by value between threads 2. ownership may move across threads
  2. Sync: 1. safe to pass non-mut references between threads 2. shareable across threads; T is Sync exactly when &T is Send, so Sync and Send are necessarily related
  3. Send should cover more types than Sync
  4. ? If &T: Send requires T: Sync, that suggests that when sending between threads something, perhaps like Rc, could change a count to decide which thread frees the value; yet Arc<T> impls Send only if T impls Sync, so what does T: Sync really mean?
  5. no #[derive] is needed; Rust provides the markers automatically
  6. but you can still impl them manually for a struct, i.e. freely control whether a type is Send or Sync:
    1. unsafe impl Send for MyBox {} ;
    2. unsafe impl Sync for MyBox {} ;
    3. impl !Send for MyBox {};
    4. impl !Sync for MyBox {};

1.5.12. Ref Pointer:

  1. Cell, RefCell: add a borrow checker (at runtime)
    1. interior mutability; use cases: 1. mutable inside, immutable outside 2. Rc::clone(&ref): ref is immutable, yet the internal reference count is incremented
  2. Rc: reference-counting smart pointer (manages heap memory)
    let rc = Rc::new(data);
    let rc1 = Rc::clone(&rc);
    
  3. Arc: atomic reference counter (the multithreaded Rc)
    1. immutable by default; Arc needs to be combined with Mutex, RwLock, or atomics
    2. Arc<T> impls Send & Sync if T impls Send & Sync
    3. Arc being "always thread safe" refers to its counter; it does not add thread safety to the inner T (the data)
    4. e.g. Arc<RefCell<T>> is not thread safe: Arc is, but RefCell is not, so Arc<RefCell<T>> is not
  4. the reference counters (both Rc and Arc) have a corresponding Weak type to break reference cycles: downgrade -> Weak<T> -> upgrade -> Option<Rc<T>>
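The downgrade/upgrade round trip can be seen directly; a minimal sketch:

```rust
use std::rc::{Rc, Weak};

fn main() {
    let strong = Rc::new(42);
    // A Weak does not keep the value alive and does not affect strong_count.
    let weak: Weak<i32> = Rc::downgrade(&strong);
    assert_eq!(Rc::strong_count(&strong), 1);
    assert_eq!(weak.upgrade().map(|rc| *rc), Some(42));

    drop(strong);
    // Once the last strong reference is gone, upgrade yields None.
    assert!(weak.upgrade().is_none());
}
```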

1.6. Reading the Rust std docs

1.6.1. the docs often show both a primitive type and a std:: item of the same name; this is because the primitive types are implemented by the compiler, while std also implements parts under the same names, hence the duplication

1.6.2. the Rust prelude:

a bundle that spares you repetitive use std:: imports; conceptually, importing std::prelude brings in the whole collection it contains

  1. several mods have their own prelude, e.g. std::io::prelude, core::prelude, std::os::unix::prelude, etc.

1.6.3. the iter module: a module about iteration, containing the traits and structs that support Iterator

1.6.4. For Vec iterators (Iterator, Enumerate), are the closure parameter types in find and map &T or &mut T?

  1. a &T can be matched by x or by &x, and a &mut T by x or by &mut x; when the inner T cannot be borrowed out,
  2. use ref x or ref mut x, in which case x has type &&T or &&mut T; ref means: make x a reference to the target
  3. so, unlike &x, it does not borrow the T out of the &T.
  4. the code below prints the concrete type via type_of(&var):
    use std::any::type_name;
    
    fn type_of<T>(_: T) -> &'static str {
        type_name::<T>()
    }
    
    
    // note: after find is called on the iter, players has been (partially) consumed.
    let mut players = player::Player::get_many(&realm_mysql, &ary)?.into_iter();
    println!("players is ---------- {:?}, {:?}, {:?}", players, sender_id, recipient_id);
    let i_data = players.find(|x| x.id == sender_id);
    println!("players is ---------- {:?}", i_data);
    let sender = match i_data {
      Some(data) => data,
      None => return Ok(None),
    };
    
    println!("players is ---------- {:?}", players);
    let i_data = players.find(|x| x.id == recipient_id);
    println!("players is ---------- {:?}", i_data);
    

1.6.5. Box: Box::new(T) 方法

1.6.6. A common misuse of the Borrow trait: below is a correct usage example. Why is it designed this way?

  1. 1. in HashMap<K, V>, K can be a heap-allocated, heavy object, so we implement Borrow<Q> for K to support .get(Q), e.g. Borrow<str> for String
  2. 2. regarding the Hash impl: for a K used in a HashMap, implementing Borrow<Q> for K is special in that Q and K must hash to the same value
  3. 3. point 2 shows up in the HashMap internals: HashMap.get(Q) goes through get_inner<Q: ?Sized>(&self, k: &Q) -> Option<&(K, V)>, which does let hash = make_hash::<K, Q, S>(&self.hash_builder, k); i.e. it hashes k: &Q directly, so &Q and K must hash identically
    use std::borrow::Borrow;
    use std::collections::HashMap;
    use std::hash::{Hash, Hasher};
    
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum AbType {
        Rss,
        Gold,
    }
    
    impl Borrow<str> for AbType {
        fn borrow(&self) -> &str {
            match self {
                &AbType::Gold => "gold",
                &AbType::Rss => "rss",
            }
        }
    }
    
    impl Hash for AbType {
        fn hash<H: Hasher>(&self, state: &mut H) {
            let s: &str = self.borrow();
            s.hash(state);
        }
    }
    
    fn main() {
        let mut hash = HashMap::new();
        hash.insert(AbType::Gold, 10);
    
        let val = hash.get("gold");
        let gold_val = hash.get(&AbType::Gold);
        println!("val: {:?}, goldVal: {:?}", val, gold_val);
    }
    

1.6.7. FromIterator: 定义如下:

pub trait FromIterator<A>: Sized {
    fn from_iter<T>(iter: T) -> Self
    where
        T: IntoIterator<Item = A>;
}
  1. It provides a way to build Self from an iterator; the argument T must impl IntoIterator<Item = A>, and it is usually invoked through Iterator::collect()
  2. Major implementors: String, HashMap, Vec, as well as Option<V> and Result<V, E>
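The implementors above can be exercised through collect(), including the short-circuiting Result case:

```rust
use std::collections::HashMap;

fn main() {
    // FromIterator is usually driven through Iterator::collect()
    let v: Vec<i32> = (1..4).collect();
    assert_eq!(v, vec![1, 2, 3]);

    let s: String = ["ab", "cd"].iter().copied().collect();
    assert_eq!(s, "abcd");

    let m: HashMap<i32, i32> = vec![(1, 10), (2, 20)].into_iter().collect();
    assert_eq!(m.get(&1), Some(&10));

    // Result<Vec<_>, E> collects the Ok values, or short-circuits
    // on the first Err
    let nums: Result<Vec<i32>, _> =
        ["1", "2", "x"].iter().map(|s| s.parse::<i32>()).collect();
    assert!(nums.is_err());
}
```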

1.6.8. Vec usage

Vec<&str>.join(",") and Vec<String>.join(",") are themselves fine; the pitfall is on the splitting side: "".split(',') yields [""], i.e. String::default().split(",") produces one empty element rather than an empty iterator.
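A small demonstration of the pitfall, plus split_terminator as one way around it:

```rust
fn main() {
    // join itself is fine
    let joined = vec!["a", "b"].join(",");
    assert_eq!(joined, "a,b");

    // the pitfall: splitting an empty string yields one empty element,
    // not an empty iterator
    let parts: Vec<&str> = "".split(',').collect();
    assert_eq!(parts, vec![""]);

    // split_terminator drops a trailing empty piece, so it yields nothing here
    let parts: Vec<&str> = "".split_terminator(',').collect();
    assert!(parts.is_empty());
}
```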

1.6.9. Threads, panics, and unwrap

(My earlier understanding was wrong.) In Rust an Error plays the role of an exception: it can be handled and returned/propagated, and it never crashes a thread. Only a panic (e.g. via unwrap) kills a thread, unwinding its stack; the panic then surfaces as an Err from JoinHandle::join. So how do you guarantee that libraries used inside a thread never panic? There is no way. What does exist is panic::catch_unwind and std::panic::set_hook: catch_unwind stops a panic from tearing down the thread, and set_hook registers a callback that runs when a panic occurs. Even with catch_unwind available, the better approach is to return a Result (an Error) and let the application level handle it, rather than panic.
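The three behaviors described above, as a runnable sketch:

```rust
use std::panic;

fn main() {
    // a returned Err is just a value; it never kills the thread
    let e: Result<i32, String> = Err("handled normally".to_string());
    assert!(e.is_err());

    // a panic unwinds the stack, but catch_unwind converts it into a Result
    let caught = panic::catch_unwind(|| panic!("boom"));
    assert!(caught.is_err());

    // without catch_unwind, a panic inside a spawned thread surfaces
    // as an Err from JoinHandle::join
    let handle = std::thread::spawn(|| panic!("thread panic"));
    assert!(handle.join().is_err());
}
```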

1.7. Atomic: Rust adopts the C++20 atomic memory model (not because C++'s is wonderful, but because atomics are so complex that reusing an existing, well-studied model makes it easier to learn and use) in order to bridge the gap between compiler and hardware optimizations.

1.7.1. Compiler Reordering: the compiler analyzes the program and may reorder instructions to optimize performance, example:

  x = 1;
  y = 3;
  x = 2;
//------------
  x = 2;
  y = 3;

1.7.2. Hardware Reordering: the CPU memory hierarchy (each core has its own L1 cache, cores share L2, etc.) causes the same kind of problem. Example, with initial state x = 0, y = 1:

THREAD 1        THREAD 2
y = 3;          if x == 1 {
x = 1;              y *= 2;
                }

// Ideally there are two possible outcomes:
//   y = 3: thread 2 did the check before thread 1 completed
//   y = 6: thread 2 did the check after thread 1 completed
// But because of hardware optimizations a third is possible:
//   y = 2: thread 2 saw x = 1 but not y = 3, and then overwrote y = 3
// i.e. the caches of the two CPUs running the two threads were not
// synchronized, so the threads saw inconsistent data

1.7.3. Different CPUs give different guarantees: strongly-ordered (Intel x86) vs weakly-ordered (ARM)

  1. On a strongly-ordered CPU, asking for strong ordering costs very little; on a weakly-ordered CPU, it is very expensive.

1.7.4. The C++ memory model bridges the gap between CPUs and compilers by guaranteeing causal consistency.

1.7.5. Data accesses:

  1. Plain data accesses are unsynchronized: the compiler may optimize them freely, and the CPU may propagate the changes to other threads whenever it likes
  2. Crucially, unsynchronized data accesses are how data races happen
  3. It is impossible to build synchronized code on top of unrestricted data accesses alone
  4. Atomic accesses tell the hardware and the compiler that the program is multi-threaded
  5. Correct data access is achieved by restricting what the compiler and hardware may do: limiting compiler instruction reordering, and limiting how the hardware propagates variable changes to other threads

1.7.6. Rust's Ordering variants:

  1. Sequentially Consistent (SeqCst)
  2. Release
  3. Acquire
  4. Relaxed

1.7.7. Sequentially Consistent: the safest mode. It restricts compiler reordering of instructions, and the hardware fully propagates the changes to all other threads. The cost: even on a strongly-ordered CPU, SeqCst may still have to be implemented with memory fences

1.7.8. Acquire-Release: a natural fit for acquiring/releasing a lock, ensuring critical sections do not overlap

  1. Acquire: every access after it stays after it (the compiler may not reorder later instructions before it, but may move earlier accesses after it)
  2. Release: every access before it stays before it (the compiler may not reorder earlier instructions after it, but may move later instructions before it)
  3. The hardware guarantees that changes to the same variable are observed by all threads
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;
    
    fn main() {
        let lock = Arc::new(AtomicBool::new(false)); // value answers "am I locked?"
    
        // ... distribute lock to threads somehow ...
    
        // Try to acquire the lock by setting it to true
        // compare_and_swap was deprecated in Rust 1.50; compare_exchange_weak
        // takes separate success and failure orderings
        while lock
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {}
        // broke out of the loop, so we successfully acquired the lock!
    
        // ... scary data accesses ...
    
        // ok we're done, release the lock
        lock.store(false, Ordering::Release);
    }
    
    
  4. Combining Acquire and Release guarantees that the code between them always stays inside the acquire/release bracket.

1.7.9. Relaxed: the weakest atomic ordering, but still atomic: read-modify-write operations remain indivisible. It provides none of the Acquire/Release guarantees, and is cheap on weakly-ordered CPUs
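A sketch of the classic Relaxed use case, a plain event counter: each fetch_add is atomic, so no increments are lost, even though Relaxed imposes no ordering on surrounding memory operations.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..1000 {
                    // atomic increment; Relaxed is enough when we only
                    // care about the final total, not ordering
                    c.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(counter.load(Ordering::Relaxed), 4000);
}
```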

1.7.10. fence: Rust also provides fence operations; Arc's Drop impl uses one to enforce ordering between uses of the data and its deletion

impl<T> Drop for Arc<T> {
    fn drop(&mut self) {
        let inner = unsafe { self.ptr.as_ref() };
        if inner.rc.fetch_sub(1, Ordering::Release) != 1 {
            return;
        }
        // This fence is needed to prevent reordering of the use and deletion
        // of the data.
        atomic::fence(Ordering::Acquire);
        // This is safe as we know we have the last pointer to the `ArcInner`
        // and that its pointer is valid.
        unsafe { Box::from_raw(self.ptr.as_ptr()); }
    }
}

1.8. std::thread: threads in the standard library

1.8.1. Builder is used to configure thread parameters such as the name and stack_size

1.8.2. ::spawn(f) goes through several layers down to spawn_unchecked_, which assembles a main closure around __rust_begin_short_backtrace(f) and calls imp::Thread::new(stack_size, Box::new(main)) (presumably the platform-native thread creation)


// just builds a Thread struct, no spawning yet; it contains the Parker object
let my_thread = Thread::new(name.map(|name| {
    CString::new(name).expect("thread name may not contain interior null bytes")
}));

let main = move || {
    if let Some(name) = their_thread.cname() {
        imp::Thread::set_name(name);
    }

    crate::io::set_output_capture(output_capture);

    // SAFETY: the stack guard passed is the one for the current thread.
    // This means the current thread's stack and the new thread's stack
    // are properly set and protected from each other.
    thread_info::set(unsafe { imp::guard::current() }, their_thread);
    let try_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        crate::sys_common::backtrace::__rust_begin_short_backtrace(f)
    }));
    // SAFETY: `their_packet` as been built just above and moved by the
    // closure (it is an Arc<...>) and `my_packet` will be stored in the
    // same `JoinInner` as this closure meaning the mutation will be
    // safe (not modify it and affect a value far away).
    unsafe { *their_packet.result.get() = Some(try_result) };
};

if let Some(scope_data) = &my_packet.scope {
    scope_data.increment_num_running_threads();
}

Ok(JoinInner {
    // SAFETY:
    //
    // `imp::Thread::new` takes a closure with a `'static` lifetime, since it's passed
    // through FFI or otherwise used with low-level threading primitives that have no
    // notion of or way to enforce lifetimes.
    //
    // As mentioned in the `Safety` section of this function's documentation, the caller of
    // this function needs to guarantee that the passed-in lifetime is sufficiently long
    // for the lifetime of the thread.
    //
    // Similarly, the `sys` implementation must guarantee that no references to the closure
    // exist after the thread has terminated, which is signaled by `Thread::join`
    // returning.
    native: unsafe {
        imp::Thread::new(
            stack_size,
            mem::transmute::<Box<dyn FnOnce() + 'a>, Box<dyn FnOnce() + 'static>>(
                Box::new(main),
            ),
        )?
    },
    thread: my_thread,
    packet: my_packet,
})

1.8.3. The implementation of thread::park(); the Parker struct is:

pub struct Parker {
    state: AtomicUsize, // atomic state word
    lock: UnsafeCell<libc::pthread_mutex_t>, // the mutex; central to the park() implementation
    cvar: UnsafeCell<libc::pthread_cond_t>,  // the condition variable
    // The `pthread` primitives require a stable address, so make this struct `!Unpin`.
    _pinned: PhantomPinned,
}
  1. So the whole Thread::spawn flow is: 1) builder & spawn: native spawn plus construction of the Thread object; 2) park: lock and wait on the Parker held by the Thread object; 3) unpark: signal it
  2. Essentially the same pattern as in The Linux Programming Interface, except the cvar and lock are packed into the Thread itself, which makes them convenient to use and extends the native thread's functionality
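The park/unpark pair described above can be exercised directly from user code:

```rust
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        // park blocks this thread until another thread calls unpark on
        // its Thread handle (or returns immediately if a token was
        // already stored by an earlier unpark)
        thread::park();
        "woken"
    });
    // give the spawned thread a moment to park (not strictly required:
    // if unpark arrives first, the stored token makes the later park return)
    thread::sleep(Duration::from_millis(50));
    handle.thread().unpark();
    assert_eq!(handle.join().unwrap(), "woken");
}
```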

1.9. std::sync::mpsc::channel: how is it implemented?

1.9.1. The code overall

pub struct Sender<T> {
    inner: UnsafeCell<Flavor<T>>,
}

pub struct Receiver<T> {
    inner: UnsafeCell<Flavor<T>>,
}

enum Flavor<T> {
    Oneshot(Arc<oneshot::Packet<T>>),
    Stream(Arc<stream::Packet<T>>),
    Shared(Arc<shared::Packet<T>>),
    Sync(Arc<sync::Packet<T>>),
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let a = Arc::new(oneshot::Packet::new());
    (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
}

// stream
pub struct Packet<T> {
    // internal queue for all messages
    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
}

1.9.2. mpsc::channel creates a oneshot::Packet, so why do the other Flavor variants exist?

1.9.3. Flavor::Oneshot is an optimized version for calling send(T) only once; on the second send call it is upgraded to Flavor::Stream

pub fn send(&self, t: T) -> Result<(), SendError<T>> {
    let (new_inner, ret) = match *unsafe { self.inner() } {
        Flavor::Oneshot(ref p) => {
            if !p.sent() { // only one send may go through
                return p.send(t).map_err(SendError);
            } else {
                let a = Arc::new(stream::Packet::new()); // the Receiver is upgraded to a stream::Packet; the memory-layout design here is fairly intricate and hard to follow
                let rx = Receiver::new(Flavor::Stream(a.clone()));
                match p.upgrade(rx) { // perform the upgrade
                    oneshot::UpSuccess => {
                        let ret = a.send(t);
                        (a, ret)
                    }
                    oneshot::UpDisconnected => (a, Err(t)),
                    oneshot::UpWoke(token) => {
                        // This send cannot panic because the thread is
                        // asleep (we're looking at it), so the receiver
                        // can't go away.
                        a.send(t).ok().unwrap();
                        token.signal();
                        (a, Ok(()))
                    }
                }
            }
        }
        Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
        Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
        Flavor::Sync(..) => unreachable!(),
    };

    unsafe {
        let tmp = Sender::new(Flavor::Stream(new_inner)); // upgrade the Sender
        mem::swap(self.inner_mut(), tmp.inner_mut());
    }
    ret.map_err(SendError)
}

1.9.4. The overall logic: swap out the inner of both endpoints, Receiver { inner: UnsafeCell<Flavor::Oneshot(Arc<oneshot::Packet<T>>)> } -> Receiver { inner: UnsafeCell<Flavor::Stream(Arc<stream::Packet<T>>)> }, and analogously for Sender

1.9.5. Stream is essentially a queue and deserves a closer look:

1.9.6. The code:

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let a = Arc::new(oneshot::Packet::new());
    (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
}

pub struct Sender<T> {
    inner: UnsafeCell<Flavor<T>>,
}

enum Flavor<T> {
    Oneshot(Arc<oneshot::Packet<T>>),
    Stream(Arc<stream::Packet<T>>),
    Shared(Arc<shared::Packet<T>>),
    Sync(Arc<sync::Packet<T>>),
}

// stream::Packet<T>
pub struct Packet<T> {
    // internal queue for all messages
    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
}

pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
    // consumer fields
    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,

    // producer fields
    producer: CacheAligned<Producer<T, ProducerAddition>>,
}
pub(super) struct CacheAligned<T>(pub T);

struct Consumer<T, Addition> {
    tail: UnsafeCell<*mut Node<T>>, // where to pop from
    tail_prev: AtomicPtr<Node<T>>,  // where to pop from
    cache_bound: usize,             // maximum cache size
    cached_nodes: AtomicUsize,      // number of nodes marked as cacheable
    addition: Addition,
}

struct Producer<T, Addition> {
    head: UnsafeCell<*mut Node<T>>,      // where to push to
    first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
    tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
    addition: Addition,
}

struct ProducerAddition {
    cnt: AtomicIsize,       // How many items are on this channel
    to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up

    port_dropped: AtomicBool, // flag if the channel has been destroyed.
}

struct ConsumerAddition {
    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
}

// so the sender & receiver 's type is:

Sender {
 inner: UnsafeCell<Flavor::Stream<Arc<stream::Packet {
     queue: spsc::Queue {
        consumer: CacheAligned(pub Consumer<T, ConsumerAddition> {
           tail: UnsafeCell<*mut Node<T>>,
           tail_prev: AtomicPtr<Node<T>>,
           cache_bound: usize,
           cached_nodes: AtomicUsize,
           addition: ConsumerAddition,
        })
        producer: Producer<T, ProducerAddition> {
          head: UnsafeCell<*mut Node<T>>,      // where to push to
          first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
          tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
          addition: ProducerAddition,
        }
      },
    }>>>
}

Receiver {
 inner: UnsafeCell<Flavor::Stream<Arc<stream::Packet {
     queue: spsc::Queue {
        consumer: CacheAligned(pub Consumer<T, ConsumerAddition> {
           tail: UnsafeCell<*mut Node<T>>,
           tail_prev: AtomicPtr<Node<T>>,
           cache_bound: usize,
           cached_nodes: AtomicUsize,
           addition: ConsumerAddition,
        })
        producer: Producer<T, ProducerAddition> {
          head: UnsafeCell<*mut Node<T>>,      // where to push to
          first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
          tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
          addition: ProducerAddition,
        }
      },
    }>>>
}

impl<T> Packet<T> {
    pub fn new() -> Packet<T> {
        Packet {
            queue: unsafe {
                spsc::Queue::with_additions(
                    128,
                    ProducerAddition {
                        cnt: AtomicIsize::new(0),
                        to_wake: AtomicPtr::new(EMPTY),

                        port_dropped: AtomicBool::new(false),
                    },
                    ConsumerAddition { steals: UnsafeCell::new(0) },
                )
            },
        }
    }
  }

1.9.7. Sender & Receiver share the Arc<stream::Packet> held in inner; the heart of it is the queue

1.9.8. Queue: its consumer & producer operate on a linked list and share the same memory

1.9.9. ConsumerAddition and ProducerAddition: ProducerAddition is the important one

  1. Its to_wake field is a pointer to a SignalToken: when the Receiver blocks in recv(), it fills producer.addition.to_wake, and a later producer send() wakes the recv
  2. sender.send never blocks. Is that because there can be many Senders? Why block receiver.recv instead? Why is it designed this way?
  3. The docs at https://doc.rust-lang.org/std/sync/mpsc/ explain:
  4. channels come in two kinds: 1) unbounded, i.e. an infinite buffer; 2) bounded, i.e. a pre-allocated fixed buffer
  5. a bounded buffer does block Sender.send
  6. the Buffer in sync_channel is a ring buffer
    1. a ring buffer only needs to keep two variables, start and size
    2. push/enqueue: pos = (start + size) % buf.len(), then size += 1
    3. pop/dequeue: pos = start, start += 1, size -= 1
    4. this keeps size always no larger than buf.len()
    5. start is the head, (start + size) % buf.len() is the tail
    6. but this implementation needs a lock around every operation, full mutual exclusion, to keep each operation intact
      impl<T> Buffer<T> {
          fn enqueue(&mut self, t: T) {
              let pos = (self.start + self.size) % self.buf.len();
              self.size += 1;
              let prev = mem::replace(&mut self.buf[pos], Some(t));
              assert!(prev.is_none());
          }
      
          fn dequeue(&mut self) -> T {
              let start = self.start;
              self.size -= 1;
              self.start = (self.start + 1) % self.buf.len();
              let result = &mut self.buf[start];
              result.take().unwrap()
          }
      
          fn size(&self) -> usize {
              self.size
          }
          fn capacity(&self) -> usize {
              self.buf.len()
          }
      }
      


2. Error handling: how does Rust deal with errors?

2.1. The ? operator expression & the From mechanism:

References: the detailed explanations of ? in the RFC and on Stack Overflow. ? mainly solves the problem of propagating ("throwing") errors: a function declares one error type in its return type, yet returning a convertible sub-type still works; Rust implements this with the From trait

let mut f = File::open("foo.txt")?;

// equivalent to the following
let mut f = match File::open("foo.txt") {
    Ok(v) => v,
    Err(e) => return Err(From::from(e)), // the From::from(e) conversion happens here
};
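The From mechanism in action, with a hypothetical application error type (AppError and parse_port are illustrative names, not from the source): implementing From<ParseIntError> is exactly what lets ? convert the sub-error.

```rust
use std::fmt;
use std::num::ParseIntError;

// a hypothetical application error type, purely for illustration
#[derive(Debug)]
struct AppError(String);

impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "app error: {}", self.0)
    }
}

// this From impl is what lets `?` convert ParseIntError into AppError
impl From<ParseIntError> for AppError {
    fn from(e: ParseIntError) -> Self {
        AppError(e.to_string())
    }
}

fn parse_port(s: &str) -> Result<u16, AppError> {
    // on Err, `?` expands to `return Err(From::from(e))`
    let n = s.parse::<u16>()?;
    Ok(n)
}

fn main() {
    assert_eq!(parse_port("8080").unwrap(), 8080);
    assert!(parse_port("not-a-number").is_err());
}
```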

2.2. How does the standard library use and implement this?

2.2.1. core defines Result as a plain enum and does not constrain the type of E

2.2.2. std defines many error types, including std::io::Error, std::fmt::Error, std::num::ParseIntError, etc.; each custom error is a little different:

2.2.3. std::error: provides facilities for constructing, handling, reporting, and propagating Rust errors; "constructing" (representation) means: Result, the Error trait, custom types & failure-handling patterns:

2.2.4. trait Error: Debug + Display {} is the basic shape of an error, with description(), cause(), type_id(), etc.

2.2.5. impl<'a, E: Error + 'a> From<E> for Box<dyn Error + 'a> { fn from(err: E) -> Box<dyn Error + 'a> { Box::new(err) } }

2.2.6. Report: handles formatting an error for output (controlled through configuration)

2.3. Result::unwrap vs expect: expect takes an argument that customizes the error output when it panics
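A quick sketch of the difference:

```rust
fn main() {
    let ok: Result<i32, &str> = Ok(1);
    // both return the Ok value on success
    assert_eq!(ok.unwrap(), 1);
    assert_eq!(ok.expect("value should be present"), 1);

    // both panic on Err; expect's argument becomes part of the panic
    // message, which unwrap cannot customize
    let err: Result<i32, &str> = Err("boom");
    let caught = std::panic::catch_unwind(move || err.expect("custom message"));
    assert!(caught.is_err());
}
```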

2.4. error-chain: impl details:

    error_chain! {
        foreign_links {
            IoErr(std::io::Error);
        }

        // defines a new ErrorKind variant; enum { InvalidToken(String) }
        errors {
            InvalidToken(t: String) {
                description("invalid token")
                display("invalid token display")
            }
        }
    }
// 展开如下 expand
#[doc = " The Error type."]
#[doc = ""]
#[doc = " This tuple struct is made of two elements:"]
#[doc = ""]
#[doc = " - an `ErrorKind` which is used to determine the type of the error."]
#[doc = " - An internal `State`, not meant for direct use outside of `error_chain`"]
#[doc = "   internals, containing:"]
#[doc = "   - a backtrace, generated when the error is created."]
#[doc = "   - an error chain, used for the implementation of `Error::cause()`."]
#[derive(Debug)]
pub struct Error(#[doc = " The kind of the error."]
    pub ErrorKind, #[doc = " Contains the error chain and the backtrace."]
    #[doc(hidden)]
    pub $crate::State, //// what is State for?
);

impl $crate::ChainedError for Error {
  type ErrorKind = ErrorKind;
  fn new(kind:ErrorKind,state:$crate::State) -> Error {
    Error(kind,state)
  }
  fn from_kind(kind:Self::ErrorKind) -> Self {
    Self::from_kind(kind)
  }
  fn with_chain<E,K>(error:E,kind:K) -> Self where E: ::std::error::Error+Send+ 'static,K:Into<Self::ErrorKind>{
    Self::with_chain(error,kind)
  }
  fn kind(&self) ->  &Self::ErrorKind {
    self.kind()
  }
  fn iter(&self) -> $crate::Iter {
    $crate::Iter::new(Some(self))
  }
  fn chain_err<F,EK>(self,error:F) -> Self where F:FnOnce() -> EK,EK:Into<ErrorKind>{
    self.chain_err(error)
  }
  fn backtrace(&self) -> Option<&$crate::Backtrace>{
    self.backtrace()
  }
  #[allow(unknown_lints,renamed_and_removed_lints,bare_trait_objects)]
  #[allow(unused_doc_comment,unused_doc_comments)]
  fn extract_backtrace(e: &(::std::error::Error+Send+ 'static)) -> Option<$crate::InternalBacktrace>{
    if let Some(e) = e.downcast_ref::<Error>(){
      return Some(e.1.backtrace.clone());

    }None
  }

}
#[allow(dead_code)]
impl Error {
  #[doc = " Constructs an error from a kind, and generates a backtrace."]
  pub fn from_kind(kind:ErrorKind) -> Error {
    Error(kind,$crate::State::default(),)
  }
  #[doc = " Constructs a chained error from another error and a kind, and generates a backtrace."]
  pub fn with_chain<E,K>(error:E,kind:K) -> Error where E: ::std::error::Error+Send+ 'static,K:Into<ErrorKind>{
    Error::with_boxed_chain(Box::new(error),kind)
  }
  #[doc = " Construct a chained error from another boxed error and a kind, and generates a backtrace"]
  #[allow(unknown_lints,bare_trait_objects)]
  pub fn with_boxed_chain<K>(error:Box<::std::error::Error+Send>,kind:K) -> Error where K:Into<ErrorKind>{
    Error(kind.into(),$crate::State::new::<Error>(error,),)
  }
  #[doc = " Returns the kind of the error."]
  pub fn kind(&self) ->  &ErrorKind {
    &self.0
  }
  #[doc = " Iterates over the error chain."]
  pub fn iter(&self) -> $crate::Iter {
    $crate::ChainedError::iter(self)
  }
  #[doc = " Returns the backtrace associated with this error."]
  pub fn backtrace(&self) -> Option<&$crate::Backtrace>{
    self.1.backtrace()
  }
  #[doc = " Extends the error chain with a new entry."]
  pub fn chain_err<F,EK>(self,error:F) -> Error where F:FnOnce() -> EK,EK:Into<ErrorKind>{
    Error::with_chain(self,Self::from_kind(error().into()))
  }
  #[doc = " A short description of the error."]
  #[doc = " This method is identical to [`Error::description()`](https://doc.rust-lang.org/nightly/std/error/trait.Error.html#tymethod.description)"]
  pub fn description(&self) ->  &str {
    self.0.description()
  }

 }
impl ::std::error::Error for Error {
  #[cfg(not(has_error_description_deprecated))]
  fn description(&self) ->  &str {
    self.description()
  }
  #[allow(unknown_lints,renamed_and_removed_lints,bare_trait_objects)]
  #[allow(unused_doc_comment,unused_doc_comments)]
  fn source(&self) -> Option<&(std::error::Error+ 'static)>{
    match self.1.next_error {
      Some(ref c) => Some(&**c),
      None => {
        match self.0 {
          ErrorKind::IoErr(ref foreign_err) => {
            foreign_err.source()
          }
          _ => None
          }
      }
    }
  }
}

#[doc = " The kind of an error."]
#[derive(Debug)]
pub enum ErrorKind { //// why not define the enum directly in Error instead of going through ErrorKind? Presumably by design; std has a similar ErrorKind in io::error
  IoErr(std::io::Error), #[doc = " A convenient variant for String."]
  Msg(String),
  InvalidToken(String), #[doc(hidden)]
  __Nonexhaustive{}
}

// the From impls for Error convert std error types into Error, so a function can propagate a std error directly
impl ::std::fmt::Display for Error {
  fn fmt(&self,f: &mut::std::fmt::Formatter) ->  ::std::fmt::Result {
    ::std::fmt::Display::fmt(&self.0,f)
  }
}
impl From<std::io::Error>for Error {
  fn from(e:std::io::Error) -> Self {
    Error::from_kind(ErrorKind::IoErr(e))
  }

}

impl From<ErrorKind>for Error {
  fn from(e:ErrorKind) -> Self {
    Error::from_kind(e)
  }
}

impl < 'a>From<& 'a str>for ErrorKind {
  fn from(s: & 'a str) -> Self {
    ErrorKind::Msg(s.into())
  }

  }
impl From<String>for ErrorKind {
  fn from(s:String) -> Self {
    ErrorKind::Msg(s)
  }

}
// simplify producing an Error: convert str/String into Error for convenient throwing; since Error is built from ErrorKind, From<&str> for ErrorKind is needed as well
impl < 'a>From<& 'a str>for Error {
  fn from(s: & 'a str) -> Self {
    Self::from_kind(s.into())
  }
}

impl From<String>for Error {
  fn from(s:String) -> Self {
    Self::from_kind(s.into())
  }
}

// format display
#[allow(unknown_lints,unused,renamed_and_removed_lints)]
#[allow(unused_doc_comment,unused_doc_comments)]
impl ::std::fmt::Display for ErrorKind {
  fn fmt(&self,fmt: &mut::std::fmt::Formatter) ->  ::std::fmt::Result {
    match*self {
      ErrorKind::IoErr(ref err) => {
        let display_fn = |_,f: &mut::std::fmt::Formatter|{
          f.write_fmt(unsafe {
            std::fmt::Arguments::new_v1(&[], &[std::fmt::ArgumentV1::new(&(err),std::fmt::Display::fmt),])
          })
        };
        display_fn(self,fmt)
      }
      #[doc = " A convenient variant for String."]
      ErrorKind::Msg(ref s) => {
        let display_fn = |_,f: &mut::std::fmt::Formatter|{
          f.write_fmt(unsafe {
            std::fmt::Arguments::new_v1(&[], &[std::fmt::ArgumentV1::new(&(s),std::fmt::Display::fmt),])
          })
        };
        display_fn(self,fmt)
      }
      ErrorKind::InvalidToken(ref t) => {
        let display_fn = |_,f: &mut::std::fmt::Formatter|{
          f.write_fmt(unsafe {
            std::fmt::Arguments::new_v1(&[], &[])
          })
        };
        display_fn(self,fmt)
      }
      _ => Ok(())
      }
  }

  }
#[allow(unknown_lints,unused,renamed_and_removed_lints)]
#[allow(unused_doc_comment,unused_doc_comments)]
impl ErrorKind {
  #[doc = " A string describing the error kind."]
  pub fn description(&self) ->  &str {
    match*self {
      ErrorKind::IoErr(ref err) => {
        ("")
      }
      #[doc = " A convenient variant for String."]
      ErrorKind::Msg(ref s) => {
        (&s)
      }
      ErrorKind::InvalidToken(ref t) => {
        "invalid token"
      }
      _ => "",

      }
  }

  }
impl From<Error>for ErrorKind {
  fn from(e:Error) -> Self {
    e.0
  }

}
#[doc = " Additional methods for `Result`, for easy interaction with this crate."]
pub trait ResultExt<T>{
  #[doc = " If the `Result` is an `Err` then `chain_err` evaluates the closure,"]
  #[doc = " which returns *some type that can be converted to `ErrorKind`*, boxes"]
  #[doc = " the original error to store as the cause, then returns a new error"]
  #[doc = " containing the original error."]
  fn chain_err<F,EK>(self,callback:F) ->  ::std::result::Result<T,Error>where F:FnOnce() -> EK,EK:Into<ErrorKind>;


  }impl <T,E>ResultExt<T>for::std::result::Result<T,E>where E: ::std::error::Error+Send+ 'static{
  fn chain_err<F,EK>(self,callback:F) ->  ::std::result::Result<T,Error>where F:FnOnce() -> EK,EK:Into<ErrorKind>{
    self.map_err(move|e|{
      let state = $crate::State::new::<Error>(Box::new(e),);
      $crate::ChainedError::new(callback().into(),state)
    })
  }

  }
impl <T>ResultExt<T>for::std::option::Option<T>{
  fn chain_err<F,EK>(self,callback:F) ->  ::std::result::Result<T,Error>where F:FnOnce() -> EK,EK:Into<ErrorKind>{
    self.ok_or_else(move||{
      $crate::ChainedError::from_kind(callback().into())
    })
  }
}

#[doc = " Convenient wrapper around `std::Result`."]
#[allow(unused)]
pub type Result<T>  =  ::std::result::Result<T, Error>;

2.5. The error_chain! macro has 3 main steps:

2.5.1. 1. Define struct Error, enum ErrorKind, and Error(ErrorKind, State)

ErrorKind is the most important piece; essentially every conversion goes through ErrorKind

pub enum ErrorKind {
  IoErr(std::io::Error),
  Msg(String),
  InvalidToken(String),
}

2.5.2. 2. impl From<xxx> for Error {}, which is what makes ? work

impl <'a>From<&'a str>for Error {
  fn from(s: &'a str) -> Self {
    Self::from_kind(s.into())
  }
}
impl From<String>for Error {
  fn from(s:String) -> Self {
    Self::from_kind(s.into())
  }
}

impl From<std::io::Error>for Error {
  fn from(e:std::io::Error) -> Self {
    Error::from_kind(ErrorKind::IoErr(e))
  }
 }
impl From<std::num::ParseIntError>for Error {
  fn from(e:std::num::ParseIntError) -> Self {
    Error::from_kind(ErrorKind::ParseErr(e))
  }
}

impl <'a>From<&'a str>for ErrorKind {
  fn from(s: &'a str) -> Self {
    ErrorKind::Msg(s.into())
  }

}
impl From<String>for ErrorKind {
  fn from(s:String) -> Self {
    ErrorKind::Msg(s)
  }
}

2.5.3. 3. Redefine Result: pub type Result<T> = ::std::result::Result<T, Error>;

2.6. The components of error_chain! {}: analyzed by expanding the macro

  1. rust macros can be written to this level of complexity
  2. error-chain generates a large amount of boilerplate code through its macro

2.6.1. types: judging from the macro expansion, this customizes the Error, ErrorKind, and Result names; better not to rename Result but to keep the name Result

error_chain! {
    types {
        MyError, MyErrorKind, MyResult;
    }
}
// expand 展开如下:

pub struct MyError(
  pub MyErrorKind,
  pub $crate::State,
);

pub enum MyErrorKind {
  Msg(String),
  __Nonexhaustive{}

}

2.6.2. errors: adds variants to ErrorKind, as below; mainly for use with chain_err(error: F) where F: FnOnce() -> EK, EK: Into<ErrorKind>

  error_chain! {
      foreign_links {
          IoErr(std::io::Error);
          ParseErr(std::num::ParseIntError);
      }

      // defines a new ErrorKind variant; enum { InvalidToken(String) }
      errors {
          InvalidToken(t: String) {
              description("invalid token")
              display("invalid token display")
          }
      }
  }

 errors {
     InvalidToken(t: String) {
         description("invalid token")
         display("invalid token display")
     }
 }
pub enum ErrorKind {
  IoErr(std::io::Error), #[doc = " A convenient variant for String."]
  Msg(String),InvalidToken(String), #[doc(hidden)]
  __Nonexhaustive{}

}
xx.chain_err(|| ErrorKind::InvalidToken("xx".to_string()))

2.6.3. foreign_links: implements the conversion from foreign errors (defined outside) into ErrorKind:

 error_chain! {
     foreign_links {
         IoErr(std::io::Error);
         ParseErr(std::num::ParseIntError);
     }
 }

// expand 如下:
impl From<std::io::Error>for Error {
  fn from(e:std::io::Error) -> Self {
    Error::from_kind(ErrorKind::IoErr(e))
  }
}

impl From<std::num::ParseIntError>for Error {
  fn from(e:std::num::ParseIntError) -> Self {
    Error::from_kind(ErrorKind::ParseErr(e))
  }
}

2.6.4. ErrorKind::Msg is the variant defined so that a &str or String can conveniently be used as an error,

impl < 'a>From<& 'a str>for ErrorKind {
  fn from(s: & 'a str) -> Self {
    ErrorKind::Msg(s.into())
  }
}
impl From<String>for ErrorKind {
  fn from(s:String) -> Self {
    ErrorKind::Msg(s)
  }
}

2.6.5. links: converts Errors defined by other error_chain! invocations into the current Error

pub mod errors {
    // Create the Error, ErrorKind, ResultExt, and Result types
    error_chain! {
        foreign_links {
            IoErr(std::io::Error);
            ParseErr(std::num::ParseIntError);
        }
    }
}

error_chain! {
  links {
    Other(errors::Error, errors::ErrorKind)
  }
}

// expand 如下
pub enum ErrorKind {
  Other(errors::ErrorKind),
  Msg(String), #[doc(hidden)]
  __Nonexhaustive{}
}

impl From<errors::Error>for Error {
  fn from(e:errors::Error) -> Self {
    Error(ErrorKind::Other(e.0),e.1,)
  }
}

impl From<errors::ErrorKind>for ErrorKind {
  fn from(e:errors::ErrorKind) -> Self {
    ErrorKind::Other(e)
  }
}

3. Rust diesel; Insertable still not mastered

3.1. The point of the table! macro is the dsl module it provides, which makes it easy to build a query_dsl::QueryDsl, i.e. an SQL construction factory,

3.2. diesel::query_dsl::RunQueryDsl implements the load, execute, and get_result functions for fetching SQL query results.

3.3. QueryDsl types already implement RunQueryDsl

3.4. Example code:

table! {
    arena_store_realms (id) {
        id -> Integer,
        arena_store_id -> Integer,
        realm_id -> Integer,
    }
}


#[derive(Queryable, Debug)]
pub struct ArenaStoreRealms {
    pub id: i32,
    pub arena_store_id: i32,
    pub realm_id: i32
}

use arena_store_realms::dsl::*;
let result = arena_store_realms.filter(id.eq(1)).first(&conn);

3.5. Insertable, macro: diesel::impl_Insertable

4. Rust redis;

4.1. Understand its hget command: how the implement_commands! macro expands so that typed return values are added automatically

4.2. redis mainly consists of

4.2.1. types (the mapping from redis data types to rust types), client, connection, cmd, pipeline

4.2.2. connection: implements connecting to and operating on redis

4.2.3. types: how to convert between typeless redis values and rust types

4.2.4. how the macros are used

4.3. Cmd: struct:

pub struct Cmd {
    data: Vec<u8>, // the main field: stores the encoded bytes of the command
    // Arg::Simple contains the offset that marks the end of the argument
    args: Vec<Arg<usize>>,
    cursor: Option<u64>,
}

4.3.1. A Cmd can be built with Cmd::default() or cmd("get")

4.3.2. Where the get, hget, etc. commands on Cmd come from:

4.3.3. Methods are added to Cmd mainly by impl Cmd, impl RedisWrite, impl Default, and, importantly, the implement_commands! macro, whose expansion looks like:

   impl Cmd {
     #[doc = " Get the value of a key.  If key is a vec this becomes an `MGET`."]
     #[allow(clippy::extra_unused_lifetimes,clippy::needless_lifetimes)]
     pub fn get<'a,K:ToRedisArgs>(key:K) -> Self {
       ::std::mem::replace({
         cmd(if key.is_single_arg(){
           "GET"
         }else {
           "MGET"
         }).arg(key)
       },Cmd::new())
     }
     #[doc = " Gets all keys matching pattern"]
     #[allow(clippy::extra_unused_lifetimes,clippy::needless_lifetimes)]
     pub fn keys<'a,K:ToRedisArgs>(key:K) -> Self {
       ::std::mem::replace({
         cmd("KEYS").arg(key)
       },Cmd::new())
     }
  }
pub trait Commands:ConnectionLike+Sized {
  #[doc = " Get the value of a key.  If key is a vec this becomes an `MGET`."]
  #[inline]
  #[allow(clippy::extra_unused_lifetimes,clippy::needless_lifetimes)]
  fn get<'a,K:ToRedisArgs,RV:FromRedisValue>(&mut self,key:K) -> RedisResult<RV>{
    Cmd::get(key).query(self)
  }
}

4.4. How is a redis command executed?

  1. Build the Cmd: e.g. Cmd::get("xxx"), then run it via Cmd.query(con: &mut dyn ConnectionLike)
    1. How is the cmd executed? In impl Cmd, via pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {}; the code is:
      impl Cmd {
          pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
              match con.req_command(self) {
                  Ok(val) => from_redis_value(&val),
                  Err(e) => Err(e),
              }
          }
          pub fn get_packed_command(&self) -> Vec<u8> {
              let mut cmd = Vec::new();
              self.write_packed_command(&mut cmd);
              cmd
          }
      
          pub(crate) fn write_packed_command(&self, cmd: &mut Vec<u8>) {
              write_command_to_vec(cmd, self.args_iter(), self.cursor.unwrap_or(0))
          }
      
          fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
          where
              I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
          {
              let totlen = args_len(args.clone(), cursor);
      
              cmd.reserve(totlen);
      
              write_command(cmd, args, cursor).unwrap()
          }
      
          fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
          where
              I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
          {
              let mut buf = ::itoa::Buffer::new();
      
              cmd.write_all(b"*")?;
              let s = buf.format(args.len());
      
              // ....
          }
      }

      pub trait ConnectionLike {
          fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
              let pcmd = cmd.get_packed_command();
              self.req_packed_command(&pcmd)
          }
      }

      impl ConnectionLike for Connection {
          fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
              if self.pubsub {
                  self.exit_pubsub()?;
              }

              self.con.send_bytes(cmd)?;
              self.read_response()
          }
      }

      impl ConnectionLike for Client {
          fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
              self.get_connection()?.req_packed_command(cmd)
          }

          fn req_packed_commands(
              &mut self,
              cmd: &[u8],
              offset: usize,
              count: usize,
          ) -> RedisResult<Vec<Value>> {
              self.get_connection()?
                  .req_packed_commands(cmd, offset, count)
          }

          fn get_db(&self) -> i64 {
              self.connection_info.redis.db
          }

          fn check_connection(&mut self) -> bool {
              if let Ok(mut conn) = self.get_connection() {
                  conn.check_connection()
              } else {
                  false
              }
          }

          fn is_open(&self) -> bool {
              if let Ok(conn) = self.get_connection() {
                  conn.is_open()
              } else {
                  false
              }
          }
      }
      
    2. Usage: let res: RedisResult<i64> = cmd.query(&conn);
    3. So the main execution path of cmd.query is:
      1. the actual request is sent through the connection: conn.req_command(Cmd)
      2. req_command calls cmd.get_packed_command() (handing control back to Cmd) to turn the Cmd into a byte stream, which in turn calls write_packed_command(Vec) -> write_command_to_vec -> write_command
      3. ConnectionLike::req_packed_command is then invoked, which calls conn.req_packed_command
  2. impl Commands (Commands contains the redis commands and returns RedisResult directly): commands/mod.rs contains a blanket impl of Commands, so any struct that implements ConnectionLike can call redis methods like get, hget, etc. directly
    1. Client:
      let mut cli = redis::Client::open("redis://127.0.0.1:6100/1").unwrap();
      let val: RedisResult<Option<String>> = cli.get("xx");
      
    2. Connection:
       let client = redis::Client::open("redis://127.0.0.1/")?;
       let mut con = client.get_connection()?;
      let count : i32 = con.get("my_counter")?;
      
    3. Cmd.query(&mut dyn ConnectionLike)
      // first way: build the Cmd via redis::cmd
      let mut cmd = redis::cmd("GET");
      cmd.arg("xx");
      let val: RedisResult<Option<i64>> =
          cmd.query(&mut cli);
      
      // second way
      let mut cmd = Cmd::get("xx");
      let val: RedisResult<Option<i64>> =
         cmd.query(&mut cli);
      
      // Cmd::get switches between GET and MGET depending on the key
      let res: RedisResult<Vec<Option<String>>> = Cmd::get(&["xx", "age"]).query(&mut cli);
      

4.5. types: type conversion

4.5.1. ToRedisArgs: not much to dig into here

4.5.2. FromRedisValue: mostly generated by macros; take i8 as an example:

4.5.3. Watch out for nil conversion: the i8 conversion code has no match arm for Value::Nil

4.5.4. That is, for a simple command like let res: RedisResult<i64> = cli.get("xx");, if xx has no value set, this always returns an error; the correct form is let res: RedisResult<Option<i64>> = cli.get("xx");

4.5.5. However, collection types such as HashMap<String, String>, Vec, and Set are an exception: even when the key does not exist, an empty collection is returned
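The Nil behaviour can be reproduced with a local stand-in for redis::Value and FromRedisValue (hypothetical names and a minimal sketch, not the crate's real code):

```rust
// Sketch of the Nil pitfall: numeric impls have no Nil arm, the Option impl does.
#[derive(Debug, PartialEq)]
enum Value { Nil, Int(i64) }

trait FromValue: Sized {
    fn from_value(v: &Value) -> Result<Self, String>;
}

impl FromValue for i64 {
    fn from_value(v: &Value) -> Result<i64, String> {
        match v {
            Value::Int(i) => Ok(*i),
            // no Value::Nil arm in the numeric impls -> a missing key is an error
            _ => Err("Response was of incompatible type".into()),
        }
    }
}

// The Option impl is what maps Nil to None instead of an error.
impl<T: FromValue> FromValue for Option<T> {
    fn from_value(v: &Value) -> Result<Option<T>, String> {
        match v {
            Value::Nil => Ok(None),
            other => T::from_value(other).map(Some),
        }
    }
}

fn main() {
    assert!(i64::from_value(&Value::Nil).is_err()); // plain i64: error on missing key
    assert_eq!(Option::<i64>::from_value(&Value::Nil), Ok(None)); // Option: None
    println!("ok");
}
```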

pub enum Value {
    /// A nil response from the server.
    Nil,
    /// An integer response.  Note that there are a few situations
    /// in which redis actually returns a string for an integer which
    /// is why this library generally treats integers and strings
    /// the same for all numeric responses.
    Int(i64),
    /// Arbitrary binary data.
    Data(Vec<u8>),
    /// A bulk response of more data.  This is generally used by redis
    /// to express nested structures.
    Bulk(Vec<Value>),
    /// A status response.
    Status(String),
    /// A status response which represents the string "OK".
    Okay,
}

from_redis_value_for_num!(i8); // 展开结构如下:
impl FromRedisValue for i8 {
  fn from_redis_value(v: &Value) -> RedisResult<i8>{
    {
      let v = v;
      match*v {
        Value::Int(val) => Ok(val as i8),
        Value::Status(ref s) => match s.parse::<i8>(){
          Ok(rv) => Ok(rv),
          Err(_) => {
            return Err(::std::convert::From::from((RedisError::from((ErrorKind::TypeError,"Response was of incompatible type",{
              let res = $crate::fmt::format(unsafe {
                std::fmt::Arguments::new_v1(&[], &[std::fmt::ArgumentV1::new(&("Could not convert from string."),std::fmt::Display::fmt),std::fmt::ArgumentV1::new(&(v),std::fmt::Display::fmt),])
              });
              res
            },)))))
          },

          },
        Value::Data(ref bytes) => match from_utf8(bytes)?.parse::<i8>(){
          Ok(rv) => Ok(rv),
          Err(_) => {
            return Err(::std::convert::From::from((RedisError::from((ErrorKind::TypeError,"Response was of incompatible type",{
              let res = $crate::fmt::format(unsafe {
                std::fmt::Arguments::new_v1(&[], &[std::fmt::ArgumentV1::new(&("Could not convert from string."),std::fmt::Display::fmt),std::fmt::ArgumentV1::new(&(v),std::fmt::Display::fmt),])
              });
              res
            },)))))
          },

          },
        _ => {
          return Err(::std::convert::From::from((RedisError::from((ErrorKind::TypeError,"Response was of incompatible type",{
            let res = $crate::fmt::format(unsafe {
              std::fmt::Arguments::new_v1(&[], &[std::fmt::ArgumentV1::new(&("Response type not convertible to numeric."),std::fmt::Display::fmt),std::fmt::ArgumentV1::new(&(v),std::fmt::Display::fmt),])
            });
            res
          },)))))
        },

        }
    }
  }

  }
  1. More details live in Connection::read_response, which calls self.parser.parse_value(sock), i.e. reads the byte stream and converts it; code below:
        fn read_response(&mut self) -> RedisResult<Value> {
            let result = match self.con {
                ActualConnection::Tcp(TcpConnection { ref mut reader, .. }) => {
                    self.parser.parse_value(reader)
                }
                #[cfg(feature = "tls")]
                ActualConnection::TcpTls(ref mut boxed_tls_connection) => {
                    let reader = &mut boxed_tls_connection.reader;
                    self.parser.parse_value(reader)
                }
                #[cfg(unix)]
                ActualConnection::Unix(UnixConnection { ref mut sock, .. }) => {
                    self.parser.parse_value(sock)
                }
            };
            // shutdown connection on protocol error
            if let Err(e) = &result {
                let shutdown = match e.as_io_error() {
                    Some(e) => e.kind() == io::ErrorKind::UnexpectedEof,
                    None => false,
                };
                if shutdown {
                    match self.con {
                        ActualConnection::Tcp(ref mut connection) => {
                            let _ = connection.reader.shutdown(net::Shutdown::Both);
                            connection.open = false;
                        }
                        #[cfg(feature = "tls")]
                        ActualConnection::TcpTls(ref mut connection) => {
                            let _ = connection.reader.shutdown();
                            connection.open = false;
                        }
                        #[cfg(unix)]
                        ActualConnection::Unix(ref mut connection) => {
                            let _ = connection.sock.shutdown(net::Shutdown::Both);
                            connection.open = false;
                        }
                    }
                }
            }
            result
        }
    
    impl Parser {
        /// Creates a new parser that parses the data behind the reader.  More
        /// than one value can be behind the reader in which case the parser can
        /// be invoked multiple times.  In other words: the stream does not have
        /// to be terminated.
        pub fn new() -> Parser {
            Parser {
                decoder: combine::stream::decoder::Decoder::new(),
            }
        }
    
        // public api
    
        /// Parses synchronously into a single value from the reader.
        pub fn parse_value<T: Read>(&mut self, mut reader: T) -> RedisResult<Value> {
            let mut decoder = &mut self.decoder;
            // this is turned into a macro invocation
            let result = combine::decode!(decoder, reader, value(), |input, _| {
                combine::stream::easy::Stream::from(input)
            });
            match result {
                Err(err) => Err(match err {
                    combine::stream::decoder::Error::Io { error, .. } => error.into(),
                    combine::stream::decoder::Error::Parse(err) => {
                        if err.is_unexpected_end_of_input() {
                            RedisError::from(io::Error::from(io::ErrorKind::UnexpectedEof))
                        } else {
                            let err = err
                                .map_range(|range| format!("{:?}", range))
                                .map_position(|pos| pos.translate_position(decoder.buffer()))
                                .to_string();
                            RedisError::from((ErrorKind::ResponseError, "parse error", err))
                        }
                    }
                }),
                Ok(result) => result,
            }
        }
    }
    

4.6. Conversion from a Cmd to the redis wire format happens mainly in write_command; the code is:

fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
where
    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
{
    let mut buf = ::itoa::Buffer::new();

    cmd.write_all(b"*")?;
    let s = buf.format(args.len());
    cmd.write_all(s.as_bytes())?;
    cmd.write_all(b"\r\n")?;

    let mut cursor_bytes = itoa::Buffer::new();
    for item in args {
        let bytes = match item {
            Arg::Cursor => cursor_bytes.format(cursor).as_bytes(),
            Arg::Simple(val) => val,
        };

        cmd.write_all(b"$")?;
        let s = buf.format(bytes.len());
        cmd.write_all(s.as_bytes())?;
        cmd.write_all(b"\r\n")?;

        cmd.write_all(bytes)?;
        cmd.write_all(b"\r\n")?;
    }
    Ok(())
}
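As a sanity check, the framing write_command produces (RESP: `*<nargs>\r\n` followed by `$<len>\r\n<bytes>\r\n` per argument) can be sketched as a standalone function — a simplified sketch, not the crate's code, with cursor handling omitted:

```rust
// Minimal RESP command framing: "*<nargs>\r\n" then "$<len>\r\n<arg>\r\n" per arg.
fn pack_command(args: &[&str]) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(format!("*{}\r\n", args.len()).as_bytes());
    for a in args {
        out.extend_from_slice(format!("${}\r\n", a.len()).as_bytes());
        out.extend_from_slice(a.as_bytes());
        out.extend_from_slice(b"\r\n");
    }
    out
}

fn main() {
    let packed = pack_command(&["GET", "xx"]);
    assert_eq!(packed, b"*2\r\n$3\r\nGET\r\n$2\r\nxx\r\n".to_vec());
    println!("{}", String::from_utf8_lossy(&packed).escape_default());
}
```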

5. Pin & UnPin, Future

5.1. Re-read: https://doc.rust-lang.org/std/pin/index.html — Box::pin(t) can turn a T: !Unpin into a handle that is itself Unpin, because the pointer is Unpin.

5.2. Definition of Pin: Pin wraps a pointer type and guarantees that the value behind the pointer won't be moved, i.e. in Pin<P<T>>, T cannot be swapped (mem::swap) or otherwise moved.

5.3. Purpose of Pin: solving the problem of self-referential structs being moved in memory.

5.4. Pin & Unpin concern structs that contain self-referential pointers.

5.4.1. Such a struct may get moved, e.g. via mem::swap(one, two), which accepts &mut refs of any type and swaps the memory with memcpy.

5.4.2. memcpy is unsuitable for swapping structs that contain self-references: the pointers end up scrambled; see: https://rust-lang.github.io/async-book/04_pinning/01_chapter.html#pinning

5.5. Re-read: https://rust-lang.github.io/async-book/04_pinning/01_chapter.html#pinning

5.5.1. It explains the basics of why Future requires Pin: an async {} block is compiled into a corresponding Future (which contains self-references). Example:

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);

}

//compile to below like
struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}

//AsyncFuture holds the variable x needed by the inner fut, while ReadIntoBuf holds a pointer into AsyncFuture, i.e. they form a self-referential structure.
//So if AsyncFuture gets swapped away, the pointers become invalid.

5.5.2. Pin wraps a pointer type and guarantees the value behind the pointer won't be moved: for T: !Unpin, Pin<&mut T>, Pin<&T>, Pin<Box<T>> guarantee that T is not moved

5.5.3. Converting a Future/Stream from !Unpin to usable-as-Unpin: Box::pin(fut) => Pin<Box<Fut>>, pin_utils::pin_mut!(fut) => Pin<&mut Fut>; the resulting types after => are all impl Unpin

5.5.4. summary:

  1. if T: Unpin then Pin<'a, T> impl Unpin
  2. for T: !Unpin, converting &mut T => Pin<T> requires unsafe code
  3. most types, and structs composed of them, impl Unpin; the Futures auto-generated by async/await are the exception
  4. ways to get !Unpin: 1) impl !Unpin on nightly + feature flag 2) add a std::marker::PhantomPinned field to the struct
  5. pinning a var on the stack requires unsafe code
  6. pinning a var on the heap can use Box::pin
  7. For pinned data where T: !Unpin you have to maintain the invariant that its memory will not get invalidated or repurposed from the moment it gets pinned until when drop is called. This is an important part of the pin contract.
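These rules can be exercised with a small self-referential struct (SelfRef is a made-up illustrative type; Box::pin for heap pinning, PhantomPinned for !Unpin):

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical self-referential struct: `ptr` points at `data` in the same value.
struct SelfRef {
    data: String,
    ptr: *const String,
    _pin: PhantomPinned, // opts the type out of Unpin
}

impl SelfRef {
    fn new(data: &str) -> Pin<Box<SelfRef>> {
        let mut boxed = Box::pin(SelfRef {
            data: data.to_string(),
            ptr: std::ptr::null(),
            _pin: PhantomPinned,
        });
        let ptr: *const String = &boxed.data;
        // SAFETY: we only set `ptr`; the value is never moved out of the Pin.
        unsafe { boxed.as_mut().get_unchecked_mut().ptr = ptr };
        boxed
    }

    fn data_via_ptr(&self) -> &str {
        // valid because the value is pinned: `data` never changes address
        unsafe { (&*self.ptr).as_str() }
    }
}

fn main() {
    let a = SelfRef::new("hello");
    // Pin<Box<SelfRef>> only hands out Pin<&mut SelfRef>, never a bare
    // &mut SelfRef, so mem::swap on the pointee cannot be written safely.
    assert_eq!(a.data_via_ptr(), "hello");
    println!("{}", a.data_via_ptr());
}
```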

5.6. Pin does nothing special about mem::swap itself; since swap requires &mut T, it is enough for Pin to prevent &mut T from being handed out.

5.6.1. Test: call mem::swap(&mut Pin<T>, &mut Pin<T>) directly;

5.6.2. Essentially all types automatically implement Unpin; Future is the exception (looking at the Future that rustc auto-generates for an async fn gives a deeper understanding)

5.6.3. If T: Unpin, then Pin<Box<T>>, Box<T>, Pin<&mut T>, &mut T are all Unpin

5.6.4. Pin<Box<T>>

5.8. ? So what exactly does Pin require? And what if a Pin<Future<Output = T>> is required?

5.9. Test:

5.9.1. mem::swap(&mut Pin<Box<T>>, &mut Pin<Box<T>>) with T: !Unpin works
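A runnable version of this test (NotUnpin is a made-up type): swapping the Pin<Box<T>> handles only moves the boxes, i.e. the pointers; the pinned values stay at their heap addresses, so the pin contract is not violated.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// PhantomPinned makes this type !Unpin.
struct NotUnpin(i32, PhantomPinned);

fn main() {
    let mut a: Pin<Box<NotUnpin>> = Box::pin(NotUnpin(1, PhantomPinned));
    let mut b: Pin<Box<NotUnpin>> = Box::pin(NotUnpin(2, PhantomPinned));
    // This compiles: we swap the Box handles, not the pinned values themselves.
    std::mem::swap(&mut a, &mut b);
    assert_eq!(a.0, 2);
    assert_eq!(b.0, 1);
    println!("a={}, b={}", a.0, b.0);
}
```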

6. ?? How to build a delegate_to macro crate:

6.1. It certainly has to be built with macros

6.2. How to collect the needed information dynamically? (this necessarily involves implementing some dynamic dispatch) Example:

struct PersonInner {

}
impl PersonInner {
  fn test() {}
}

#[derive(TestMacro)]
struct Person {
  inner: PersonInner,
}

impl Person {

  #[delegate_to()]
  fn test() {}

  #[delegate_to()]
  fn test_1(one: i32) {}

  #[delegate_to()] // how to auto-fill the function signature?
  fn test_2(one: i32, i) {}
}

6.3. Why not use the Deref trait? Because Deref has only one Target: a struct with multiple fields cannot implement both Deref<Target = One> and Deref<Target = Two>, i.e.:

struct PersonInner {

}
impl PersonInner {
  fn test() {}
}

struct PersonOuter {

}
impl PersonOuter {
  fn fuck() {}
}


#[derive(TestMacro)]
struct Person {
  inner: PersonInner,
  outer: PersonOuter,
}
impl Deref for Person {
   type Target = PersonInner;
   fn deref(&self) -> &Self::Target {
      &self.inner
   }
}

//error, conflict impl Deref
impl Deref for Person {
  type Target = PersonOuter;
}

impl Person {

}
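A minimal sketch of the delegation idea with macro_rules! (real delegation crates use proc-macros so that signatures don't have to be repeated; Inner, Person, and the method bodies here are illustrative):

```rust
struct Inner;
impl Inner {
    fn test(&self) -> i32 { 1 }
    fn test_1(&self, one: i32) -> i32 { one * 2 }
}

struct Person { inner: Inner }

// For each listed signature, generate a method forwarding to `self.<field>`.
macro_rules! delegate_to {
    ($field:ident : $( fn $name:ident(&self $(, $arg:ident : $ty:ty)*) -> $ret:ty; )*) => {
        $( fn $name(&self $(, $arg: $ty)*) -> $ret {
            self.$field.$name($($arg),*)
        } )*
    };
}

impl Person {
    delegate_to! { inner:
        fn test(&self) -> i32;
        fn test_1(&self, one: i32) -> i32;
    }
}

fn main() {
    let p = Person { inner: Inner };
    assert_eq!(p.test(), 1);
    assert_eq!(p.test_1(21), 42);
    println!("ok");
}
```

The declarative version still forces the caller to restate every signature; capturing them automatically from the impl block is exactly what needs a proc-macro.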

7. Async

7.1. Related articles/github repos to read:

7.8. book:

8. Async impl:

8.1. 200 lines of rust async, explained; code at rust_test/async_test/src/main.rs:193

  • In main, create the Reactor; inside block_on, create the Waker, then call Future::poll(future, waker)
  • The Future trait defines the fn poll() method
  • Future::poll: the Task checks its own state in the reactor and decides whether to .regist or to keep waiting
  • After Reactor::regist, the waker and task.id are put into self.tasks, and a thread is spawned to do the timing; when the timer fires it runs reactor.lock().wake(task.id), and Reactor::wake simply calls thread::unpark to get the job done.
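The block_on/Waker interplay described above can be sketched with std's Wake trait — a toy executor under the same park/unpark idea, not the article's exact code:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waking a task simply unparks the thread that is blocked in block_on.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(mut fut: F) -> F::Output {
    // SAFETY: `fut` lives in this stack frame and is never moved afterwards.
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // sleep until some reactor calls wake()
        }
    }
}

fn main() {
    // this async block is immediately ready, so the first poll returns Ready
    let v = block_on(async { 1 + 1 });
    assert_eq!(v, 2);
    println!("{}", v);
}
```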

8.2. Waker & Context: Waker matters most; in the simple design, "waking" simply means unparking the thread

  • struct Waker: can only be constructed via unsafe fn from_raw(waker: RawWaker) -> Waker; the rest are wake, will_wake and related functions etc (plus impl From<Arc<W: Wake>>)
  • std::task::RawWaker: a struct holding a data pointer & a vtable (a set of function pointers). (Trait objects cannot always be used in embedded contexts, so this is provided as an alternative way to implement Waker's trait-like behavior)
  • Context is currently just a thin wrapper around Waker, with no other special purpose

8.3. Future trait: https://aturon.github.io/blog/2016/08/11/futures/

  • fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
  • Task implements the Future trait: when the Task cannot make progress, its poll method should call reactor.regist(self) to watch its own fd (registering itself with the reactor)
  • so the structure looks like:
struct Task {
    id: usize,
    data: u64,
    reactor: Arc<Mutex<Box<Reactor>>>, // should hold the single shared reactor pointer
}

impl Future for Task {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("poll ing----------");
        let mut r = self.reactor.lock().unwrap();

        if r.is_ready(self.id) {
            *r.tasks.get_mut(&self.id).unwrap() = TaskState::Finished;
            Poll::Ready(self.id)
        } else if r.tasks.contains_key(&self.id) {
            r.tasks
                .insert(self.id, TaskState::NotReady(cx.waker().clone()));
            Poll::Pending
        } else {
            // interaction with the reactor: the reactor inserts the task state into tasks, then send(msg) to another thread (which, after receiving the msg, runs the timer, later updates the TaskState, and wakes the executor thread)
            //  fn regist(&mut self, task_id: u16, waker: Waker, duration: u64) {
            //      if self.tasks.insert(task_id, TaskState::Wait(waker)).is_none() {
            //          self.dispatcher
            //              .send(Event::Timeout(task_id, duration))
            //              .unwrap();
            //      }
            //  }

             r.regist
                (self.data, cx.waker().clone(), self.id);
            Poll::Pending
        }
    }
}

8.4. Reactor: the core of async, similar to epoll: the key link between fds and Tasks

  • in the simple implementation the Reactor is fairly complex: it stores each Task's TaskState, an enum containing the Waker, so that when the fd becomes ready it can call waker.wake()

8.5. Executor: mainly covers the Task struct & TaskState, the queue, how to handle panics, & how to spawn Tasks

8.6. ? Arc:

  • beware of "deadlock"-like problems from holding Arcs across threads: the shortest-lived thread should hold an Arc::downgrade rather than an Arc::clone
  • in the main.rs code this means the Reactor may not be dropped on the main thread but on some other thread, leading to incorrect logic (though since threads are siblings rather than parent/child, which one waits for the other may not matter much?)

8.7. How rust turns an async block into a Task, i.e. into Future::poll calls

  • Generators & Futures are very similar, but the connection between them is fairly obscure; the article briefly shows what code a Generator is lowered to, and Futures work much the same way (the generated code overlaps a lot)
  • key question: how Task's poll method ends up being invoked from an async {} block
// Generator code
fn main() {
  let a: i32 = 4;
  let mut gen = move || {
     yield a * 2; // similar to (a * 2).await
  };

  if let GeneratorState::Yield(n) = gen.resume() {
     // do something
  }
  if let GeneratorState::Complete(()) = gen.resume() {
    // ....
  }
}
// the code above is lowered into generic code like the following
enum GeneratorState<Y, R> {
  Yield(Y),
  Complete(R),
}

trait Generator {
  type Yield;
  type Return;

  fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return>;
}


enum GeneratorA {
  Enter(i32),
  Yield(i32),
  Exit,
}

impl GeneratorA {
  fn start(a: i32) -> GeneratorA { GeneratorA::Enter(a) }
}

impl Generator for GeneratorA {
  type Yield = i32;
  type Return = ();
  fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return> {
    match *self {
      GeneratorA::Enter(a) => {
        // the yield point: a = a * 2;
        let a = a * 2;
        *self = GeneratorA::Yield(a);
        GeneratorState::Yield(a)
      }
      GeneratorA::Yield(_) => {
        *self = GeneratorA::Exit;
        GeneratorState::Complete(())
      }
      GeneratorA::Exit => {
        panic!("can't advance any more");
      }
    }
  }
}

8.8. Several important crates and how they relate: crossbeam, async-std, tokio, async-task

9. tokio: full walkthrough:

9.1. Tokio usage:

9.1.1. #[tokio::main]: turns the main function into async, then automatically builds let r = runtime::Builder::new(); and calls r.block_on(the whole func)

9.1.2. runtime.block_on(async future) takes an async func directly and calls exec.block_on(future) to drive the future

9.1.3. .await can only be used inside an async function, i.e.: 1) await exists only for async fns, 2) await turns the future into a Generator state machine, 3) await runs sequentially and only once; a loop is the key logic for a server to keep handling clients

9.1.4. block_on(future) is a blocking call: it keeps polling until the future completes

9.1.5. spawn directly spawns a task and hands it to the runtime to execute

9.1.6. ? how does tokio implement sleep? why does Sleep seem not to implement Future?

9.2. Using a Mutex in async code is not a good idea, whether tokio::sync::Mutex or std Mutex: the best approach is still to spawn one task that owns the shared state and pass messages for each connection

9.3. While implementing this with HashMap<room, TcpStream>, the following problems came up:

9.3.1. std::net::TcpStream: keeping multiple writable refs to the socket requires RefCell, but RefCell is !Sync, so Mutex<RefCell<TcpStream>> is not Send + Sync, given Mutex's impls: impl<T: ?Sized + Send> Send for Mutex<T>, impl<T: ?Sized + Send> Sync for Mutex<T>

9.3.2. tokio::net::TcpStream is Sync & Send, so the wrapper below was implemented; the remaining problem: "deadlocks" keep happening and cannot be avoided, because after one read completes the loop goes around and reads again, and while blocked in read it holds the lock, so publish_msg can never complete.

9.3.3. Conclusion: locking the fd with a Mutex is not workable (there is no way to avoid the fd sitting in a blocked read).

#[derive(Clone)]
struct SocketWrapper {
    inner: Arc<Mutex<TcpStream>>,
    eq: SocketAddr,
}
impl Deref for SocketWrapper {
    type Target = Arc<Mutex<TcpStream>>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
impl DerefMut for SocketWrapper {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

impl SocketWrapper {
    fn new(socket: Arc<Mutex<TcpStream>>, eq: SocketAddr) -> SocketWrapper {
        SocketWrapper { inner: socket, eq }
    }
}
impl PartialEq for SocketWrapper {
    fn eq(&self, other: &SocketWrapper) -> bool {
        self.eq.eq(&other.eq)
    }

    fn ne(&self, other: &SocketWrapper) -> bool {
        self.eq.ne(&other.eq)
    }
}
impl Hash for SocketWrapper {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.eq.hash(state);
    }
}

impl fmt::Debug for SocketWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "addr: {}", self.eq)
    }
}
impl Eq for SocketWrapper {}

// the main structure is as follows:
fn main() {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8080".to_string());

    // Next up we create a TCP listener which will listen for incoming
    // connections. This TCP listener is bound to the address we determined
    // above and must be associated with an event loop.

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let chat_room: Mutex<HashMap<String, HashSet<SocketWrapper>>> = Mutex::new(HashMap::new());
    let chat_room = Arc::new(chat_room);

    runtime.block_on(async {
        let listener = TcpListener::bind(&addr).await.unwrap();
        // let mut db: Arc<Mutex<HashMap<String, HashSet<TcpStream>>>> =
        //     Arc::new(Mutex::new(HashMap::default()));
        loop {
            let (mut socket, addr) = listener.accept().await.unwrap();
            let peer_addr = socket.peer_addr().unwrap();
            let socket = Arc::new(Mutex::new(socket));
            let socket_fd = SocketWrapper::new(socket, peer_addr);
            // println!("filename: {:?}", socket.to_m.file_name_ref());
            // println!("socket: {:?}", socket.peer_addr());
            // let chat = chat_room.clone();
            // let socket = SocketWrapper::new(socket, socket.peer_addr());

            let i_chat = chat_room.clone();
            // let idb = db.clone();
            // let mut chat = db.clone();
            tokio::spawn(async move { socket_handler(socket_fd, i_chat).await });
        }
    })
}

async fn socket_handler(
    socket: SocketWrapper,
    db: Arc<Mutex<HashMap<String, HashSet<SocketWrapper>>>>,
) {
    let mut buf = vec![0; 1024];
    let mut n = 0;
    loop {
        {
            n = socket
                .lock()
                .await
                .read(&mut buf)
                .await
                .expect("failed to read data from socket");
        }

        if n == 0 {
            return;
        }
        let istr: String = String::from_utf8_lossy(&buf[0..n]).into_owned();
        // split by : , room, msg;
        let mut iter = istr.split(":");
        let room = iter.next().unwrap_or("global");
        let msg = iter.next();
        let i_db = db.clone();

        {
            let mut lock = i_db.lock().await;
            let mut val = lock.entry(room.to_owned()).or_insert(HashSet::new());
            val.insert(socket.clone());
        }
        println!(
            "istr: {:?}, room: {:?}, msg: {:?}, db: {:?}",
            istr, room, msg, db
        );

        {
            if let Some(i_msg) = msg {
                publish_msg(room, i_msg, db.clone()).await;
            }
        }

        {
            socket
                .lock()
                .await
                .write_all(&buf[0..n])
                .await
                .expect("failed to write data to socket");
        }
        n = 0;
    }
}
async fn publish_msg(
    room: &str,
    msg: &str,
    db: Arc<Mutex<HashMap<String, HashSet<SocketWrapper>>>>,
) {
    println!("publish_msg begin");
    if let Some(people) = db.lock().await.get(room) {
        println!("****************************************************************************************************");
        for s in people.iter() {
            s.lock().await.write(&msg.as_ref()).await;
        }
    }
    println!("publish_msg done");
}
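The message-passing alternative recommended in 9.2 can be sketched with std threads and channels (a toy stand-in for tokio tasks and tokio::sync::mpsc; Command and the room map are illustrative):

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Messages that each connection task sends to the single state-owning task.
enum Command {
    Join { room: String, user: String },
    Publish { room: String, reply: mpsc::Sender<usize> },
}

// Spawn the single owner of the room map: no Mutex, no lock held across a read.
fn spawn_owner() -> mpsc::Sender<Command> {
    let (tx, rx) = mpsc::channel::<Command>();
    thread::spawn(move || {
        let mut rooms: HashMap<String, Vec<String>> = HashMap::new();
        for cmd in rx {
            match cmd {
                Command::Join { room, user } => {
                    rooms.entry(room).or_default().push(user);
                }
                Command::Publish { room, reply } => {
                    // report how many members the message would be written to
                    let n = rooms.get(&room).map_or(0, |v| v.len());
                    let _ = reply.send(n);
                }
            }
        }
    });
    tx
}

fn main() {
    let tx = spawn_owner();
    tx.send(Command::Join { room: "global".into(), user: "a".into() }).unwrap();
    tx.send(Command::Join { room: "global".into(), user: "b".into() }).unwrap();

    let (rtx, rrx) = mpsc::channel();
    tx.send(Command::Publish { room: "global".into(), reply: rtx }).unwrap();
    assert_eq!(rrx.recv().unwrap(), 2);
    println!("ok");
}
```

Because only the owner task touches the map, a socket blocked in read never holds a lock that publish needs.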

9.4. mio:

9.4.1. mio::Poll: abstracts over each platform's event loop implementation (linux: epoll)

9.4.2. rough usage:

let poll = Poll::new()?;
let socket = TcpStream::connect(&"216.58.193.100:80".parse()?)?;
// register the socket with `poll`, requesting readable readiness
poll.register(&socket, Token(0), Ready::readable(), PollOpt::edge())?;


// the Poll struct is important:

pub struct Poll {
    // Platform specific IO selector
    selector: sys::Selector,

    // Custom readiness queue
    readiness_queue: ReadinessQueue,

    // Use an atomic to first check if a full lock will be required. This is a
    // fast-path check for single threaded cases avoiding the extra syscall
    lock_state: AtomicUsize,

    // Sequences concurrent calls to `Poll::poll`
    lock: Mutex<()>,

    // Wakeup the next waiter
    condvar: Condvar,
}

9.4.3. After several layers of calls this eventually lands in Selector.register()

9.4.4. The code in Poll.poll(&mut events) is fairly complex and needs another read

9.4.5. Poll.register is interesting: it embodies a rust API design pattern worth studying, namely how to generalize parameter constraints

// a generic type parameter is used here, but E is constrained to impl Evented
pub fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
    where E: Evented
{
    validate_args(token)?;

    /*
     * Undefined behavior:
     * - Reusing a token with a different `Evented` without deregistering
     * (or closing) the original `Evented`.
     */
    trace!("registering with poller");

    // Register interests for this socket
    // here the method call is delegated back to the handle parameter
    handle.register(self, token, interest, opts)?;

    Ok(())
}
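The double dispatch in Poll::register can be boiled down to a toy example (hypothetical Registry/Socket types, not mio's real API): the registry is generic over any E: Evented and immediately hands the call back to the handle, so each I/O type decides how it registers itself.

```rust
use std::io;

trait Evented {
    fn register(&self, registry: &Registry, token: usize) -> io::Result<()>;
}

struct Registry;
impl Registry {
    // generic over any E: Evented, ?Sized like mio's version
    fn register<E: Evented + ?Sized>(&self, handle: &E, token: usize) -> io::Result<()> {
        // delegate back to the handle (double dispatch)
        handle.register(self, token)
    }
}

struct Socket;
impl Evented for Socket {
    fn register(&self, _registry: &Registry, token: usize) -> io::Result<()> {
        println!("socket registered with token {}", token);
        Ok(())
    }
}

fn main() {
    let registry = Registry;
    assert!(registry.register(&Socket, 0).is_ok());
}
```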


9.5. tokio: source code

9.5.1. Events: A collection of readiness events

/// Representation of a spawned future/stream.
///
/// This object is returned by the `spawn` function in this module. This
/// represents a "fused task and future", storing all necessary pieces of a task
/// and owning the top-level future that's being driven as well.
///
/// A `Spawn` can be poll'd for completion or execution of the current thread
/// can be blocked indefinitely until a notification arrives. This can be used
/// with either futures or streams, with different methods being available on
/// `Spawn` depending which is used.
pub struct Spawn<T: ?Sized> {
    id: usize,
    data: LocalMap,
    obj: T,
}


/// Pre-allocated storage for a uniform data type
///
/// See [module documentation] for more details.
///
/// [module documentation]: index.html
#[derive(Clone)]
pub struct Slab<T> {
    // Chunk of memory
    entries: Vec<Entry<T>>,

    // Number of Filled elements currently in the slab
    len: usize,

    // Offset of the next available slot in the slab. Set to the slab's
    // capacity when the slab is full.
    next: usize,
}

/// An event loop.
///
/// The event loop is the main source of blocking in an application which drives
/// all other I/O events and notifications happening. Each event loop can have
/// multiple handles pointing to it, each of which can then be used to create
/// various I/O objects to interact with the event loop in interesting ways.
// TODO: expand this
pub struct Core {
    events: mio::Events,
    tx: mpsc::UnboundedSender<Message>,
    rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
    _rx_registration: mio::Registration,
    rx_readiness: Arc<MySetReadiness>,

    inner: Rc<RefCell<Inner>>,

    // Used for determining when the future passed to `run` is ready. Once the
    // registration is passed to `io` above we never touch it again, just keep
    // it alive.
    _future_registration: mio::Registration,
    future_readiness: Arc<MySetReadiness>,
}


/// A collection of readiness events.
///
/// `Events` is passed as an argument to [`Poll::poll`] and will be used to
/// receive any new readiness events received since the last poll. Usually, a
/// single `Events` instance is created at the same time as a [`Poll`] and
/// reused on each call to [`Poll::poll`].
///
/// See [`Poll`] for more documentation on polling.
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn try_main() -> Result<(), Box<Error>> {
/// use mio::{Events, Poll};
/// use std::time::Duration;
///
/// let mut events = Events::with_capacity(1024);
/// let poll = Poll::new()?;
///
/// assert_eq!(0, events.len());
///
/// // Register `Evented` handles with `poll`
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// for event in &events {
///     println!("event={:?}", event);
/// }
/// #     Ok(())
/// # }
/// #
/// # fn main() {
/// #     try_main().unwrap();
/// # }
/// ```
///
/// [`Poll::poll`]: struct.Poll.html#method.poll
/// [`Poll`]: struct.Poll.html
pub struct Events {
    inner: sys::Events,
}

pub struct Events {
    sys_events: KeventList,
    events: Vec<Event>,
    event_map: HashMap<Token, usize>,
}

struct KeventList(Vec<libc::kevent>);

/// [`Token`]: ../struct.Token.html
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct Event {
    kind: Ready,
    token: Token
}

9.5.2. Runtime:

9.5.3. source code

    /// cpu-pool code ----------------------------------
    /// A thread pool intended to run CPU intensive work.
    /// Tasks are executed on a pool of threads
    pub struct CpuPool {
        inner: Arc<Inner>,
    }

    impl Builder {
        pub fn create(&mut self) -> CpuPool {
            let (tx, rx) = mpsc::channel();
            let pool = CpuPool {
                inner: Arc::new(Inner {
                    tx: Mutex::new(tx),
                    rx: Mutex::new(rx),
                    cnt: AtomicUsize::new(1),
                    size: self.pool_size,
                }),
            };
            assert!(self.pool_size > 0);

            for counter in 0..self.pool_size {
                let inner = pool.inner.clone();
                let after_start = self.after_start.clone();
                let before_stop = self.before_stop.clone();
                let mut thread_builder = thread::Builder::new();
                if let Some(ref name_prefix) = self.name_prefix {
                    thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter));
                }
                if self.stack_size > 0 {
                    thread_builder = thread_builder.stack_size(self.stack_size);
                }
                thread_builder.spawn(move || inner.work(after_start, before_stop)).unwrap();
            }
            return pool
        }
    }

    impl Inner {
        fn send(&self, msg: Message) {
            self.tx.lock().unwrap().send(msg).unwrap();
        }

        fn work(&self, after_start: Option<Arc<Fn() + Send + Sync>>, before_stop: Option<Arc<Fn() + Send + Sync>>) {
            after_start.map(|fun| fun());
            loop {
                let msg = self.rx.lock().unwrap().recv().unwrap();
                match msg {
                    Message::Run(r) => r.run(),
                    Message::Close => break,
                }
            }
            before_stop.map(|fun| fun());
        }
    }

    impl Run {
        /// Actually run the task (invoking `poll` on its future) on the current
        /// thread.
        #[allow(deprecated)]
        pub fn run(self) {
            let Run { mut spawn, inner } = self;

            // SAFETY: the ownership of this `Run` object is evidence that
            // we are in the `POLLING`/`REPOLL` state for the mutex.
            unsafe {
                inner.mutex.start_poll();

                loop {
                    let unpark: Arc<Unpark> = inner.clone();
                    match spawn.poll_future(unpark) { // the future executes here; spawn is Spawn<Box<MySender>>
                        Ok(Async::NotReady) => {}
                        Ok(Async::Ready(())) |
                        Err(()) => return inner.mutex.complete(),
                    }
                    let run = Run { spawn: spawn, inner: inner.clone() };
                    match inner.mutex.wait(run) {
                        Ok(()) => return,            // we've waited
                        Err(r) => spawn = r.spawn,   // someone's notified us
                    }
                }
            }
        }
    }

    impl Executor for Inner {
        fn execute(&self, run: Run) {
            self.send(Message::Run(run))
        }
    }

    impl CpuPool {
        /// Panics if `size == 0`.
        pub fn new(size: usize) -> CpuPool {
            Builder::new().pool_size(size).create()
        }


        /// Spawns a closure on this thread pool.
        ///
        /// This function is a convenience wrapper around the `spawn` function above
        /// for running a closure wrapped in `future::lazy`. It will spawn the
        /// function `f` provided onto the thread pool, and continue to run the
        /// future returned by `f` on the thread pool as well.
        ///
        /// The returned future will be a handle to the result produced by the
        /// future that `f` returns.
        pub fn spawn_fn<F, R>(&self, f: F) -> CpuFuture<R::Item, R::Error>
        where F: FnOnce() -> R + Send + 'static,
              R: IntoFuture + 'static,
              R::Future: Send + 'static,
              R::Item: Send + 'static,
              R::Error: Send + 'static,
        {
            self.spawn(lazy(f))
        }

        pub fn spawn<F>(&self, f: F) -> CpuFuture<F::Item, F::Error>
        where F: Future + Send + 'static,
              F::Item: Send + 'static,
              F::Error: Send + 'static,
        {
            let (tx, rx) = channel(); // channel for handing the result of f: F back to the caller
            let keep_running_flag = Arc::new(AtomicBool::new(false));
            // AssertUnwindSafe is used here because `Send + 'static` is basically
            // an alias for an implementation of the `UnwindSafe` trait but we can't
            // express that in the standard library right now.
            let sender = MySender { // the unit of work: wraps the future together with tx for sending back the result
                fut: AssertUnwindSafe(f).catch_unwind(),
                tx: Some(tx), // the sender is kept inside the runner
                keep_running_flag: keep_running_flag.clone(),
            };
            executor::spawn(sender).execute(self.inner.clone());
            CpuFuture { inner: rx, keep_running_flag: keep_running_flag.clone() } // receives the result of f: F
        }
    }

    /// Where the future actually gets executed.
    impl<F: Future> Future for MySender<F, Result<F::Item, F::Error>> {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            if let Ok(Async::Ready(_)) = self.tx.as_mut().unwrap().poll_cancel() {
                if !self.keep_running_flag.load(Ordering::SeqCst) {
                    // Cancelled, bail out
                    return Ok(().into())
                }
            }

            let res = match self.fut.poll() {
                Ok(Async::Ready(e)) => Ok(e),
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Err(e) => Err(e),
            };

            // if the receiving end has gone away then that's ok, we just ignore the
            // send error here.
            drop(self.tx.take().unwrap().send(res));
            Ok(Async::Ready(()))
        }
    }

     /// The message type passed between spawn_fn and the pool's worker threads.
     enum Message {
         Run(Run),
         Close,
     }

     /// Units of work submitted to an `Executor`, currently only created
     /// internally.
     pub struct Run {
         spawn: Spawn<Box<Future<Item = (), Error = ()> + Send>>, // here this is Spawn<Box<MySender>>
         inner: Arc<RunInner>,
     }

      /// The type of future returned from the `CpuPool::spawn` function, which
      /// proxies the futures running on the thread pool.
      ///
      /// This future will resolve in the same way as the underlying future, and it
      /// will propagate panics.
      #[must_use]
      #[derive(Debug)]
      pub struct CpuFuture<T, E> {
          inner: Receiver<thread::Result<Result<T, E>>>,
          keep_running_flag: Arc<AtomicBool>,
      }

     impl<T: Send + 'static, E: Send + 'static> Future for CpuFuture<T, E> {
       type Item = T;
       type Error = E;

       fn poll(&mut self) -> Poll<T, E> {
           match self.inner.poll().expect("cannot poll CpuFuture twice") {
               Async::Ready(Ok(Ok(e))) => Ok(e.into()),
               Async::Ready(Ok(Err(e))) => Err(e),
               Async::Ready(Err(e)) => panic::resume_unwind(e),
               Async::NotReady => Ok(Async::NotReady),
           }
       }
    }


    /// ---------------- futures library source code ----------------
    /// Creates a new future which will eventually be the same as the one created
    /// by the closure provided.
    ///
    /// The provided closure is only run once the future has a callback scheduled
    /// on it, otherwise the callback never runs. Once run, however, this future is
    /// the same as the one the closure creates.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures::future::*;
    ///
    /// let a = lazy(|| ok::<u32, u32>(1));
    ///
    /// let b = lazy(|| -> FutureResult<u32, u32> {
    ///     panic!("oh no!")
    /// });
    /// drop(b); // closure is never run
    /// ```
    pub fn lazy<F, R>(f: F) -> Lazy<F, R>
    where F: FnOnce() -> R,
          R: IntoFuture
    {
        Lazy {
            inner: _Lazy::First(f),
        }
    }

    /// A future which defers creation of the actual future until a callback is
    /// scheduled.
    ///
    /// This is created by the `lazy` function.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub struct Lazy<F, R: IntoFuture> {
        inner: _Lazy<F, R::Future>,
    }

    #[derive(Debug)]
    enum _Lazy<F, R> {
        First(F),
        Second(R),
        Moved,
    }

  // futures-0.1.17/src/task_impl/mod.rs
  pub fn spawn<T>(obj: T) -> Spawn<T> {
      Spawn {
          id: fresh_task_id(),
          obj: obj,
          data: local_map(),
      }
  }


impl<F: Future> Spawn<F> {
    /// Polls the internal future, scheduling notifications to be sent to the
    /// `unpark` argument.
    ///
    /// This method will poll the internal future, testing if it's completed
    /// yet. The `unpark` argument is used as a sink for notifications sent to
    /// this future. That is, while the future is being polled, any call to
    /// `task::park()` will return a handle that contains the `unpark`
    /// specified.
    ///
    /// If this function returns `NotReady`, then the `unpark` should have been
    /// scheduled to receive a notification when poll can be called again.
    /// Otherwise if `Ready` or `Err` is returned, the `Spawn` task can be
    /// safely destroyed.
    #[deprecated(note = "recommended to use `poll_future_notify` instead")]
    #[allow(deprecated)]
    pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> {
        self.enter(BorrowedUnpark::Old(&unpark), |f| f.poll())
    }

    /// Waits for the internal future to complete, blocking this thread's
    /// execution until it does.
    ///
    /// This function will call `poll_future` in a loop, waiting for the future
    /// to complete. When a future cannot make progress it will use
    /// `thread::park` to block the current thread.
    pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
        ThreadNotify::with_current(|notify| {

            loop {
                match self.poll_future_notify(notify, 0)? {
                    Async::NotReady => notify.park(),
                    Async::Ready(e) => return Ok(e),
                }
            }
        })
    }

    /// A specialized function to request running a future to completion on the
    /// specified executor.
    ///
    /// This function only works for futures whose item and error types are `()`
    /// and also implement the `Send` and `'static` bounds. This will submit
    /// units of work (instances of `Run`) to the `exec` argument provided
    /// necessary to drive the future to completion.
    ///
    /// When the future would block, it's arranged that when the future is again
    /// ready it will submit another unit of work to the `exec` provided. This
    /// will happen in a loop until the future has completed.
    ///
    /// This method is not appropriate for all futures, and other kinds of
    /// executors typically provide a similar function with perhaps relaxed
    /// bounds as well.
    ///
    /// Note that this method is likely to be deprecated in favor of the
    /// `futures::Executor` trait and `execute` method, but if this'd cause
    /// difficulty for you please let us know!
    pub fn execute(self, exec: Arc<Executor>)
        where F: Future<Item=(), Error=()> + Send + 'static,
    {
        exec.clone().execute(Run { // this ends up calling Inner's Executor impl, i.e. tx.send
            // Ideally this method would be defined directly on
            // `Spawn<BoxFuture<(), ()>>` so we wouldn't have to box here and
            // it'd be more explicit, but unfortunately that currently has a
            // link error on nightly: rust-lang/rust#36155
            spawn: spawn(Box::new(self.into_inner())),
            inner: Arc::new(RunInner {
                exec: exec,
                mutex: UnparkMutex::new()
            }),
        })
    }
}
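The park/unpark dance in `wait_future` above can be modeled with std alone. This is a sketch, not the futures 0.1 implementation: `Notify` is a hypothetical stand-in for `ThreadNotify` built from `Mutex` + `Condvar`, and the "future" is just a closure returning `Option<T>` in place of `Poll`:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Stand-in for ThreadNotify: a "notified" flag guarded by a Mutex,
// plus a Condvar so the polling thread blocks instead of spinning.
struct Notify {
    notified: Mutex<bool>,
    cvar: Condvar,
}

impl Notify {
    fn new() -> Self {
        Notify { notified: Mutex::new(false), cvar: Condvar::new() }
    }

    // Equivalent of `notify.park()`: sleep until someone calls `unpark`.
    fn park(&self) {
        let mut notified = self.notified.lock().unwrap();
        while !*notified {
            notified = self.cvar.wait(notified).unwrap();
        }
        *notified = false; // consume the notification
    }

    // Equivalent of `Unpark::unpark`: wake the parked poller.
    fn unpark(&self) {
        *self.notified.lock().unwrap() = true;
        self.cvar.notify_one();
    }
}

// Drive a "future" (a closure returning Option<T>) to completion,
// parking between polls exactly like `wait_future` does.
fn wait_future<T, F: FnMut() -> Option<T>>(notify: &Notify, mut poll: F) -> T {
    loop {
        match poll() {
            Some(v) => return v,   // Async::Ready(e)
            None => notify.park(), // Async::NotReady
        }
    }
}

fn demo() -> u32 {
    let notify = Arc::new(Notify::new());
    let result = Arc::new(Mutex::new(None::<u32>));

    // Another thread completes the "future" and unparks the waiter.
    let (n2, r2) = (notify.clone(), result.clone());
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        *r2.lock().unwrap() = Some(42);
        n2.unpark();
    });

    wait_future(&notify, || *result.lock().unwrap())
}
```

Note the loop in `park`: the flag, not the wakeup itself, is the source of truth, which is how spurious wakeups stay harmless.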

9.5.4. How CpuPool is used:

  1. CpuPool::new(cpu_cores) spawns the worker threads, each of which loops receiving messages: Message::Run(r) => r.run()
  2. let fut = ENV.cpu_pool.spawn_fn(move || { }); Box::new(fut)
    1. CpuPool::spawn_fn wraps the closure into a Run, then uses the Executor trait's execute() method to submit the Run via tx.send(run)
    2. How does the result of the Run get back to the caller? cpu_pool.spawn_fn(|| {}) returns a CpuFuture { inner: rx, keep_running_flag: Arc<AtomicBool> }, where rx comes from let (tx, rx) = channel(); inside spawn
  3. Some observations:
    1. mpsc::channel is multi-producer, single-consumer, so when multiple threads consume futures they contend on the lock in self.rx.lock().unwrap().recv()
    2. The key points are: registration and run; the waiting happens here
  4. Open questions:
    1. What happens when NotReady is returned? How does the wakeup get registered?
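The whole submit/execute/return flow above can be condensed into a std-only toy. This is a sketch of the mechanism, not futures-cpupool's API: `Pool`, `Job`, and the returned `mpsc::Receiver` (playing the role of `CpuFuture`) are all hypothetical names:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Jobs are boxed closures, mirroring Message::Run(Run).
type Job = Box<dyn FnOnce() + Send>;

// A toy CpuPool: workers share one locked Receiver,
// just like `self.rx.lock().unwrap().recv()` in the source.
struct Pool {
    tx: mpsc::Sender<Job>,
}

impl Pool {
    fn new(size: usize) -> Pool {
        assert!(size > 0);
        let (tx, rx) = mpsc::channel::<Job>();
        let rx = Arc::new(Mutex::new(rx));
        for _ in 0..size {
            let rx = rx.clone();
            thread::spawn(move || loop {
                // Workers contend on this lock, as noted above.
                let job = match rx.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => break, // sender dropped: shut down
                };
                job();
            });
        }
        Pool { tx }
    }

    // Like spawn_fn: run the closure on the pool and return a handle
    // that resolves to its result (the CpuFuture role).
    fn spawn_fn<T, F>(&self, f: F) -> mpsc::Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (res_tx, res_rx) = mpsc::channel();
        self.tx
            .send(Box::new(move || {
                // If the receiver is gone, ignore the error, as MySender does.
                let _ = res_tx.send(f());
            }))
            .unwrap();
        res_rx
    }
}
```

Calling `pool.spawn_fn(|| 1 + 1).recv()` blocks until a worker has run the closure, which is the synchronous analogue of polling the returned `CpuFuture`.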

10. Mutex & Condvar


struct InnerPool {
    queue: BinaryHeap<Job>,
    shutdown: bool,
}

struct SharedPool {
    inner: Mutex<InnerPool>,
    cvar: Condvar,
}
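A minimal working version of the two structs above shows how the Mutex and Condvar cooperate: producers push under the lock and notify, consumers wait on the Condvar in a loop. The `u32` jobs and method names are placeholders for illustration:

```rust
use std::collections::BinaryHeap;
use std::sync::{Condvar, Mutex};

// Mirrors InnerPool/SharedPool above, with u32 standing in for Job.
struct InnerPool {
    queue: BinaryHeap<u32>,
    shutdown: bool,
}

struct SharedPool {
    inner: Mutex<InnerPool>,
    cvar: Condvar,
}

impl SharedPool {
    fn new() -> Self {
        SharedPool {
            inner: Mutex::new(InnerPool { queue: BinaryHeap::new(), shutdown: false }),
            cvar: Condvar::new(),
        }
    }

    fn push(&self, job: u32) {
        self.inner.lock().unwrap().queue.push(job);
        self.cvar.notify_one(); // wake one waiting worker
    }

    fn shutdown(&self) {
        self.inner.lock().unwrap().shutdown = true;
        self.cvar.notify_all(); // wake everyone so they see the flag
    }

    // A worker blocks on the Condvar until a job arrives or shutdown is set.
    fn pop(&self) -> Option<u32> {
        let mut inner = self.inner.lock().unwrap();
        loop {
            if let Some(job) = inner.queue.pop() {
                return Some(job);
            }
            if inner.shutdown {
                return None;
            }
            // wait() atomically unlocks the Mutex and re-locks on wakeup;
            // the loop re-checks the condition after spurious wakeups.
            inner = self.cvar.wait(inner).unwrap();
        }
    }
}
```

BinaryHeap is a max-heap, so `pop` returns the highest-priority job first.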

11. r2d2: connection pool manager

11.0.1. 1. Pool: pub struct Pool<M: ManageConnection>(Arc<SharedPool<M>>);


  1.    pub struct Pool<M: ManageConnection>(Arc<SharedPool<M>>); // the Pool is shared via Arc
    
        struct Conn<C> {
            conn: C,
            birth: Instant,
        }
    
        struct IdleConn<C> {
            conn: Conn<C>,
            idle_start: Instant,
        }
    
        struct PoolInternals<C> {
            conns: VecDeque<IdleConn<C>>,
            num_conns: u32,
            pending_conns: u32,
            last_error: Option<String>,
        }
    
        struct SharedPool<M>
        where
            M: ManageConnection,
        {
            config: Config<M::Connection, M::Error>, // configuration, see Config below
            manager: M,
            internals: Mutex<PoolInternals<M::Connection>>, // the actual pooled connections, guarded by a non-poisoning Mutex (note lock() is called without unwrap() below)
            cond: Condvar,
        }
    
        pub struct Config<C, E> { // configuration: max_size, min_idle, etc.
            pub max_size: u32,
            pub min_idle: Option<u32>,
            pub test_on_check_out: bool,
            pub max_lifetime: Option<Duration>,
            pub idle_timeout: Option<Duration>,
            pub connection_timeout: Duration,
            pub error_handler: Box<HandleError<E>>,
            pub connection_customizer: Box<CustomizeConnection<C, E>>,
            pub thread_pool: Arc<ScheduledThreadPool>, // used in add_connection via thread_pool.execute_after(|| { create the connection; internals.conns.push(conn); })
        }
    
      pub trait ManageConnection: Send + Sync + 'static { // implement connect() etc. so backends like MySQL or Redis can supply their own connections
          /// The connection type this manager deals with.
          type Connection: Send + 'static;
    
          /// The error type returned by `Connection`s.
          type Error: error::Error + 'static;
    
    
        /// Attempts to create a new connection.
        fn connect(&self) -> Result<Self::Connection, Self::Error>;
    
        /// Determines if the connection is still connected to the database.
        ///
        /// A standard implementation would check if a simple query like `SELECT 1`
        /// succeeds.
        fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
    
        /// *Quickly* determines if the connection is no longer usable.
        ///
        /// This will be called synchronously every time a connection is returned
        /// to the pool, so it should *not* block. If it returns `true`, the
        /// connection will be discarded.
        ///
        /// For example, an implementation might check if the underlying TCP socket
        /// has disconnected. Implementations that do not support this kind of
        /// fast health check may simply return `false`.
        fn has_broken(&self, conn: &mut Self::Connection) -> bool;
    }
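To see what an implementation of the trait looks like without pulling in a real database, here is a toy manager. Note this defines a local copy of the trait's shape for the sketch; real code would implement `r2d2::ManageConnection`, and `FakeConn`/`FakeManager` are hypothetical names:

```rust
use std::error::Error as StdError;

// Local copy of the trait shape (in real code, use r2d2::ManageConnection).
pub trait ManageConnection: Send + Sync + 'static {
    type Connection: Send + 'static;
    type Error: StdError + 'static;

    fn connect(&self) -> Result<Self::Connection, Self::Error>;
    fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
    fn has_broken(&self, conn: &mut Self::Connection) -> bool;
}

// A toy in-memory "connection": counts validation queries, has a liveness flag.
pub struct FakeConn {
    pub queries: u32,
    pub alive: bool,
}

pub struct FakeManager;

impl ManageConnection for FakeManager {
    type Connection = FakeConn;
    type Error = std::io::Error;

    fn connect(&self) -> Result<FakeConn, Self::Error> {
        Ok(FakeConn { queries: 0, alive: true })
    }

    // The "SELECT 1"-style check described in the doc comment above.
    fn is_valid(&self, conn: &mut FakeConn) -> Result<(), Self::Error> {
        if conn.alive {
            conn.queries += 1;
            Ok(())
        } else {
            Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "dead"))
        }
    }

    // Must be fast and non-blocking: just read a flag, never do IO.
    fn has_broken(&self, conn: &mut FakeConn) -> bool {
        !conn.alive
    }
}
```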
    

11.0.2. How Pool is used: pool.get() -> Result<PooledConnection<M>, Error>


pub struct Pool<M>(Arc<SharedPool<M>>)
where
    M: ManageConnection;


pub struct PooledConnection<M>
/// A thin wrapper that simply impls Drop + Deref + DerefMut: Drop returns the
/// conn to the pool, Deref exposes the conn so the wrapper can be used as one.
where
    M: ManageConnection,
{
    pool: Pool<M>,
    checkout: Instant,
    conn: Option<Conn<M::Connection>>,
}

impl<M> Drop for PooledConnection<M>
where
    M: ManageConnection,
{
    fn drop(&mut self) {
        self.pool.put_back(self.checkout, self.conn.take().unwrap());
    }
}
impl<M> Deref for PooledConnection<M>
where
    M: ManageConnection,
{
    type Target = M::Connection;

    fn deref(&self) -> &M::Connection {
        &self.conn.as_ref().unwrap().conn
    }
}
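This Drop + Deref guard pattern is worth isolating. A std-only sketch with plain `String`s standing in for connections (`Pool`/`Pooled` are hypothetical names, not r2d2's types): the `Option<C>` field exists precisely so `drop` can `take()` the connection out and hand it back:

```rust
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};

// A toy pool holding plain Strings as "connections".
#[derive(Clone)]
struct Pool {
    conns: Arc<Mutex<Vec<String>>>,
}

// The PooledConnection pattern: Option<C> so Drop can take() the conn back out.
struct Pooled {
    pool: Pool,
    conn: Option<String>,
}

impl Pool {
    fn get(&self) -> Option<Pooled> {
        let conn = self.conns.lock().unwrap().pop()?;
        Some(Pooled { pool: self.clone(), conn: Some(conn) })
    }
    fn idle(&self) -> usize {
        self.conns.lock().unwrap().len()
    }
}

impl Drop for Pooled {
    // put_back on drop: the caller never returns the connection explicitly.
    fn drop(&mut self) {
        let conn = self.conn.take().unwrap();
        self.pool.conns.lock().unwrap().push(conn);
    }
}

impl Deref for Pooled {
    type Target = String;
    // Expose the inner connection so the guard can be used as if it were one.
    fn deref(&self) -> &String {
        self.conn.as_ref().unwrap()
    }
}

impl DerefMut for Pooled {
    fn deref_mut(&mut self) -> &mut String {
        self.conn.as_mut().unwrap()
    }
}
```

Because of Deref, any `&String` (or `&str`) method works directly on the guard, and letting the guard go out of scope is what returns the connection.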


impl<M> Pool<M>
where
    M: ManageConnection,
{

    /// Retrieves a connection from the pool.
    ///
    /// Waits for at most the configured connection timeout before returning an
    /// error.
    pub fn get(&self) -> Result<PooledConnection<M>, Error> { // returns the wrapper type shown above
        self.get_timeout(self.0.config.connection_timeout)
    }

    /// Retrieves a connection from the pool, waiting for at most `timeout`
    ///
    /// The given timeout will be used instead of the configured connection
    /// timeout.
    pub fn get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error> {
        let start = Instant::now();
        let end = start + timeout;
        let mut internals = self.0.internals.lock(); // lock SharedPool.internals (non-poisoning Mutex, hence no unwrap)

        loop {
            match self.try_get_inner(internals) { // see try_get_inner below
                Ok(conn) => {
                    let event = CheckoutEvent {
                        id: conn.conn.as_ref().unwrap().id,
                        duration: start.elapsed(),
                    };
                    self.0.config.event_handler.handle_checkout(event);
                    return Ok(conn);
                }
                Err(i) => internals = i,
            }

            add_connection(&self.0, &mut internals);
            // Only reached when try_get_inner(internals) failed, i.e. no
            // connection was available; this is the important path!


            if self.0.cond.wait_until(&mut internals, end).timed_out() {
                let event = TimeoutEvent { timeout };
                self.0.config.event_handler.handle_timeout(event);

                return Err(Error(internals.last_error.take()));
            }
        }
    }

    fn try_get_inner<'a>(
        &'a self,
        mut internals: MutexGuard<'a, PoolInternals<M::Connection>>,
    ) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>> {
        loop { // the loop only matters for the is_valid retry path below
            if let Some(mut conn) = internals.conns.pop() { // pop a Conn straight out of the shared pool
                establish_idle_connections(&self.0, &mut internals);
                drop(internals);

                if self.0.config.test_on_check_out {
                    if let Err(e) = self.0.manager.is_valid(&mut conn.conn.conn) {
                        let msg = e.to_string();
                        self.0.config.error_handler.handle_error(e);
                        // FIXME we shouldn't have to lock, unlock, and relock here
                        internals = self.0.internals.lock();
                        internals.last_error = Some(msg);
                        drop_conns(&self.0, internals, vec![conn.conn]);
                        internals = self.0.internals.lock();
                        continue;
                    }
                }

                return Ok(PooledConnection {
                    pool: self.clone(),
                    checkout: Instant::now(),
                    conn: Some(conn.conn),
                });
            } else {
                return Err(internals);
            }
        }
    }

}

/// This is where the ScheduledThreadPool gets used.
fn add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>)
where
    M: ManageConnection,
{
    // once max_size is reached, return without creating another connection
    if internals.num_conns + internals.pending_conns >= shared.config.max_size {
        return;
    }

    internals.pending_conns += 1;
    inner(Duration::from_secs(0), shared);

    fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>)
    where
        M: ManageConnection,
    {
        let new_shared = Arc::downgrade(shared);
        shared.config.thread_pool.execute_after(delay, move || { // the thread pool runs the connection-creation task
            let shared = match new_shared.upgrade() {
                Some(shared) => shared,
                None => return,
            };

            let conn = shared.manager.connect().and_then(|mut conn| { // creating the connection goes through the Manager
                shared
                    .config
                    .connection_customizer
                    .on_acquire(&mut conn)
                    .map(|_| conn)
            });
            match conn {
                Ok(conn) => {
                    let id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed) as u64;

                    let event = AcquireEvent { id };
                    shared.config.event_handler.handle_acquire(event);

                    let mut internals = shared.internals.lock();
                    internals.last_error = None;
                    let now = Instant::now();
                    let conn = IdleConn {
                        conn: Conn {
                            conn,
                            extensions: Extensions::new(),
                            birth: now,
                            id,
                        },
                        idle_start: now,
                    };
                    internals.conns.push(conn); // put the connection into conns, i.e. into the pool
                    internals.pending_conns -= 1;
                    internals.num_conns += 1;
                    shared.cond.notify_one();
                }
                Err(err) => {
                    shared.internals.lock().last_error = Some(err.to_string());
                    shared.config.error_handler.handle_error(err);
                    let delay = cmp::max(Duration::from_millis(200), delay);
                    let delay = cmp::min(shared.config.connection_timeout / 2, delay * 2);
                    inner(delay, &shared); // on failure, retry with backoff
                }
            }
        });
    }
}
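The retry delay in the `Err` arm above doubles on each failure, floored at 200 ms and capped at half the connection timeout. Factored out as a pure function (the name `next_delay` is mine, the arithmetic is the source's):

```rust
use std::cmp;
use std::time::Duration;

// The backoff rule from add_connection's error path, extracted verbatim:
// floor at 200ms, double, cap at connection_timeout / 2.
fn next_delay(delay: Duration, connection_timeout: Duration) -> Duration {
    let delay = cmp::max(Duration::from_millis(200), delay);
    cmp::min(connection_timeout / 2, delay * 2)
}
```

Starting from zero with a 30 s timeout, repeated failures produce 400 ms, 800 ms, 1.6 s, ... until the 15 s cap.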

12. How does ENV.spawn_fn in the daom_rs project synchronize with the gate core? It doesn't: futures-cpupool only wraps the closure passed to spawn_fn and executes it; after that it has nothing to do with the core.

13. tokio:

13.1. SourceCode:

let mut runtime: tokio::runtime::Runtime = tokio::runtime::Runtime::new().unwrap();
impl Runtime {
    pub fn new() -> std::io::Result<Runtime> {
        Builder::new_multi_thread().enable_all().build()
    }
}

// tokio::runtime::Builder
impl Builder {
    pub fn new_multi_thread() -> Builder {
        // The number `61` is fairly arbitrary. I believe this value was copied from golang.
        Builder::new(Kind::MultiThread, 61, 61)
    }

    pub fn build(&mut self) -> io::Result<Runtime> {
        match &self.kind {
            Kind::CurrentThread => self.build_current_thread_runtime(),
            #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
            Kind::MultiThread => self.build_threaded_runtime(),
        }
    }

    /// Called from Builder::new_multi_thread().build()
    fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
        use crate::loom::sys::num_cpus;
        use crate::runtime::{Config, runtime::Scheduler};
        use crate::runtime::scheduler::{self, MultiThread};

        let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

        // Important: the IO/Timer driver, the source of all event-driven wakeups; driver: Driver
        // driver_handle: [[file+emacs:/Users/shaohua.li/.cargo/registry/src/mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd/tokio-1.28.0/src/runtime/driver.rs::18][Handle]]
        let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

        /// Create the blocking pool
        let blocking_pool =
            blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
        let blocking_spawner = blocking_pool.spawner().clone();

        /// Generate a rng seed for this runtime.
        let seed_generator_1 = self.seed_generator.next_generator();
        let seed_generator_2 = self.seed_generator.next_generator();

        // This part matters: it calls create() in worker.rs
        let (scheduler, handle, launch) = MultiThread::new(
            core_threads,
            driver,
            driver_handle,
            blocking_spawner,
            seed_generator_2,
            Config {
                before_park: self.before_park.clone(),
                after_unpark: self.after_unpark.clone(),
                global_queue_interval: self.global_queue_interval,
                event_interval: self.event_interval,
                #[cfg(tokio_unstable)]
                unhandled_panic: self.unhandled_panic.clone(),
                disable_lifo_slot: self.disable_lifo_slot,
                seed_generator: seed_generator_1,
            },
        );

        let handle = Handle { inner: scheduler::Handle::MultiThread(handle) };

        /// Spawn the thread pool workers
        let _enter = handle.enter();
        launch.launch();

        Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
    }

}

/// Blocking Pool struct
pub(crate) struct BlockingPool {
    spawner: Spawner,
    shutdown_rx: shutdown::Receiver,
}

impl BlockingPool {
    pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
        let (shutdown_tx, shutdown_rx) = shutdown::channel();
        let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);

        BlockingPool {
            spawner: Spawner {
                inner: Arc::new(Inner {
                    shared: Mutex::new(Shared {
                        queue: VecDeque::new(),
                        num_notify: 0,
                        shutdown: false,
                        shutdown_tx: Some(shutdown_tx),
                        last_exiting_thread: None,
                        worker_threads: HashMap::new(),
                        worker_thread_index: 0,
                    }),
                    condvar: Condvar::new(),
                    thread_name: builder.thread_name.clone(),
                    stack_size: builder.thread_stack_size,
                    after_start: builder.after_start.clone(),
                    before_stop: builder.before_stop.clone(),
                    thread_cap,
                    keep_alive,
                    metrics: Default::default(),
                }),
            },
            shutdown_rx,
        }
    }
}


13.2. worker::create: the number of workers equals Runtime::worker_threads

13.2.1. 1. queue::local allocates the queue storage: local

13.2.2. 2. Assemble the cores; each Core's run_queue field, the queue that receives Tasks, is the important part

13.2.3. 3. Build one Handle {} shared by all n workers

13.2.4. 4. Assemble into: pub(crate) struct Launch(Vec<Arc<Worker>>);

13.2.5. ? What is the MultiThread::Handle type for?

* [[file:///Users/shaohua.li.cargo/registry/src/mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd/tokio-1.28.0/src/runtime/scheduler/multi_thread/worker.rs:222][let handle]] = Arc::new(Handle { shared: Shared {} }) // Handle to the multi thread scheduler

13.2.6. runtime::spawn_blocking(move || run(worker))

CURRENT.set(&cx, || {
    // This should always be an error. It only returns a `Result` to support
    // using `?` to short circuit.
    assert!(cx.run(core).is_err());

    // Check if there are any deferred tasks to notify. This can happen when
    // the worker core is lost due to `block_in_place()` being called from
    // within the task.
    wake_deferred_tasks();
});

13.2.7. cx.run ultimately executes inside a while loop

13.3. launch.launch: where thread::spawn creates the threads and ties them to workers

13.3.1. The Launch structure:

  pub(crate) struct Launch(Vec<Arc<Worker>>);

  pub(crate) fn launch(mut self) {
      for worker in self.0.drain(..) {
          runtime::spawn_blocking(move || run(worker));
      }
  }

  /// runtime::Handle
  pub struct Handle {
      pub(crate) inner: scheduler::Handle,
  }

  /// scheduler::Handle
  #[derive(Debug, Clone)]
  pub(crate) enum Handle {
      #[cfg(feature = "rt")]
      CurrentThread(Arc<current_thread::Handle>),

      #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
      MultiThread(Arc<multi_thread::Handle>),

      // TODO: This is to avoid triggering "dead code" warnings many other places
      // in the codebase. Remove this during a later cleanup
      #[cfg(not(feature = "rt"))]
      #[allow(dead_code)]
      Disabled,
  }

  /// multi_thread::Handle
  /// Handle to the multi thread scheduler
  pub(crate) struct Handle {
      /// Task spawner
      pub(super) shared: worker::Shared,

      /// Resource driver handles
      pub(crate) driver: driver::Handle,

      /// Blocking pool spawner
      pub(crate) blocking_spawner: blocking::Spawner,

      /// Current random number generator seed
      pub(crate) seed_generator: RngSeedGenerator,
  }

    /// runtime::spawn_blocking
    pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let rt = Handle::current(); // the runtime::Handle shown above
        rt.spawn_blocking(func) // forwards to runtime::Handle::spawn_blocking(func)
    }

    /// runtime#Handle.spawn_blocking
    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.inner.blocking_spawner().spawn_blocking(self, func)
    }

    /// So this returns Handle.blocking_spawner: blocking::Spawner
    pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
        match self {
            Handle::CurrentThread(h) => &h.blocking_spawner,

            #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
            Handle::MultiThread(h) => &h.blocking_spawner,
        }
    }

  impl Spawner {
      #[track_caller]
      pub(crate) fn spawn_blocking<F, R>(&self, rt: &Handle, func: F) -> JoinHandle<R>
      where
          F: FnOnce() -> R + Send + 'static,
          R: Send + 'static,
      {
          // key logic: where the task is created and a thread may be spawned
          let (join_handle, spawn_result) =
              if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
                  self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt)
              } else {
                  self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt)
              };

          match spawn_result {
              Ok(()) => join_handle,
              // Compat: do not panic here, return the join_handle even though it will never resolve
              Err(SpawnError::ShuttingDown) => join_handle,
              Err(SpawnError::NoThreads(e)) => {
                  panic!("OS can't spawn worker thread: {}", e)
              }
          }
      }
  }

    pub(crate) fn spawn_blocking_inner<F, R>(
        &self,
        func: F,
        is_mandatory: Mandatory,
        name: Option<&str>,
        rt: &Handle,
    ) -> (JoinHandle<R>, Result<(), SpawnError>)
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let fut = BlockingTask::new(func);
        let id = task::Id::next();
        /// ...
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let _ = name;

        let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); // func is wrapped into a Task here

        let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
        (handle, spawned)
    }

    fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> {
        let mut shared = self.inner.shared.lock();

        shared.queue.push_back(task); // push the task onto shared.queue
        self.inner.metrics.inc_queue_depth();

        if self.inner.metrics.num_idle_threads() == 0 {
            // No threads are able to process the task.
            if self.inner.metrics.num_threads() == self.inner.thread_cap {
                // At max number of threads
            } else {
                assert!(shared.shutdown_tx.is_some());
                let shutdown_tx = shared.shutdown_tx.clone();

                if let Some(shutdown_tx) = shutdown_tx {
                    let id = shared.worker_thread_index;

                    /// spawn_thread
                    match self.spawn_thread(shutdown_tx, rt, id) {
                        Ok(handle) => {
                            self.inner.metrics.inc_num_threads();
                            shared.worker_thread_index += 1;
                            shared.worker_threads.insert(id, handle);
                        }
                        Err(ref e)
                            if is_temporary_os_thread_error(e)
                                && self.inner.metrics.num_threads() > 0 =>
                        {
                            // OS temporarily failed to spawn a new thread.
                            // The task will be picked up eventually by a currently
                            // busy thread.
                        }
                        Err(e) => {
                            // The OS refused to spawn the thread and there is no thread
                            // to pick up the task that has just been pushed to the queue.
                            return Err(SpawnError::NoThreads(e));
                        }
                    }
                }
            }
        } else {
            // Notify an idle worker thread. The notification counter
            // is used to count the needed amount of notifications
            // exactly. Thread libraries may generate spurious
            // wakeups, this counter is used to keep us in a
            // consistent state.
            self.inner.metrics.dec_num_idle_threads();
            shared.num_notify += 1;
            self.inner.condvar.notify_one();
        }

        Ok(())
    }


    /// Where worker threads get spawned.
    fn spawn_thread(
        &self,
        shutdown_tx: shutdown::Sender,
        rt: &Handle,
        id: usize,
    ) -> std::io::Result<thread::JoinHandle<()>> {
        let mut builder = thread::Builder::new().name((self.inner.thread_name)());

        if let Some(stack_size) = self.inner.stack_size {
            builder = builder.stack_size(stack_size);
        }

        let rt = rt.clone(); // rt is a runtime#Handle

        builder.spawn(move || {
            // Only the reference should be moved into the closure
            let _enter = rt.enter();
            rt.inner.blocking_spawner().inner.run(id); // the main logic follows in Inner::run below
            drop(shutdown_tx);
        })
    }
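spawn_thread above is ordinary std::thread::Builder usage. A minimal, self-contained sketch of the same pattern (spawn_named_worker is a hypothetical name): configure a thread name and an optional stack size, and surface the io::Result so the caller can react to a spawn failure, just as spawn_task does.

```rust
use std::thread;

// Hypothetical stand-in for spawn_thread: a named thread with an optional
// stack size, returning the io::Result so the caller can handle a temporary
// OS failure instead of panicking.
fn spawn_named_worker(
    name: String,
    stack_size: Option<usize>,
) -> std::io::Result<thread::JoinHandle<usize>> {
    let mut builder = thread::Builder::new().name(name);
    if let Some(size) = stack_size {
        builder = builder.stack_size(size);
    }
    builder.spawn(|| {
        // Worker body; return something observable for the example.
        thread::current().name().map(|n| n.len()).unwrap_or(0)
    })
}

fn main() {
    let handle = spawn_named_worker("blocking-1".to_string(), Some(1 << 20))
        .expect("the OS refused to spawn the thread");
    // "blocking-1" has 10 characters.
    assert_eq!(handle.join().unwrap(), 10);
}
```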

impl Inner {
    fn run(&self, worker_thread_id: usize) {
        if let Some(f) = &self.after_start {
            f()
        }

        let mut shared = self.shared.lock();
        let mut join_on_thread = None;

        'main: loop {
            // BUSY: pop each task and call task.run(); for a runtime worker this executes run(worker)
            while let Some(task) = shared.queue.pop_front() {
                self.metrics.dec_queue_depth();
                drop(shared);
                task.run(); // executes the wrapped closure, e.g. run(worker)

                shared = self.shared.lock();
            }

            // IDLE
            self.metrics.inc_num_idle_threads();

            while !shared.shutdown {
                let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();

                shared = lock_result.0;
                let timeout_result = lock_result.1;

                if shared.num_notify != 0 {
                    // We have received a legitimate wakeup,
                    // acknowledge it by decrementing the counter
                    // and transition to the BUSY state.
                    shared.num_notify -= 1;
                    break;
                }

                // Even if the condvar "timed out", if the pool is entering the
                // shutdown phase, we want to perform the cleanup logic.
                if !shared.shutdown && timeout_result.timed_out() {
                    // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
                    // This isn't done when shutting down, because the thread calling shutdown will
                    // handle joining everything.
                    let my_handle = shared.worker_threads.remove(&worker_thread_id);
                    join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);

                    break 'main;
                }

                // Spurious wakeup detected, go back to sleep.
            }

            if shared.shutdown {
                // Drain the queue
                while let Some(task) = shared.queue.pop_front() {
                    self.metrics.dec_queue_depth();
                    drop(shared);

                    task.shutdown_or_run_if_mandatory();

                    shared = self.shared.lock();
                }

                // Work was produced, and we "took" it (by decrementing num_notify).
                // This means that num_idle was decremented once for our wakeup.
                // But, since we are exiting, we need to "undo" that, as we'll stay idle.
                self.metrics.inc_num_idle_threads();
                // NOTE: Technically we should also do num_notify++ and notify again,
                // but since we're shutting down anyway, that won't be necessary.
                break;
            }
        }

        // Thread exit
        self.metrics.dec_num_threads();

        // num_idle should now be tracked exactly, panic
        // with a descriptive message if it is not the
        // case.
        let prev_idle = self.metrics.dec_num_idle_threads();
        if prev_idle < self.metrics.num_idle_threads() {
            panic!("num_idle_threads underflowed on thread exit")
        }

        if shared.shutdown && self.metrics.num_threads() == 0 {
            self.condvar.notify_one();
        }

        drop(shared);

        if let Some(f) = &self.before_stop {
            f()
        }

        if let Some(handle) = join_on_thread {
            let _ = handle.join();
        }
    }
}
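The BUSY/IDLE cycle in Inner::run is a classic Mutex + Condvar worker loop. Below is a stripped-down sketch under stated assumptions (no keep-alive timeout, no metrics; Pool, Shared, and run_pool_demo are hypothetical names) that keeps the same shape: pop under the lock, run with the lock dropped, then wait on the condvar and acknowledge exactly one notification via num_notify.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// All names are hypothetical stand-ins for tokio's blocking pool internals.
struct Shared {
    queue: VecDeque<Box<dyn FnOnce() + Send>>,
    num_notify: usize, // counts legitimate wakeups to filter spurious ones
    shutdown: bool,
}

struct Pool {
    shared: Mutex<Shared>,
    condvar: Condvar,
}

impl Pool {
    fn new() -> Arc<Self> {
        Arc::new(Pool {
            shared: Mutex::new(Shared {
                queue: VecDeque::new(),
                num_notify: 0,
                shutdown: false,
            }),
            condvar: Condvar::new(),
        })
    }

    fn spawn(&self, task: Box<dyn FnOnce() + Send>) {
        let mut shared = self.shared.lock().unwrap();
        shared.queue.push_back(task);
        shared.num_notify += 1;
        self.condvar.notify_one();
    }

    fn shutdown(&self) {
        self.shared.lock().unwrap().shutdown = true;
        self.condvar.notify_all();
    }

    // The worker loop: BUSY (drain the queue, lock dropped while running a
    // task) then IDLE (condvar wait, acknowledging exactly one notification).
    fn run(&self) {
        let mut shared = self.shared.lock().unwrap();
        loop {
            while let Some(task) = shared.queue.pop_front() {
                drop(shared);
                task(); // run without holding the lock
                shared = self.shared.lock().unwrap();
            }
            if shared.shutdown {
                return;
            }
            while shared.num_notify == 0 && !shared.shutdown {
                shared = self.condvar.wait(shared).unwrap(); // may wake spuriously
            }
            if shared.num_notify > 0 {
                shared.num_notify -= 1; // acknowledge exactly one wakeup
            }
        }
    }
}

fn run_pool_demo(num_tasks: usize) -> usize {
    let pool = Pool::new();
    let counter = Arc::new(AtomicUsize::new(0));
    let worker = {
        let pool = pool.clone();
        thread::spawn(move || pool.run())
    };
    for _ in 0..num_tasks {
        let counter = counter.clone();
        pool.spawn(Box::new(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        }));
    }
    pool.shutdown();
    worker.join().unwrap();
    counter.load(Ordering::SeqCst)
}

fn main() {
    assert_eq!(run_pool_demo(8), 8);
}
```

Because the worker always drains the queue before checking the shutdown flag, tasks enqueued before shutdown() still run, matching the drain loop in the real Inner::run.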

13.4. Key points inside runtime::spawn_blocking(move || run(worker)):

13.4.1. Converting the closure {run(worker)} into a future (fut)

      pub(crate) fn spawn_blocking_inner<F, R>(
          &self,
          func: F,
          is_mandatory: Mandatory,
          name: Option<&str>,
          rt: &Handle,
      ) -> (JoinHandle<R>, Result<(), SpawnError>)
      where
          F: FnOnce() -> R + Send + 'static,
          R: Send + 'static,
      {
          // wrap func in a BlockingTask (a future that completes on first poll)
          let fut = BlockingTask::new(func);
          let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); // so func ultimately gets wrapped as a task

          let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
      }

  pub(crate) struct BlockingSchedule {
      #[cfg(feature = "test-util")]
      handle: Handle,
  }

  /// Converts a function to a future that completes on poll.
  pub(crate) struct BlockingTask<T> {
      func: Option<T>,
  }

  impl<T> BlockingTask<T> {
      /// Initializes a new blocking task from the given function.
      pub(crate) fn new(func: T) -> BlockingTask<T> {
          BlockingTask { func: Some(func) }
      }
  }


impl<T, R> Future for BlockingTask<T>
where
    T: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    type Output = R;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
        let me = &mut *self;
        let func = me
            .func
            .take()
            .expect("[internal exception] blocking task ran twice."); // take func out; it can only run once

        // This is a little subtle:
        // For convenience, we'd like _every_ call tokio ever makes to Task::poll() to be budgeted
        // using coop. However, the way things are currently modeled, even running a blocking task
        // currently goes through Task::poll(), and so is subject to budgeting. That isn't really
        // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
        // we want it to start without any budgeting.
        crate::runtime::coop::stop();

        Poll::Ready(func()) // func() executes the blocking closure, i.e. run(worker)
    }
}
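BlockingTask is essentially "a future that runs a closure on its first poll". A minimal sketch of the same mechanics (OnceTask, noop_waker, and run_once are hypothetical names; the RawWakerVTable boilerplate stands in for tokio's real waker):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Minimal clone of the idea behind BlockingTask: Option<T> lets the FnOnce
// be moved out exactly once, and the future is Ready on its first poll.
struct OnceTask<T> {
    func: Option<T>,
}

impl<T, R> Future for OnceTask<T>
where
    T: FnOnce() -> R + Unpin,
{
    type Output = R;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
        let func = self.func.take().expect("blocking task ran twice");
        Poll::Ready(func())
    }
}

// A no-op waker is enough to drive a future that never returns Pending.
fn noop_waker() -> Waker {
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker { RawWaker::new(p, &VTABLE) }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn run_once<T, R>(func: T) -> R
where
    T: FnOnce() -> R + Unpin,
{
    let mut fut = OnceTask { func: Some(func) };
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => unreachable!("OnceTask always completes on the first poll"),
    }
}

fn main() {
    assert_eq!(run_once(|| 6 * 7), 42);
}
```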

13.5. BlockingPool: blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);

13.5.1. Structure:

pub(crate) struct BlockingPool {
    spawner: Spawner,
    shutdown_rx: shutdown::Receiver,
}

pub(crate) struct Spawner {
    inner: Arc<Inner>,
}
struct Inner {
    /// State shared between worker threads.
    shared: Mutex<Shared>,

    /// Pool threads wait on this.
    condvar: Condvar,

    /// Spawned threads use this name.
    thread_name: ThreadNameFn,

    /// Spawned thread stack size.
    stack_size: Option<usize>,

    /// Call after a thread starts.
    after_start: Option<Callback>,

    /// Call before a thread stops.
    before_stop: Option<Callback>,

    // Maximum number of threads.
    thread_cap: usize,

    // Customizable wait timeout.
    keep_alive: Duration,

    // Metrics about the pool.
    metrics: SpawnerMetrics,
}

13.5.2. Construction:

// runtime/builder.rs
let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
let blocking_pool =
   blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); // so the cap is max_blocking_threads (512) + core_threads (e.g. 16)


// runtime/blocking/mod.rs
pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
    BlockingPool::new(builder, thread_cap)
}

// runtime/blocking/pool.rs

impl BlockingPool {
    pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
        let (shutdown_tx, shutdown_rx) = shutdown::channel();
        let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);

        BlockingPool {
            spawner: Spawner {
                inner: Arc::new(Inner {
                    shared: Mutex::new(Shared {
                        queue: VecDeque::new(),
                        num_notify: 0,
                        shutdown: false,
                        shutdown_tx: Some(shutdown_tx),
                        last_exiting_thread: None,
                        worker_threads: HashMap::new(),
                        worker_thread_index: 0,
                    }),
                    condvar: Condvar::new(),
                    thread_name: builder.thread_name.clone(), // the fields below simply mirror the builder's configuration
                    stack_size: builder.thread_stack_size,
                    after_start: builder.after_start.clone(),
                    before_stop: builder.before_stop.clone(),
                    thread_cap,
                    keep_alive,
                    metrics: Default::default(),
                }),
            },
            shutdown_rx,
        }
    }
}

13.6. Worker#run

fn run(worker: Arc<Worker>) {
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Catching panics on worker threads in tests is quite tricky. Instead, when
    // debug assertions are enabled, we just abort the process.
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    let handle = scheduler::Handle::MultiThread(worker.handle.clone());
    let _enter = crate::runtime::context::enter_runtime(&handle, true);

    // Set the worker context.
    let cx = Context { // ? what is this Context for? shouldn't each Task have its own Context instead?
        worker,
        core: RefCell::new(None),
    };

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to support
        // using `?` to short circuit.
        assert!(cx.run(core).is_err()); // cx.run drives the actual worker loop

        // Check if there are any deferred tasks to notify. This can happen when
        // the worker core is lost due to `block_in_place()` being called from
        // within the task.
        wake_deferred_tasks();
    });
}
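AbortOnPanic works because destructors still run while a panic unwinds, and std::thread::panicking() reports that state. A testable variant of the same idiom (PanicGuard and panic_was_detected are hypothetical; it records a flag where tokio calls process::abort()):

```rust
use std::panic;
use std::sync::atomic::{AtomicBool, Ordering};

static SAW_PANIC: AtomicBool = AtomicBool::new(false);

// Drop guard in the spirit of AbortOnPanic: its destructor runs during
// unwinding and can detect the panic in progress. Tokio aborts the process
// at this point; we only record a flag so the behavior can be asserted on.
struct PanicGuard;

impl Drop for PanicGuard {
    fn drop(&mut self) {
        if std::thread::panicking() {
            SAW_PANIC.store(true, Ordering::SeqCst);
        }
    }
}

fn panic_was_detected() -> bool {
    SAW_PANIC.store(false, Ordering::SeqCst);
    panic::set_hook(Box::new(|_| {})); // silence the default panic message
    let result = panic::catch_unwind(|| {
        let _guard = PanicGuard;
        panic!("worker panicked");
    });
    let _ = panic::take_hook();
    result.is_err() && SAW_PANIC.load(Ordering::SeqCst)
}

fn main() {
    assert!(panic_was_detected());
}
```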


impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown { // effectively an endless loop until shutdown
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) { // the worker fetches a task; note that next_task returns Notified, not Task; Notified is a rather special type
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = if did_defer_tasks() {
                    self.park_timeout(core, Some(Duration::from_millis(0)))
                } else {
                    self.park(core)
                };
            }
        }

        core.pre_shutdown(&self.worker);

        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        // Make the core available to the runtime context
        core.metrics.incr_poll_count();
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            task.run(); // where does the task's context switch happen?

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => return Err(()),
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => return Ok(core),
                };

                if coop::has_budget_remaining() {
                    // Run the LIFO task, then loop
                    core.metrics.incr_poll_count();
                    *self.core.borrow_mut() = Some(core);
                    let task = self.worker.handle.shared.owned.assert_owner(task);
                    task.run();
                } else {
                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue
                        .push_back(task, self.worker.inject(), &mut core.metrics);
                    return Ok(core);
                }
            }
        })
    }
}
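The interplay of coop::budget and the lifo_slot can be modeled with a plain counter: keep running LIFO tasks while budget remains, otherwise push them to the back of the queue. A hypothetical sketch (task ids stand in for real tasks; the chaining in poll simulates a message-passing workload that keeps refilling the LIFO slot):

```rust
use std::collections::VecDeque;

// Hypothetical model of the LIFO-slot + coop budget policy in run_task:
// each poll costs one unit of budget; once it is spent, the LIFO task is
// pushed to the back of the run queue instead of being polled immediately.
struct Core {
    budget: u32,
    lifo_slot: Option<u32>, // task ids stand in for real tasks
    run_queue: VecDeque<u32>,
    polled: Vec<u32>,
}

impl Core {
    fn run_task(&mut self, task: u32) {
        self.poll(task);
        // As long as budget remains and a task sits in the lifo_slot, keep going.
        while let Some(next) = self.lifo_slot.take() {
            if self.budget > 0 {
                self.poll(next);
            } else {
                // Out of budget: defer the task so other work gets a turn.
                self.run_queue.push_back(next);
                return;
            }
        }
    }

    fn poll(&mut self, task: u32) {
        self.budget = self.budget.saturating_sub(1);
        self.polled.push(task);
        // Message-passing workloads often wake a follow-up task into the
        // LIFO slot; simulate that by chaining task + 1.
        if task < 10 {
            self.lifo_slot = Some(task + 1);
        }
    }
}

fn main() {
    let mut core = Core {
        budget: 3,
        lifo_slot: None,
        run_queue: VecDeque::new(),
        polled: Vec::new(),
    };
    core.run_task(0);
    assert_eq!(core.polled, vec![0, 1, 2]); // three polls fit in the budget
    assert_eq!(core.run_queue, VecDeque::from(vec![3])); // the fourth was deferred
}
```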

impl Core {

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
            worker.inject().pop().or_else(|| self.next_local_task()) // at the configured interval, try the global queue first
        } else {
            self.next_local_task().or_else(|| worker.inject().pop())
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> { // pop a task from the lifo slot or the run_queue
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }
}
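The fairness policy in next_task is easy to isolate: normally the local queue wins, but every global_queue_interval-th tick the global (inject) queue is checked first so injected tasks are not starved by local work. A hypothetical model with string task names:

```rust
use std::collections::VecDeque;

// Hypothetical model of next_task's policy: the local queue usually wins,
// but every `interval` ticks the global (inject) queue is checked first,
// so externally spawned tasks cannot be starved by local work.
fn next_task(
    tick: u32,
    interval: u32,
    local: &mut VecDeque<&'static str>,
    global: &mut VecDeque<&'static str>,
) -> Option<&'static str> {
    if tick % interval == 0 {
        global.pop_front().or_else(|| local.pop_front())
    } else {
        local.pop_front().or_else(|| global.pop_front())
    }
}

fn main() {
    let mut local = VecDeque::from(vec!["local-1", "local-2"]);
    let mut global = VecDeque::from(vec!["global-1"]);
    // Off-interval tick: local work first.
    assert_eq!(next_task(3, 4, &mut local, &mut global), Some("local-1"));
    // On-interval tick: the global queue gets priority.
    assert_eq!(next_task(4, 4, &mut local, &mut global), Some("global-1"));
    // Global queue empty again: fall back to local.
    assert_eq!(next_task(8, 4, &mut local, &mut global), Some("local-2"));
}
```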

13.7. Task structure: worker.next_local_task(&self) -> Option<Notified>, where Notified is: type Notified = task::Notified<Arc<Handle>>;

13.7.1. pub(crate) struct task::Notified<S: 'static>(Task<S>);

13.7.2. pub(crate) struct Task<S: 'static> { raw: RawTask, _p: PhantomData<S> }

13.7.3. Notified here is quite odd: it wraps a Task holding just raw: RawTask plus a zero-sized _p: PhantomData, so Notified is essentially equivalent to RawTask. As for why it carries <S>: is that only to propagate properties like Sync?

13.7.4. Also note: this Task differs from the one in blocking/pool.rs, pub(crate) struct Task { task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory },

13.7.5. i.e. spawn_blocking and a worker spawn produce different Task types

/// A scheduler worker
pub(super) struct Worker {
    /// Used to hand-off a worker's core to another thread.
    core: AtomicCell<Core>,
}
/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task to be run
    /// next (LIFO). This is an optimization for message passing patterns and
    /// helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>, // one run_queue per worker; only when it is empty does the worker fall back to the shared queues
}
impl Core {
  fn next_local_task(&mut self) -> Option<Notified> {
      self.lifo_slot.take().or_else(|| self.run_queue.pop())
  }
}


13.8. driver#Handle: mainly epoll, or another OS I/O event registration mechanism

13.8.1. source code:

pub(crate) struct Handle {
    /// IO driver handle
    pub(crate) io: IoHandle,

    /// Signal driver handle
    #[cfg_attr(any(not(unix), loom), allow(dead_code))]
    pub(crate) signal: SignalHandle,

    /// Time driver handle
    pub(crate) time: TimeHandle,

    /// Source of `Instant::now()`
    #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
    pub(crate) clock: Clock,
}
pub(crate) enum IoHandle {
    Enabled(crate::runtime::io::Handle),
    Disabled(UnparkThread),
}
/// A reference to an I/O driver.
pub(crate) struct Handle {
    /// Registers I/O resources.
    registry: mio::Registry,

    /// Allocates `ScheduledIo` handles when creating new resources.
    io_dispatch: RwLock<IoDispatcher>,

    /// Used to wake up the reactor from a call to `turn`.
    /// Not supported on Wasi due to lack of threading support.
    #[cfg(not(tokio_wasi))]
    waker: mio::Waker,

    pub(crate) metrics: IoDriverMetrics,
}


13.9. Launch workers: pub(crate) struct Launch(Vec<Arc<Worker>>);

13.10. ? scheduler ?: to be distinguished from runtime/handle.rs, which is just a wrapper around the Runtime

13.11. Context switch: a rust async runtime like tokio does not need context switches; see https://users.rust-lang.org/t/where-the-tokio-task-context-switch-code/94984/11

13.11.1. Rust compiles async/.await into a state machine; even multiple levels of nested .await calls still become a state machine, just with more states

13.11.2. In this transformation, variables used across an .await are saved in the state machine. For foo below, which calls bar, the compiler generates FooState (with BarState nested inside):

  async fn foo() {
    let local = 123;
    bar().await;
    println!("{local}");
  }

  async fn bar() {
    let other = 456;
    yield().await;
    println!("{other}");
  }


  enum FooState {
    Initial,
    AwaitBar { local: i32, bar: BarState },
  }

  enum BarState {
    Initial,
    AwaitYield { other: i32, yield: YieldState },
  }

  FooState::AwaitBar {
    local: 123,
    bar: BarState::AwaitYield {
      other: 456,
      yield: ...
    },
  }

  yield
  bar { other: 456 }
  foo { local: 123 }

// If this is lowered to code, what form does the fn take?

  async fn foo() {
    let local = 123;
    bar().await;
    println!("{local}");
  }

// conceptually, rust lowers bar().await into a poll loop that suspends on Pending:
loop {
    match barstate.poll(cx) {
        Poll::Ready(v) => break v,
        Poll::Pending => return Poll::Pending, // suspend; the next poll resumes here
    }
}

13.11.3. So a Future ultimately becomes one big enum holding all the states. I first thought nested fn calls would be flattened into a single function; (that was wrong) the functions are not flattened, the source stays as separate functions, and the pc keeps the code position.

13.11.4. How a function handles heap and stack variables:

  1. func(name) { local val = 10; if name == 5 ; name = val * 5 }
  2. name would be heap-allocated and passed in as an address
  3. val is allocated on the stack; if it crosses an await, it is saved into the state machine
  4. so name = val * 5 directly updates the value at name's memory address, which is perfectly fine; val may live in a register, which is simply faster than name's memory
  5. since rust does not flatten fns, it turns .await into a loop/while that keeps polling the Future; the pc stays inside that loop, and the next wakeup re-enters it
  6. if the nested state is not on the heap, then inside the while loop the inner function returns Poll::Pending (after directly updating its own value stored in the state, much like name = val * 5) and then yields

14. codec:

14.1. Framed: its main purpose is to implement the Stream & Sink traits

pub struct Framed<T, U> {
    #[pin]
    inner: FramedImpl<T, U, RWFrames>
}

14.1.1. Stream / Sink: Stream is used for reading, Sink for writing; Sink sends asynchronously and keeps an internal buffer of data

14.1.2. FramedImpl is the main implementation

pub(crate) struct FramedImpl<T, U, State> {
    #[pin]
    pub(crate) inner: T,
    pub(crate) state: State,
    pub(crate) codec: U,
}

14.2. impl Stream for Framed<T, U> ultimately delegates to FramedImpl

pin_project! {
    #[derive(Debug)]
    pub(crate) struct FramedImpl<T, U, State = RWFrames> {
        #[pin]
        pub(crate) inner: T,
        pub(crate) state: State,
        pub(crate) codec: U,
    }
}
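What a codec's decode step contributes can be shown without tokio_util: a hypothetical line decoder that pulls one complete frame out of an accumulating read buffer and returns None when more bytes are needed; this is the contract FramedImpl drives from its Stream implementation.

```rust
// Hypothetical stand-in for a tokio_util Decoder: extract one '\n'-delimited
// frame from the read buffer, or return None if no complete frame has
// arrived yet (FramedImpl would then read more bytes from the inner I/O).
fn decode_line(buf: &mut Vec<u8>) -> Option<String> {
    let pos = buf.iter().position(|&b| b == b'\n')?;
    let frame: Vec<u8> = buf.drain(..=pos).collect();
    // Strip the trailing '\n' before handing the frame up.
    String::from_utf8(frame[..frame.len() - 1].to_vec()).ok()
}

fn main() {
    let mut buf = Vec::new();
    buf.extend_from_slice(b"hel"); // partial frame: not decodable yet
    assert_eq!(decode_line(&mut buf), None);
    buf.extend_from_slice(b"lo\nworld\n"); // two complete frames arrive
    assert_eq!(decode_line(&mut buf), Some("hello".to_string()));
    assert_eq!(decode_line(&mut buf), Some("world".to_string()));
    assert_eq!(decode_line(&mut buf), None); // buffer drained
}
```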

15. Questions about using async/await:

15.1. async turns a fn into a Future
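This can be demonstrated with std alone: calling an async fn executes none of its body; it only constructs a Future (the compiler-generated state machine), which makes progress only when polled. (noop_waker below is hand-rolled boilerplate, not part of std.)

```rust
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Calling an async fn runs none of its body; it only builds a Future.
async fn add(a: i32, b: i32) -> i32 {
    a + b // no .await inside, so this future finishes on its first poll
}

fn noop_waker() -> Waker {
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker { RawWaker::new(p, &VTABLE) }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let fut = add(40, 2); // nothing has executed yet; fut is just a state machine
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut); // pin it so it can be polled
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(42));
}
```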

16. Wasm: looks fairly complex to use; writing it by hand is hardly feasible anymore

16.1. Why wasm is fast: overall, wasm looks a lot like an OS, but it provides no abstractions

  1. the binary wasm format is small over the network and needs no transcoding: fast
  2. linear memory: a simple memory model, no GC, no built-in memory management; this keeps it simple
  3. in proposals: threads, and interaction with the os to support multiple processes

16.2. Why wasm could be adopted quickly in the browser:

  1. the browser itself already ships a vm to run js code
  2. web apis are used as host apis: DOM, WebGL, Audio Api

16.3. Tooling:

  1. wabt: xxx.wat --wat2wasm--> xxx.wasm --wasm2wat--> xxx.wat; wasm-objdump shows the structure of a .wasm file
  2. Writing bare wat is hard: besides writing the xx.wat itself, you also need to wrap xx.wasm with js before it can be used on the web.
  3. WASI: defines the capability to run wasm outside the browser. It defines a modular system interface to run WebAssembly outside the web, providing access to things like files, network connections, clocks, and random numbers.
  4. Emscripten is a toolchain for compiling C/C++ code into wasm modules. Its strengths: support for several languages (C/C++, Rust, Go, etc.), generation of efficient, safe wasm modules, and JavaScript glue code for loading and running wasm modules in a web browser.

16.4. Format: see the wasm text format reference on MDN

16.5. Key concepts:

  1. Module: wasm that the browser compiles to executable machine code; stateless, representing only the executable code
  2. Memory: an ArrayBuffer serving as wasm's running linear memory?
  3. Table: references, which are not convenient to store in Memory (for safety and portability)
  4. Instance: 1 + 2 + 3, i.e. code + running state; the js api

16.6. rust <-> wasm: mdn, rustwasm. wasm-pack is the key bridge between rust and js; it is worth trying more advanced interop, e.g. calling back and forth between JS types and Rust types

  1. setup:
  cargo install wasm-pack
  // cargo.toml

[package]
name = "xxxx"
version = "0.1.0"
authors = ["Your Name <you@example.com>"]
description = "A sample project with wasm-pack"
license = "MIT/Apache-2.0"
repository = "https://github.com/yourgithubusername/hello-wasm"
edition = "2018"

[lib]
crate-type = ["cdylib"]
[dependencies]
wasm-bindgen = "0.2"



// lib.rs

use wasm_bindgen::prelude::*;

#[wasm_bindgen] // binding so rust can call the browser's alert
extern {
    pub fn alert(s: &str);
}

#[wasm_bindgen] // function binding exported from rust to js
pub fn greet(name: &str) {
    alert(&format!("Hello, {}!", name));
}

wasm-pack build --target web  # note the --target web flag; this installs wasm32-unknown-unknown, see https://rust-lang.github.io/rustup/cross-compilation.html

16.7. wasmtime: a wasm runtime written in rust, i.e. a spec-compliant rust implementation of wasm:

  1. Skimmed the wasm book without fully understanding it; how does wasm run on a Raspberry Pi, and why is there no wasm runtime there?

16.8. Using lldb:

rust-gdb does not support aarch64; macOS ships lldb, so use rust-lldb (a shell wrapper rust provides around lldb):

16.9. lldb usage (note: xxx.file below denotes the source code file):

  1. launch: lldb xxx.file / lldb; file xxx.file / lldb -- xxx.file para1 para2
  2. breakpoints:
    • breakpoint set -f xxx.file -l num / br s -f xxx.file -l num / b xxx.file:num
    • br list, br del
    • b function_name
  3. next/n, step/s (step into a function), c (continue)
  4. p varname
  5. list the current stack frame's variables: frame variable / frame v / fr v
  6. where the current code is: frame select / fr s
  7. stack inspection: backtrace: bt, then frame select 0; these do not disturb the stack and can be used to examine the call chain in detail
  8. watchpoints: watchpoint set variable varname / watchpoint set variable -w read|write|read_write varname / wa s v varname

16.10. rust-lldb:

  1. compile rust with debug symbols: cargo build defaults to a debug profile / rustc -g / rustc -C debuginfo=2
  2. rust-lldb target/debug/xxxx arg1 arg2

17. ^^^^^ the trait `Speak` is not implemented for `&dyn Speak`

18. rust resources:

19. ? Open questions to explore:

19.1. Using the standard library's thread

19.2. mpsc usage + internal implementation (channel)

19.3. HOLD tokio example

19.4. tokio inner

19.5. error handling approaches

Author: wildimagine

Created: 2024-03-13 Wed 13:01