Asynchronous Programming in Rust

Asynchronous Programming in Rust


记录对Rust异步编程的一些理解

关于Tokio Runtime

Tokio is able to concurrently run many tasks on a few threads by repeatedly swapping the currently running task on each thread. However, this kind of swapping can only happen at .await points, so code that spends a long time without reaching an .await will prevent other tasks from running. To combat this, Tokio provides two kinds of threads: Core threads and blocking threads. The core threads are where all asynchronous code runs, and Tokio will by default spawn one for each CPU core. The blocking threads are spawned on demand, can be used to run blocking code that would otherwise block other tasks from running and are kept alive when not used for a certain amount of time which can be configured with thread_keep_alive. Since it is not possible for Tokio to swap out blocking tasks, like it can do with asynchronous code, the upper limit on the number of blocking threads is very large. These limits can be configured on the Builder.

Tokio 通过在每个线程上频繁换入换出当前正在运行的Task,达到能够在多个线程上同时运行多个Task的效果。但是,这种交换只能在代码执行到.await时触发,因此一个Task长时间未执行.await将阻止其他Task的运行(因为线程一直被当前执行的Task占用)。为了解决这个问题,Tokio 提供了两种线程:核心线程阻塞线程。核心线程是所有异步代码运行的地方,默认情况下,Tokio 将为每个 CPU core生成一个。阻塞线程是按需产生的,可用于运行阻塞代码,否则会阻塞其他Task的运行,并且在一段时间不使用时保持活动状态,可以配置为thread_keep_alive. 由于 Tokio 不可能像处理异步代码那样交换阻塞任务,因此阻塞线程数的上限可以非常大,可以在通过Runtime builder来手动配置。

也就是说,Tokio Runtime在会存在两个线程池:阻塞线程池(Blocking threads)异步线程池(Core threads)

  • 阻塞线程池可以用于运行同步代码,线程池默认上限为512,我们可以手动指定最大数量,它会随着程序的运行动态申请。
  • 异步线程池可以用于运行异步代码,线程池中默认的数量为CPU核心数,异步线程池中的线程占满后数量不会再增加。

关于同步与异步互相调用

正常同步和异步的执行是这样的:

  • 在完全同步的代码中,执行逻辑比较简单,占用当前线程顺序执行即可,阻塞当前线程是可以理解的也是必须的。
  • 在常规的异步代码中,遇到耗时会阻塞线程的操作可以通过调用.await,将其换出线程并换入其他Task,不会阻塞当前异步线程。

Rust中普通fn方法为同步方法,执行过程中会持续占用当前线程;而async fn为异步方法,在执行过程中遇到.await时会被换出当前线程。

所以换个角度理解,一个方法是普通fn,那么就应当阻塞当前线程,而如果一个方法是async fn,就应该对外保证不会长时间阻塞当前线程,这也是我们在开发过程中应当遵守的原则。

我们的Tokio使用场景中,存在许多场景是比上面的情况要复杂的,有同步与异步混用的情况,即同步代码调用异步代码或是异步代码调用同步代码,开发过程中出现的一个问题是,异步调用了同步代码,同步代码阻塞了异步线程导致了异步Task无法得到线程去执行,所以说到底,这个问题产生的原因在于我们违背了这个原则:一个方法是普通fn,那么就应当阻塞当前线程,而如果一个方法是async fn,就应该对外保证不会长时间阻塞当前线程


Spawn,Spawn blocking,Block in place

在编写同步与异步代码的过程中应当想办法满足上面的原则:

同步调用异步(fn ->async):最外层函数为fn,所以要保证整个函数为同步的,可以使用block_on()的方式:

pub fn block_on<F: Future>(&self, future: F) -> F::Output

Run a future to completion on the Tokio runtime. This is the runtime’s entry point.

This runs the given future on the current thread, blocking until it is complete, and yielding its resolved result. Any tasks or timers which the future spawns internally will be executed on the runtime.

block会阻塞当前线程,直至运行完成,这样可以保证整个函数对外是同步的。

异步调用同步(async->fn):最外层函数为async,所以要保证整个函数为异步的,可以使用spawn_blocking()的方式:

pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R> 

Runs the provided closure on a thread where blocking is acceptable.

spawn blocking会使用阻塞线程池上的线程来运行同步函数,调用spawn blocking的函数会让出异步线程让其他Task运行。

关于block in place,曾向Tokio的作者请教过:about block in place()

block in place


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!