this repo has no description
at fixPythonPipStalling 201 lines 7.4 kB view raw
1#include <stdatomic.h> 2#include <limits.h> 3#include <stdbool.h> 4 5#include "lock.h" 6 7#ifndef XTRACE_LOCK_DEBUG 8 #define XTRACE_LOCK_DEBUG 0 9#endif 10 11#if XTRACE_LOCK_DEBUG 12 #define xtrace_lock_debug_internal(x, ...) xtrace_log(x "\n", ## __VA_ARGS__) 13 #undef XTRACE_INLINE 14 #define XTRACE_INLINE 15#else 16 #define xtrace_lock_debug_internal(x, ...) 17#endif 18 19#define xtrace_lock_debug(x, ...) xtrace_lock_debug_internal("lock %p: " x, lock, ## __VA_ARGS__) 20#define xtrace_once_debug(x, ...) xtrace_lock_debug_internal("once %p: " x, once, ## __VA_ARGS__) 21 22// 23// xtrace_lock 24// 25// simple lock implementation on top of futexes, based on https://github.com/eliben/code-for-blog/blob/master/2018/futex-basics/mutex-using-futex.cpp 26// 27 28// used to hide the use of C11 atomics from external code (e.g. C++ code, which can't use C11 atomics). 29// it's *technically* undefined behavior to cast the external one to this one, but that's okay; 30// we know our exact environment and it works in it 31typedef struct xtrace_lock_internal { 32 _Atomic uint32_t state; 33} xtrace_lock_internal_t; 34 35enum xtrace_lock_state { 36 // the lock is unlocked (duh) 37 xtrace_lock_state_unlocked = 0, 38 39 // the lock is locked, but no one is waiting for it 40 xtrace_lock_state_locked_uncontended = 1, 41 42 // the locked is locked and there are waiters 43 xtrace_lock_state_locked_contended = 2, 44}; 45 46XTRACE_INLINE 47uint32_t cmpxchg_wrapper_u32(_Atomic uint32_t* atom, uint32_t expected, uint32_t desired) { 48 atomic_compare_exchange_strong_explicit(atom, &expected, desired, memory_order_acq_rel, memory_order_acquire); 49 return expected; 50}; 51 52extern "C" 53void xtrace_lock_lock(xtrace_lock_t* _lock) { 54 xtrace_lock_internal_t* lock = (xtrace_lock_internal_t*)_lock; 55 56 xtrace_lock_debug("locking"); 57 58 // we do the initial exchange using `xtrace_lock_state_locked_uncontended` because 59 // if it was previously unlocked, it is now locked but nobody is waiting for it 60 uint32_t prev = cmpxchg_wrapper_u32(&lock->state, xtrace_lock_state_unlocked, xtrace_lock_state_locked_uncontended); 61 62 xtrace_lock_debug("previous state was %u", prev); 63 64 // if it was not unlocked, we need to do some waiting 65 if (prev != xtrace_lock_state_unlocked) { 66 do { 67 // we can skip the cmpxchg if the lock was already contended 68 // also, when we do the cmpxchg, we need to make sure it's still locked 69 // we use `xtrace_lock_state_locked_contended` because it was either uncontended (but it is now) or it was already contended 70 if (prev == xtrace_lock_state_locked_contended || cmpxchg_wrapper_u32(&lock->state, xtrace_lock_state_locked_uncontended, xtrace_lock_state_locked_contended) != xtrace_lock_state_unlocked) { 71 xtrace_lock_debug("going to wait"); 72 // do the actual sleeping 73 // we expect `xtrace_lock_state_locked_contended` because we don't want to sleep if it's not contended 74 __linux_futex_reterr((int*)&lock->state, FUTEX_WAIT, xtrace_lock_state_locked_contended, nullptr, 0, 0); 75 xtrace_lock_debug("awoken"); 76 } 77 78 // loop as long as the lock was not unlocked 79 // we use `xtrace_lock_state_locked_contended` for the same reason as before 80 } while ((prev = cmpxchg_wrapper_u32(&lock->state, xtrace_lock_state_unlocked, xtrace_lock_state_locked_contended)) != xtrace_lock_state_unlocked); 81 xtrace_lock_debug("lock acquired after sleeping"); 82 } 83 84 xtrace_lock_debug("lock acquired"); 85}; 86 87extern "C" 88void xtrace_lock_unlock(xtrace_lock_t* _lock) { 89 xtrace_lock_internal_t* lock = (xtrace_lock_internal_t*)_lock; 90 91 xtrace_lock_debug("unlocking"); 92 93 uint32_t prev = atomic_exchange_explicit(&lock->state, xtrace_lock_state_unlocked, memory_order_acq_rel); 94 95 xtrace_lock_debug("previous state was %u", prev); 96 97 // if it was previously contended, then we need to wake someone up 98 if (prev == xtrace_lock_state_locked_contended) { 99 xtrace_lock_debug("waking someone up"); 100 __linux_futex_reterr((int*)&lock->state, FUTEX_WAKE, 1, NULL, 0, 0); 101 } 102}; 103 104// 105// xtrace_once 106// 107// see https://github.com/bugaevc/serenity/blob/1fc3f2ba84d9c75c6c30e62014dabe4fcd62aae1/Libraries/LibPthread/pthread_once.cpp 108// 109 110typedef struct xtrace_once_internal { 111 _Atomic uint32_t state; 112} xtrace_once_internal_t; 113 114enum xtrace_once_state { 115 // the once hasn't been triggered yet 116 xtrace_once_state_untriggered = 0, 117 118 // the once has been triggered, but no one is waiting for it 119 xtrace_once_state_triggered_uncontended = 1, 120 121 // the once has been triggered and there are waiters 122 xtrace_once_state_triggered_contended = 2, 123 124 // the once has been triggered and has completed execution 125 xtrace_once_state_completed = 3, 126}; 127 128extern "C" 129void xtrace_once(xtrace_once_t* _once, xtrace_once_callback callback) { 130 xtrace_once_internal_t* once = (xtrace_once_internal_t*)_once; 131 132 xtrace_once_debug("evaluating..."); 133 134 uint32_t prev = xtrace_once_state_untriggered; 135 bool exchanged = atomic_compare_exchange_strong_explicit(&once->state, &prev, xtrace_once_state_triggered_uncontended, memory_order_acq_rel, memory_order_acquire); 136 137 xtrace_once_debug("previous state was %u", prev); 138 139 if (exchanged) { 140 xtrace_once_debug("performing callback..."); 141 142 // we had `xtrace_once_state_untriggered` and now we have `xtrace_once_state_triggered_uncontended`, so let's do the callback 143 callback(); 144 145 xtrace_once_debug("callback done; updating state..."); 146 147 // now let's update the state to tell everyone we're done 148 prev = atomic_exchange_explicit(&once->state, xtrace_once_state_completed, memory_order_acq_rel); 149 150 xtrace_once_debug("previous state was %u", prev); 151 152 switch (prev) { 153 // no one was waiting, so we have no one to wake up 154 case xtrace_once_state_triggered_uncontended: break; 155 case xtrace_once_state_triggered_contended: { 156 xtrace_once_debug("waking up all waiters..."); 157 // otherwise, we have to wake someone up 158 __linux_futex_reterr((int*)&once->state, FUTEX_WAKE, INT_MAX, NULL, 0, 0); 159 } break; 160 } 161 162 return; 163 } 164 165 xtrace_once_debug("someone else is performing the callback"); 166 167 while (true) { 168 switch (prev) { 169 case xtrace_once_state_completed: { 170 // we're done 171 xtrace_once_debug("callback completed by someone else; returning..."); 172 return; 173 }; // not reached 174 case xtrace_once_state_triggered_uncontended: { 175 // somebody is already performing the callback; 176 // now we're waiting, so let's update the state for that 177 xtrace_once_debug("someone is already performing the callback with no waiters; becoming first waiter..."); 178 exchanged = atomic_compare_exchange_strong_explicit(&once->state, &prev, xtrace_once_state_triggered_contended, memory_order_acq_rel, memory_order_acquire); 179 if (!exchanged) { 180 xtrace_once_debug("state changed from uncontended; re-evaluating..."); 181 // something changed, let's re-evaluate 182 continue; 183 } 184 prev = xtrace_once_state_triggered_contended; 185 }; // fall through 186 case xtrace_once_state_triggered_contended: { 187 // somebody is already performing the callback and there are already waiters; 188 // let's wait 189 xtrace_once_debug("someone is already performing the callback with waiters; going to wait..."); 190 __linux_futex_reterr((int*)&once->state, FUTEX_WAIT, prev, NULL, 0, 0); 191 192 xtrace_once_debug("woken up"); 193 194 // we got woken up, but that may have been spurious; 195 // let's loop again to re-evaluate 196 prev = atomic_load_explicit(&once->state, memory_order_acquire); 197 xtrace_once_debug("previous state was %u; going to re-evaluate...", prev); 198 } break; 199 } 200 } 201};