seastar::future 在 get() 之后就像被掏空了一样。

最近有同事说,在测试 Release build 的时候发现 crimson::do_until() 会产生 segfault。重现的代码很简单:

  future<> test()
  {
    return crimson::do_until([this]() -> future<bool> {
      if (i < 5) {
        ++i;
        return ertr::make_ready_future<bool>(false);
      } else {
        return ertr::make_ready_future<bool>(true);
      }
    });
  }

看了下,的确如此。祭出 seastar-addr2line

?? ??:0
seastar::internal::future_base::detach_promise() at /var/ssd/ceph/build-release/../src/seastar/include/seastar/core/future.hh:1169
 (inlined by) seastar::internal::future_base::schedule(seastar::task*, seastar::future_state_base*) at /var/ssd/ceph/build-release/../src/seastar/include/seastar/core/future.hh:1175
 (inlined by) seastar::future<bool>::schedule(seastar::continuation_base<bool>*) at /var/ssd/ceph/build-release/../src/seastar/include/seastar/core/future.hh:1372
 (inlined by) void seastar::future<bool>::schedule<seastar::internal::promise_base_with_type<>, crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}, seastar::future<bool>::then_impl_nrvo<a_basic_test_t::test()::{lambda()#1}&&, crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<> > >(a_basic_test_t::test()::{lambda()#1}&&)::{lambda(seastar::internal::promise_base_with_type<>&&, a_basic_test_t::test()::{lambda()#1}&, seastar::future_state<bool>&&)#1}>(seastar::internal::promise_base_with_type<>, crimson::errorated_future_marker<>&&, seastar::future<bool>::then_impl_nrvo<a_basic_test_t::test()::{lambda()#1}&&, crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<> > >(a_basic_test_t::test()::{lambda()#1}&&)::{lambda(seastar::internal::promise_base_with_type<>&&, a_basic_test_t::test()::{lambda()#1}&, seastar::future_state<bool>&&)#1}&&) at /var/ssd/ceph/build-release/../src/seastar/include/seastar/core/future.hh:1391
 (inlined by) crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<> > seastar::future<bool>::then_impl_nrvo<crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}, crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<> > >(crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}) at /var/ssd/ceph/build-release/../src/seastar/include/seastar/core/future.hh:1571
 (inlined by) crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<> > seastar::future<bool>::then_impl<crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}, crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<> > >(crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}) at /var/ssd/ceph/build-release/../src/seastar/include/seastar/core/future.hh:1605
 (inlined by) seastar::internal::future_result<a_basic_test_t::test()::{lambda()#1}, bool>::future_type seastar::internal::call_then_impl<seastar::future<bool> >::run<crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}>(seastar::future<bool>&, crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}) at /var/ssd/ceph/build-release/../src/seastar/include/seastar/core/future.hh:1234
 (inlined by) crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<> > seastar::future<bool>::then<crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}, crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<> > >(crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}) at /var/ssd/ceph/build-release/../src/seastar/include/seastar/core/future.hh:1520
 (inlined by) auto crimson::errorator<crimson::unthrowable_wrapper<std::error_code const&, crimson::ec<(std::errc)22> > >::_future<crimson::errorated_future_marker<bool> >::_then<crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}>(crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1})::{lambda(a_basic_test_t::test()::{lambda()#1}&&)#1}) at /var/ssd/ceph/build-release/../src/crimson/common/errorator.h:676
 (inlined by) auto crimson::do_until<a_basic_test_t::test()::{lambda()#1}>(a_basic_test_t::test()::{lambda()#1}) at /var/ssd/ceph/build-release/../src/crimson/common/errorator.h:68
seastar::noncopyable_function<void ()>::direct_vtable_for<seastar::async<a_basic_test_t_0_basic_Test::TestBody()::{lambda()#1}>(seastar::thread_attributes, std::decay&&, (std::decay<a_basic_test_t_0_basic_Test::TestBody()::{lambda()#1}>::type&&)...)::{lambda()#1}>::call(seastar::noncopyable_function<void ()> const*) at /var/ssd/ceph/build-release/../src/test/crimson/test_errorator.cc:22
seastar::noncopyable_function<void ()>::operator()() const at /var/ssd/ceph/build-release/../src/seastar/include/seastar/util/noncopyable_function.hh:201
 (inlined by) seastar::thread_context::main() at /var/ssd/ceph/build-release/../src/seastar/src/core/thread.cc:297

这个问题的特点是 Release 版本才有。注意到 Seastar 中 future::schedule() 的实现 (文中把过长的行折成多行,方便阅读)

    template <typename Pr, typename Func, typename Wrapper>
    void schedule(Pr&& pr, Func&& func, Wrapper&& wrapper) noexcept {
        // If this new throws a std::bad_alloc there is nothing that
        // can be done about it. The corresponding future is not ready
        // and we cannot break the chain. Since this function is
        // noexcept, it will call std::terminate if new throws.
        memory::disable_failure_guard dfg;
        auto tws = new continuation<Pr, Func, Wrapper, T SEASTAR_ELLIPSIS>(std::move(pr),
                                                                           std::move(func),
                                                                           std::move(wrapper));
        // In a debug build we schedule ready futures, but not in
        // other build modes.
#ifdef SEASTAR_DEBUG
        if (_state.available()) {
            tws->set_state(std::move(_state));
            ::seastar::schedule(tws);
            return;
        }
#endif
        schedule(tws);
        _state._u.st = future_state_base::state::invalid;
    }

其中对 Debug 版本有特殊的处理,如果 future::_state 当时就已经 available,那么调用 continuation_base::set_state() 把 _state 搬到新建的 tws 里面。 future_state::move_it() 是 future_state(future_state&&) 的具体实现。它比较直接,把值 move 或者 memmove 到自己手里面。

但是 Release 版则会调用 future_base::schedule(tws, &tws->_state)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    promise_base* detach_promise() noexcept {
        _promise->_state = nullptr;
        _promise->_future = nullptr;
        return std::exchange(_promise, nullptr);
    }

    void schedule(task* tws, future_state_base* state) noexcept {
        promise_base* p = detach_promise();
        p->_state = state;
        p->_task = tws;
    }

segfault 发生在上面这段代码的第 2 行,也就是 detach_promise() 里的 _promise->_state = nullptr; 所以说 _promise 在那个时候已经是个空指针。这是谁干的呢?我们回过头看看 do_until() 的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<typename AsyncAction>
inline auto do_until(AsyncAction action) {
  using futurator = \
    ::seastar::futurize<std::result_of_t<AsyncAction()>>;

  while (true) {
    auto f = futurator::invoke(action);
    if (!seastar::need_preempt() && f.available() && f.get()) {
      return futurator::type::errorator_type::template make_ready_future<>();
    }
    if (!f.available() || seastar::need_preempt()) {
      return std::move(f)._then(
        [ action = std::move(action)] (auto &&done) mutable {
          if (done) {
            return futurator::type::errorator_type::template make_ready_future<>();
          }
          return ::crimson::do_until(
            std::move(action));
        });
    }
    if (f.failed()) {
      return futurator::type::errorator_type::template make_exception_future2<>(
        f.get_exception()
      );
    }
  }
}

思路很简单,就是递归调用,直到 f 的结果为真。因为递归是通过 post message 风格的调用实现的,所以不需要担心栈的大小问题。其中最可疑的地方就是 ._then() 了,它其实就是 future::then()。后者分情况讨论,如果 future 的 state 是立等可取的,那么就直接 futurator::invoke() 了,否则调用 then_impl_nrvo()。接下来则是 future::schedule()。schedule() 会把 future 的 _promise 取走,留下一个空指针。这下子就和前面的 backtrace 对上了。但是稍等,为什么要调用 schedule() 呢?test() 里面返回的 future 的 state 都是 available 的啊。

我们再看看 future::get()

    [[gnu::always_inline]]
    value_type&& get() {
        wait();
        return get_available_state_ref().take();
    }

这个 take() 很奇怪。get() 和 take() 的语义是不一样的。一个是返回拷贝或者引用,一个则是从所有者手中夺走,然后返回抢到的东西。果不其然:

    T&& take() && {
        assert(available());
        if (_u.st >= state::exception_min) {
            std::move(*this).rethrow_exception();
        }
        _u.st = state::result_unavailable;
        return static_cast<T&&>(this->uninitialized_get());
    }

所以 take() 之后,future 里面原来的 state 成了 unavailable 的状态。难怪 do_until() 回过头再看 f 的时候,它已经变成了 unavailable,所以就傻乎乎地去调用 _then() 了。

再看看 get_available_state_ref()

    [[gnu::always_inline]]
    future_state&& get_available_state_ref() noexcept {
        if (_promise) {
            detach_promise();
        }
        return std::move(_state);
    }

原来 _promise 是在这里被拿走的,罪魁祸首并非 schedule()。人家只是受害者。调整一下顺序,最后再 get(),问题就解决了。

这两天学 Rust。现炒现卖一下,用 Rust 来写这个有 bug 的 do_until,就是

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct Future {
  state: State,
}

impl Future {
  fn get(self) {
    // takes the ownership of self
  }
  fn _then(self) {
    // also takes the ownership of self
  }
}

loop {
  let mut f = futurator::invoke(action);
  if (!seastar::need_preempt() && f.available() && f.get()) {
    return now();
  }
  if (!f.available() || seastar::need_preempt()) {
    return f._then( /* */);
  }
}

编译的时候 rustc 就会出错:

error[E0382]: use of moved value: `f`
 --> src/main.rs:19:9
   |
16 |     if (!seastar::need_preempt() && f.available() && f.get()) {
   |                                                      -- value moved here
17 |       return now();
18 |     }
19 |     if (!f.available() || seastar::need_preempt()) {
   |          ^^ value used here after move
   |
  = note: move occurs because `f` has type `Future`, which does not implement
  the `Copy` trait

顿时有弃暗投明的冲动。