//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
#define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP

#include <boost/corosio/timer.hpp>
#include <boost/corosio/io_context.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>
#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/intrusive.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/ex/execution_context.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <system_error>

#include <atomic>
#include <chrono>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
#include <optional>
#include <stop_token>
#include <vector>

namespace boost::corosio::detail {

struct scheduler;

/*
    Timer Service
    =============

    Data Structures
    ---------------
    waiter_node holds per-waiter state: coroutine handle, executor,
    error output, stop_token, embedded completion_op. Each concurrent
    co_await t.wait() allocates one waiter_node.

    timer_service::implementation holds per-timer state: expiry,
    heap index, and an intrusive_list of waiter_nodes. Multiple
    coroutines can wait on the same timer simultaneously.

    timer_service owns a min-heap of active timers, a free list
    of recycled impls, and a free list of recycled waiter_nodes. The
    heap is ordered by expiry time; the scheduler queries
    nearest_expiry() to set the epoll/timerfd timeout.

    Optimization Strategy
    ---------------------
    1. Deferred heap insertion — expires_after() stores the expiry
       but does not insert into the heap. Insertion happens in wait().
    2. Thread-local impl cache — single-slot per-thread cache that lets
       construct() and destroy() skip the free-list mutex.
    3. Embedded completion_op — eliminates heap allocation per fire/cancel.
    4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
    5. might_have_pending_waits_ flag — cancel paths return without
       locking when no wait was ever issued.
    6. Thread-local waiter cache — single-slot per-thread cache that lets
       create_waiter() and destroy_waiter() skip the free-list mutex.

    Concurrency
    -----------
    stop_token callbacks can fire from any thread. The impl_
    pointer on waiter_node is used as a "still in list" marker: it is
    cleared under the mutex when a waiter is removed, so a racing
    cancellation can detect that it arrived too late.
*/
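
/*
    Illustrative usage (a sketch, not part of this header; the exact
    public spelling of the timer front end is assumed here, only
    expires_after() and wait() are named in the notes above):

        corosio::timer t(ctx);                    // construct() recycles or
                                                  //   allocates an implementation
        t.expires_after(std::chrono::seconds(1)); // stores the expiry only;
                                                  //   no heap insertion yet
        co_await t.wait();                        // allocates a waiter_node and
                                                  //   inserts the timer into the heap

    Several coroutines may co_await t.wait() on the same timer at once;
    each wait gets its own waiter_node on the timer's intrusive waiter
    list, with its own stop_token and error output.
*/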
 
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;

inline void timer_service_invalidate_cache() noexcept;

// timer_service class body — member functions are declared here and
// defined out-of-class, after implementation and waiter_node are complete
class BOOST_COROSIO_DECL timer_service final
    : public capy::execution_context::service
    , public io_object::io_service
{
public:
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;

    class callback
    {
        void* ctx_         = nullptr;
        void (*fn_)(void*) = nullptr;

    public:
        callback() = default;
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}

        explicit operator bool() const noexcept
        {
            return fn_ != nullptr;
        }
        void operator()() const
        {
            if (fn_)
                fn_(ctx_);
        }
    };

    struct implementation;

private:
    struct heap_entry
    {
        time_point time_;
        implementation* timer_;
    };

    scheduler* sched_ = nullptr;
    mutable std::mutex mutex_;
    std::vector<heap_entry> heap_;
    implementation* free_list_     = nullptr;
    waiter_node* waiter_free_list_ = nullptr;
    callback on_earliest_changed_;
    // Avoids mutex in nearest_expiry() and empty()
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
        (std::numeric_limits<std::int64_t>::max)()};

public:
    inline timer_service(capy::execution_context&, scheduler& sched)
        : sched_(&sched)
    {
    }

    inline scheduler& get_scheduler() noexcept
    {
        return *sched_;
    }

    ~timer_service() override = default;

    timer_service(timer_service const&)            = delete;
    timer_service& operator=(timer_service const&) = delete;

    inline void set_on_earliest_changed(callback cb)
    {
        on_earliest_changed_ = cb;
    }

    inline bool empty() const noexcept
    {
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
            (std::numeric_limits<std::int64_t>::max)();
    }

    inline time_point nearest_expiry() const noexcept
    {
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
        return time_point(time_point::duration(ns));
    }

    inline void shutdown() override;
    inline io_object::implementation* construct() override;
    inline void destroy(io_object::implementation* p) override;
    inline void destroy_impl(implementation& impl);
    inline waiter_node* create_waiter();
    inline void destroy_waiter(waiter_node* w);
    inline std::size_t update_timer(implementation& impl, time_point new_time);
    inline void insert_waiter(implementation& impl, waiter_node* w);
    inline std::size_t cancel_timer(implementation& impl);
    inline void cancel_waiter(waiter_node* w);
    inline std::size_t cancel_one_waiter(implementation& impl);
    inline std::size_t process_expired();

private:
    inline void refresh_cached_nearest() noexcept
    {
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
                                : heap_[0].time_.time_since_epoch().count();
        cached_nearest_ns_.store(ns, std::memory_order_release);
    }

    inline void remove_timer_impl(implementation& impl);
    inline void up_heap(std::size_t index);
    inline void down_heap(std::size_t index);
    inline void swap_heap(std::size_t i1, std::size_t i2);
};

struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
    : intrusive_list<waiter_node>::node
{
    // Embedded completion op — avoids heap allocation per fire/cancel
    struct completion_op final : scheduler_op
    {
        waiter_node* waiter_ = nullptr;

        static void do_complete(
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);

        completion_op() noexcept : scheduler_op(&do_complete) {}

        void operator()() override;
        // No-op — lifetime owned by waiter_node, not the scheduler queue
        void destroy() override {}
    };

    // Per-waiter stop_token cancellation
    struct canceller
    {
        waiter_node* waiter_;
        void operator()() const;
    };

    // nullptr once removed from timer's waiter list (concurrency marker)
    timer_service::implementation* impl_ = nullptr;
    timer_service* svc_                  = nullptr;
    std::coroutine_handle<> h_;
    capy::executor_ref d_;
    std::error_code* ec_out_ = nullptr;
    std::stop_token token_;
    std::optional<std::stop_callback<canceller>> stop_cb_;
    completion_op op_;
    std::error_code ec_value_;
    waiter_node* next_free_ = nullptr;

    waiter_node() noexcept
    {
        op_.waiter_ = this;
    }
};

struct timer_service::implementation final : timer::implementation
{
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using duration   = clock_type::duration;

    timer_service* svc_ = nullptr;
    intrusive_list<waiter_node> waiters_;

    // Free list linkage (reused when impl is on free_list)
    implementation* next_free_ = nullptr;

    inline explicit implementation(timer_service& svc) noexcept;

    inline std::coroutine_handle<> wait(
        std::coroutine_handle<>,
        capy::executor_ref,
        std::stop_token,
        std::error_code*) override;
};

// Thread-local caches avoid hot-path mutex acquisitions:
// 1. Impl cache — single-slot, validated by comparing svc_
// 2. Waiter cache — single-slot, no service affinity
// All caches are cleared by timer_service_invalidate_cache() during shutdown.

inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
inline thread_local_ptr<waiter_node> tl_cached_waiter;

inline timer_service::implementation*
try_pop_tl_cache(timer_service* svc) noexcept
{
    auto* impl = tl_cached_impl.get();
    if (impl)
    {
        tl_cached_impl.set(nullptr);
        if (impl->svc_ == svc)
            return impl;
        // Stale impl from a destroyed service
        delete impl;
    }
    return nullptr;
}

inline bool
try_push_tl_cache(timer_service::implementation* impl) noexcept
{
    if (!tl_cached_impl.get())
    {
        tl_cached_impl.set(impl);
        return true;
    }
    return false;
}

inline waiter_node*
try_pop_waiter_tl_cache() noexcept
{
    auto* w = tl_cached_waiter.get();
    if (w)
    {
        tl_cached_waiter.set(nullptr);
        return w;
    }
    return nullptr;
}

inline bool
try_push_waiter_tl_cache(waiter_node* w) noexcept
{
    if (!tl_cached_waiter.get())
    {
        tl_cached_waiter.set(w);
        return true;
    }
    return false;
}

inline void
timer_service_invalidate_cache() noexcept
{
    delete tl_cached_impl.get();
    tl_cached_impl.set(nullptr);

    delete tl_cached_waiter.get();
    tl_cached_waiter.set(nullptr);
}

// timer_service out-of-class member function definitions

inline timer_service::implementation::implementation(
    timer_service& svc) noexcept
    : svc_(&svc)
{
}

inline void
timer_service::shutdown()
{
    timer_service_invalidate_cache();

    // Cancel waiting timers still in the heap
    for (auto& entry : heap_)
    {
        auto* impl = entry.timer_;
        while (auto* w = impl->waiters_.pop_front())
        {
            w->stop_cb_.reset();
            // Suspended waiters are destroyed, not resumed, at shutdown
            w->h_.destroy();
            sched_->work_finished();
            delete w;
        }
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        delete impl;
    }
    heap_.clear();
    cached_nearest_ns_.store(
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);

    // Delete free-listed impls
    while (free_list_)
    {
        auto* next = free_list_->next_free_;
        delete free_list_;
        free_list_ = next;
    }

    // Delete free-listed waiters
    while (waiter_free_list_)
    {
        auto* next = waiter_free_list_->next_free_;
        delete waiter_free_list_;
        waiter_free_list_ = next;
    }
}

inline io_object::implementation*
timer_service::construct()
{
    implementation* impl = try_pop_tl_cache(this);
    if (impl)
    {
        impl->svc_        = this;
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        impl->might_have_pending_waits_ = false;
        return impl;
    }

    std::lock_guard lock(mutex_);
    if (free_list_)
    {
        impl              = free_list_;
        free_list_        = impl->next_free_;
        impl->next_free_  = nullptr;
        impl->svc_        = this;
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        impl->might_have_pending_waits_ = false;
    }
    else
    {
        impl = new implementation(*this);
    }
    return impl;
}

inline void
timer_service::destroy(io_object::implementation* p)
{
    destroy_impl(static_cast<implementation&>(*p));
}

inline void
timer_service::destroy_impl(implementation& impl)
{
    cancel_timer(impl);

    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
    {
        std::lock_guard lock(mutex_);
        remove_timer_impl(impl);
        refresh_cached_nearest();
    }

    if (try_push_tl_cache(&impl))
        return;

    std::lock_guard lock(mutex_);
    impl.next_free_ = free_list_;
    free_list_      = &impl;
}

inline waiter_node*
timer_service::create_waiter()
{
    if (auto* w = try_pop_waiter_tl_cache())
        return w;

    std::lock_guard lock(mutex_);
    if (waiter_free_list_)
    {
        auto* w           = waiter_free_list_;
        waiter_free_list_ = w->next_free_;
        w->next_free_     = nullptr;
        return w;
    }

    return new waiter_node();
}

inline void
timer_service::destroy_waiter(waiter_node* w)
{
    if (try_push_waiter_tl_cache(w))
        return;

    std::lock_guard lock(mutex_);
    w->next_free_     = waiter_free_list_;
    waiter_free_list_ = w;
}

inline std::size_t
timer_service::update_timer(implementation& impl, time_point new_time)
{
    bool in_heap =
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
    if (!in_heap && impl.waiters_.empty())
        return 0;

    bool notify = false;
    intrusive_list<waiter_node> canceled;

    {
        std::lock_guard lock(mutex_);

        // Changing the expiry cancels every pending wait on this timer
        while (auto* w = impl.waiters_.pop_front())
        {
            w->impl_ = nullptr;
            canceled.push_back(w);
        }

        if (impl.heap_index_ < heap_.size())
        {
            time_point old_time           = heap_[impl.heap_index_].time_;
            heap_[impl.heap_index_].time_ = new_time;

            if (new_time < old_time)
                up_heap(impl.heap_index_);
            else
                down_heap(impl.heap_index_);

            notify = (impl.heap_index_ == 0);
        }

        refresh_cached_nearest();
    }

    std::size_t count = 0;
    while (auto* w = canceled.pop_front())
    {
        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        ++count;
    }

    if (notify)
        on_earliest_changed_();

    return count;
}

inline void
timer_service::insert_waiter(implementation& impl, waiter_node* w)
{
    bool notify = false;
    {
        std::lock_guard lock(mutex_);
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
        {
            impl.heap_index_ = heap_.size();
            heap_.push_back({impl.expiry_, &impl});
            up_heap(heap_.size() - 1);
            notify = (impl.heap_index_ == 0);
            refresh_cached_nearest();
        }
        impl.waiters_.push_back(w);
    }
    if (notify)
        on_earliest_changed_();
}

inline std::size_t
timer_service::cancel_timer(implementation& impl)
{
    if (!impl.might_have_pending_waits_)
        return 0;

    // Not in heap and no waiters — just clear the flag
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
        impl.waiters_.empty())
    {
        impl.might_have_pending_waits_ = false;
        return 0;
    }

    intrusive_list<waiter_node> canceled;

    {
        std::lock_guard lock(mutex_);
        remove_timer_impl(impl);
        while (auto* w = impl.waiters_.pop_front())
        {
            w->impl_ = nullptr;
            canceled.push_back(w);
        }
        refresh_cached_nearest();
    }

    impl.might_have_pending_waits_ = false;

    std::size_t count = 0;
    while (auto* w = canceled.pop_front())
    {
        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        ++count;
    }

    return count;
}

inline void
timer_service::cancel_waiter(waiter_node* w)
{
    {
        std::lock_guard lock(mutex_);
        // Already removed by cancel_timer or process_expired
        if (!w->impl_)
            return;
        auto* impl = w->impl_;
        w->impl_   = nullptr;
        impl->waiters_.remove(w);
        if (impl->waiters_.empty())
        {
            remove_timer_impl(*impl);
            impl->might_have_pending_waits_ = false;
        }
        refresh_cached_nearest();
    }

    w->ec_value_ = make_error_code(capy::error::canceled);
    sched_->post(&w->op_);
}

inline std::size_t
timer_service::cancel_one_waiter(implementation& impl)
{
    if (!impl.might_have_pending_waits_)
        return 0;

    waiter_node* w = nullptr;

    {
        std::lock_guard lock(mutex_);
        w = impl.waiters_.pop_front();
        if (!w)
            return 0;
        w->impl_ = nullptr;
        if (impl.waiters_.empty())
        {
            remove_timer_impl(impl);
            impl.might_have_pending_waits_ = false;
        }
        refresh_cached_nearest();
    }

    w->ec_value_ = make_error_code(capy::error::canceled);
    sched_->post(&w->op_);
    return 1;
}

inline std::size_t
timer_service::process_expired()
{
    intrusive_list<waiter_node> expired;

    {
        std::lock_guard lock(mutex_);
        auto now = clock_type::now();

        while (!heap_.empty() && heap_[0].time_ <= now)
        {
            implementation* t = heap_[0].timer_;
            remove_timer_impl(*t);
            while (auto* w = t->waiters_.pop_front())
            {
                w->impl_     = nullptr;
                w->ec_value_ = {};
                expired.push_back(w);
            }
            t->might_have_pending_waits_ = false;
        }

        refresh_cached_nearest();
    }

    std::size_t count = 0;
    while (auto* w = expired.pop_front())
    {
        sched_->post(&w->op_);
        ++count;
    }

    return count;
}

inline void
timer_service::remove_timer_impl(implementation& impl)
{
    std::size_t index = impl.heap_index_;
    if (index >= heap_.size())
        return; // Not in heap

    if (index == heap_.size() - 1)
    {
        // Last element, just pop
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
        heap_.pop_back();
    }
    else
    {
        // Swap with last and reheapify
        swap_heap(index, heap_.size() - 1);
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
        heap_.pop_back();

        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
            up_heap(index);
        else
            down_heap(index);
    }
}

inline void
timer_service::up_heap(std::size_t index)
{
    while (index > 0)
    {
        std::size_t parent = (index - 1) / 2;
        if (!(heap_[index].time_ < heap_[parent].time_))
            break;
        swap_heap(index, parent);
        index = parent;
    }
}

inline void
timer_service::down_heap(std::size_t index)
{
    std::size_t child = index * 2 + 1;
    while (child < heap_.size())
    {
        std::size_t min_child = (child + 1 == heap_.size() ||
                                 heap_[child].time_ < heap_[child + 1].time_)
            ? child
            : child + 1;

        if (heap_[index].time_ < heap_[min_child].time_)
            break;

        swap_heap(index, min_child);
        index = min_child;
        child = index * 2 + 1;
    }
}

inline void
timer_service::swap_heap(std::size_t i1, std::size_t i2)
{
    heap_entry tmp                = heap_[i1];
    heap_[i1]                     = heap_[i2];
    heap_[i2]                     = tmp;
    heap_[i1].timer_->heap_index_ = i1;
    heap_[i2].timer_->heap_index_ = i2;
}
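
// Heap layout, for reference: heap_[0] always holds the nearest expiry,
// which is what refresh_cached_nearest() publishes to the scheduler. For
// an entry at index i, the parent lives at (i - 1) / 2 and the children
// at 2 * i + 1 and 2 * i + 2; this is the index arithmetic used by
// up_heap(), down_heap(), and remove_timer_impl() above. swap_heap()
// keeps each implementation's heap_index_ in sync, so a timer can be
// removed from the middle of the heap in O(log n).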
 
// waiter_node out-of-class member function definitions

inline void
waiter_node::canceller::operator()() const
{
    waiter_->svc_->cancel_waiter(waiter_);
}

inline void
waiter_node::completion_op::do_complete(
    void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
{
    if (!owner)
        return;
    static_cast<completion_op*>(base)->operator()();
}

inline void
waiter_node::completion_op::operator()()
{
    auto* w = waiter_;
    w->stop_cb_.reset();
    if (w->ec_out_)
        *w->ec_out_ = w->ec_value_;

    // Copy everything still needed before destroy_waiter() recycles *w
    auto h      = w->h_;
    auto d      = w->d_;
    auto* svc   = w->svc_;
    auto& sched = svc->get_scheduler();

    svc->destroy_waiter(w);

    d.post(h);
    sched.work_finished();
}
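
// Cancellation flow, for reference: a stop request on the waiter's
// stop_token runs canceller::operator()(), which calls
// timer_service::cancel_waiter(). That unlinks the waiter under the
// service mutex (clearing impl_, the "still in list" marker), then
// stamps ec_value_ with capy::error::canceled and posts the embedded
// completion_op. The op later runs through operator()() above, which
// copies the error into *ec_out_, recycles the waiter_node via
// destroy_waiter(), and resumes the coroutine through its executor.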
 
inline std::coroutine_handle<>
timer_service::implementation::wait(
    std::coroutine_handle<> h,
    capy::executor_ref d,
    std::stop_token token,
    std::error_code* ec)
{
    // Already-expired fast path — no waiter_node, no mutex.
    // Post instead of dispatch so the coroutine yields to the
    // scheduler, allowing other queued work to run.
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
    {
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
        {
            if (ec)
                *ec = {};
            d.post(h);
            return std::noop_coroutine();
        }
    }

    auto* w    = svc_->create_waiter();
    w->impl_   = this;
    w->svc_    = svc_;
    w->h_      = h;
    w->d_      = d;
    w->token_  = std::move(token);
    w->ec_out_ = ec;

    svc_->insert_waiter(*this, w);
    might_have_pending_waits_ = true;
    svc_->get_scheduler().work_started();

    if (w->token_.stop_possible())
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});

    return std::noop_coroutine();
}

// Free functions

struct timer_service_access
{
    static native_scheduler& get_scheduler(io_context& ctx) noexcept
    {
        return static_cast<native_scheduler&>(*ctx.sched_);
    }
};

// Bypass find_service() mutex by reading the scheduler's cached pointer
inline io_object::io_service&
timer_service_direct(capy::execution_context& ctx) noexcept
{
    return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
                .timer_svc_;
}

inline std::size_t
timer_service_update_expiry(timer::implementation& base)
{
    auto& impl = static_cast<timer_service::implementation&>(base);
    return impl.svc_->update_timer(impl, impl.expiry_);
}

inline std::size_t
timer_service_cancel(timer::implementation& base) noexcept
{
    auto& impl = static_cast<timer_service::implementation&>(base);
    return impl.svc_->cancel_timer(impl);
}

inline std::size_t
timer_service_cancel_one(timer::implementation& base) noexcept
{
    auto& impl = static_cast<timer_service::implementation&>(base);
    return impl.svc_->cancel_one_waiter(impl);
}

inline timer_service&
get_timer_service(capy::execution_context& ctx, scheduler& sched)
{
    return ctx.make_service<timer_service>(sched);
}

} // namespace boost::corosio::detail

#endif