diff options
-rw-r--r-- | src/include/kernel/futexmgr.H | 34 | ||||
-rw-r--r-- | src/include/kernel/syscalls.H | 6 | ||||
-rw-r--r-- | src/include/sys/sync.h | 78 | ||||
-rw-r--r-- | src/kernel/futexmgr.C | 54 | ||||
-rw-r--r-- | src/kernel/syscall.C | 105 | ||||
-rw-r--r-- | src/lib/sync.C | 96 | ||||
-rw-r--r-- | src/usr/testcore/lib/synctest.H | 139 |
7 files changed, 428 insertions, 84 deletions
diff --git a/src/include/kernel/futexmgr.H b/src/include/kernel/futexmgr.H index 75fc31b08..aeba05291 100644 --- a/src/include/kernel/futexmgr.H +++ b/src/include/kernel/futexmgr.H @@ -53,12 +53,18 @@ class FutexManager static uint64_t wait(task_t * i_task, uint64_t * i_addr, uint64_t i_val); /** - * Wakeup threads - * @param[in] i_addr pointer to a futex - * @param[in] i_count The max number of threads to wake - * @returns The number of threads awoken + * Wakeup and optionally move waiting processes. + * @param[in] i_futex1 pointer to a futex + * @param[in] i_count1 The max number of tasks to wake + * @param[in] i_futex2 pointer to a futex, (default NULL) Optional + * futex to move i_count_2 unwoken tasks to. + * @param[in] i_count2 The max number of theads to move from futex1 to + * futex2. (default 0) + * + * @returns The number of tasks awoken */ - static uint64_t wake(uint64_t * i_addr, uint64_t i_count); + static uint64_t wake(uint64_t * i_futex1, uint64_t i_count1, + uint64_t * i_futex2 = NULL, uint64_t i_count2 = 0); protected: @@ -74,22 +80,12 @@ class FutexManager private: // functions - /** - * Put the current processes on a wait queue - * @param[in] i_task pointer to the current task structure - * @param[in] i_addr Futex address - * @param[in] i_val Value that *i_addr should contain - * @returns [0 | error code] if *i_addr != i_val returns EWOULDBLOCK - */ + /** see wait(...) */ uint64_t _wait(task_t * i_task, uint64_t * i_addr, uint64_t i_val); - /** - * Wakeup threads - * @param[in] i_addr pointer to a futex - * @param[in] i_count The max number of threads to wake - * @returns The number of threads awoken - */ - uint64_t _wake(uint64_t * i_addr, uint64_t i_count); + /** see wake(...) */ + uint64_t _wake(uint64_t * i_futex1, uint64_t i_count1, + uint64_t * i_futex2, uint64_t i_count2); private: // data diff --git a/src/include/kernel/syscalls.H b/src/include/kernel/syscalls.H index 856666d74..e09171596 100644 --- a/src/include/kernel/syscalls.H +++ b/src/include/kernel/syscalls.H @@ -77,10 +77,8 @@ namespace Systemcalls /** nanosleep() */ TIME_NANOSLEEP, - /** futex_wait() */ - FUTEX_WAIT, - /** futex_wake() */ - FUTEX_WAKE, + /** futex_wake() futex_wait() futex_requeue() */ + SYS_FUTEX, /** shutdown() */ MISC_SHUTDOWN, diff --git a/src/include/sys/sync.h b/src/include/sys/sync.h index d0074cbc5..58e5c9327 100644 --- a/src/include/sys/sync.h +++ b/src/include/sys/sync.h @@ -51,10 +51,31 @@ typedef _barrier_imp_t barrier_t; #define MUTEX_INITIALIZER {0} /** + * Conditional variable types + */ +struct _cond_imp_t +{ + mutex_t * mutex; + uint64_t sequence; +}; + +typedef _cond_imp_t sync_cond_t; + + +#define COND_INITIALIZER {NULL, 0} + +enum _FUTEX_OP +{ + FUTEX_WAIT, + FUTEX_WAKE, + FUTEX_REQUEUE +}; + +/** * @fn barrier_init * @brief Initialize a barrier object * @param[out] o_barrier The barrier - * @param[in] i_count The number of threads to wait on + * @param[in] i_count The number of tasks to wait on * @pre an uninitialized barrier object * @post a valid barrier object */ @@ -72,7 +93,7 @@ void barrier_destroy (barrier_t * i_barrier); /** * @fn barrier_wait * @brief Wait on a barrier - * This thread will block until the barrier count is reached. + * This tasks will block until the barrier count is reached. * @param[in] i_barrier The barrier */ void barrier_wait (barrier_t * i_barrier); @@ -99,7 +120,7 @@ void mutex_destroy(mutex_t * i_mutex); * @fn mutex_lock * @brief Obtain a lock on a mutex * @param[in] i_mutex - The mutex - * @post returns when this thread has the lock + * @post returns when this task has the lock */ void mutex_lock(mutex_t * i_mutex); @@ -111,4 +132,55 @@ void mutex_lock(mutex_t * i_mutex); */ void mutex_unlock(mutex_t * i_mutex); +/** + * @fn sync_cond_init + * @brief Initialize a condtional variable + * @param i_cond, The conditional variable + * @post + */ +void sync_cond_init(sync_cond_t * i_cond); + +/** + * @fn sync_cond_destroy + * @brief Destroy a conditional variable + * @param i_cond, The conditional variable + */ +void sync_cond_destroy(sync_cond_t * i_cond); + +/** + * @fn sync_cond_wait + * @brief Block the calling task until the specified condition is signaled + * @param i_cond, The condition variable + * @param i_mutex, A mutex for which this task has the lock + * @pre This task must have the mutex lock + * @post This task will have the mutex lock + * @note i_mutex will be unlocked while this task is in the wait state. + * @note failing to lock the mutex before calling this function may cause it + * not to block + */ +int sync_cond_wait(sync_cond_t * i_cond, mutex_t * i_mutex); + +/** + * @fn sync_cond_signal + * @brief Signal to wake a task waiting on the condition varible. + * @param i_cond, The condition variable + * @pre This task must hold the lock on the mutex used in sync_cond_wait() + * @pre sync_cond_wait() must have been called for conditional variable + * @note failing to unlock the mutex after this call may cause the waiting + * task to remain blocked. If there is more than one task waiting on the + * conditional variable then sync_cond_broadcast() should be used instead. + */ +void sync_cond_signal(sync_cond_t * i_cond); + +/** + * @fn sync_cond_broadcast + * @brief Signal to wake all tasks waiting on the condition variable + * @param i_cond, The conditional variable + * @note same restrictions as sync_cond_signal() except this function should + * be used if there is more than one task waiting on the conditional variable. + * There is no guarantee on which waiting task will get the mutex lock first + * when this task unlocks the mutex. + */ +void sync_cond_broadcast(sync_cond_t * i_cond); + #endif diff --git a/src/kernel/futexmgr.C b/src/kernel/futexmgr.C index 606e26c8c..f66452543 100644 --- a/src/kernel/futexmgr.C +++ b/src/kernel/futexmgr.C @@ -42,9 +42,11 @@ uint64_t FutexManager::wait(task_t* i_task, uint64_t * i_addr, uint64_t i_val) //----------------------------------------------------------------------------- -uint64_t FutexManager::wake(uint64_t * i_addr, uint64_t i_count) +uint64_t FutexManager::wake(uint64_t * i_futex1, uint64_t i_count1, + uint64_t * i_futex2, uint64_t i_count2) { - return Singleton<FutexManager>::instance()._wake(i_addr, i_count); + return Singleton<FutexManager>::instance()._wake(i_futex1, i_count1, + i_futex2, i_count2); } //----------------------------------------------------------------------------- @@ -57,7 +59,7 @@ uint64_t FutexManager::_wait(task_t* i_task, uint64_t * i_addr, uint64_t i_val) if(unlikely(*i_addr != i_val)) { - // some other thread has modified the futex + // some other task has modified the futex // bail-out retry required. iv_lock.unlock(); rc = EWOULDBLOCK; @@ -81,18 +83,22 @@ uint64_t FutexManager::_wait(task_t* i_task, uint64_t * i_addr, uint64_t i_val) return rc; } -//----------------------------------------------------------------------------- -uint64_t FutexManager::_wake(uint64_t * i_addr, uint64_t i_count) +// Wake processes. Any number of processes in excess of count1 are not +// woken up but moved to futex2. the number of processes to move +// is capped by count2. +uint64_t FutexManager::_wake(uint64_t * i_futex1, uint64_t i_count1, + uint64_t * i_futex2, uint64_t i_count2 + ) { uint64_t started = 0; - // Remove task(s) from futex queue - // Put it/them on the run queue iv_lock.lock(); - while(started < i_count) + + // First start up to i_count1 task(s) + while(started < i_count1) { - _FutexWait_t * waiter = iv_list.find(i_addr); + _FutexWait_t * waiter = iv_list.find(i_futex1); if(waiter == NULL) { break; @@ -109,6 +115,36 @@ uint64_t FutexManager::_wake(uint64_t * i_addr, uint64_t i_count) wait_task->cpu->scheduler->addTask(wait_task); ++started; } + + if(i_futex2 && i_count2) + { + uint64_t moved = 0; + + // Move up to i_count2 tasks to futex2 + while(moved < i_count2) + { + // What if *i_futex2 got modified !!!! TODO + // Do we need a safety check here (another val param) ???? + _FutexWait_t * waiter = iv_list.find(i_futex1); + if(waiter == NULL) + { + break; + } + + task_t * wait_task = waiter->task; + iv_list.erase(waiter); + + kassert(wait_task != NULL); // should never happen, but... + + waiter->key = i_futex2; + wait_task->state_info = i_futex2; + + iv_list.insert(waiter); + ++moved; + } + } + + iv_lock.unlock(); return started; diff --git a/src/kernel/syscall.C b/src/kernel/syscall.C index 561218b00..f0f5b81f0 100644 --- a/src/kernel/syscall.C +++ b/src/kernel/syscall.C @@ -40,6 +40,7 @@ #include <kernel/stacksegment.H> #include <kernel/heapmgr.H> #include <kernel/intmsghandler.H> +#include <sys/sync.h> extern "C" void kernel_execute_decrementer() @@ -87,8 +88,7 @@ namespace Systemcalls void DevMap(task_t*); void DevUnmap(task_t*); void TimeNanosleep(task_t*); - void FutexWait(task_t *t); - void FutexWake(task_t *t); + void Futex(task_t *t); void Shutdown(task_t *t); void CpuCoreType(task_t *t); void CpuDDLevel(task_t *t); @@ -119,8 +119,7 @@ namespace Systemcalls &TimeNanosleep, // TIME_NANOSLEEP - &FutexWait, // FUTEX_WAIT - &FutexWake, // FUTEX_WAKE + &Futex, // SYS_FUTEX operations &Shutdown, // MISC_SHUTDOWN &CpuCoreType, // MISC_CPUCORETYPE @@ -475,62 +474,78 @@ namespace Systemcalls t->cpu->scheduler->setNextRunnable(); } - /** - * Put task on wait queue based on futex - * @param[in] t: The task to block - */ - void FutexWait(task_t * t) + + void Futex(task_t * t) { - uint64_t uaddr = (uint64_t) TASK_GETARG0(t); - uint64_t val = (uint64_t) TASK_GETARG1(t); + uint64_t op = static_cast<uint64_t>(TASK_GETARG0(t)); + uint64_t futex = static_cast<uint64_t>(TASK_GETARG1(t)); + uint64_t val = static_cast<uint64_t>(TASK_GETARG2(t)); + uint64_t val2 = static_cast<uint64_t>(TASK_GETARG3(t)); + uint64_t futex2 = static_cast<uint64_t>(TASK_GETARG4(t)); + uint64_t rc = 0; // Set RC to success initially. TASK_SETRTN(t,0); - //translate uaddr from user space to kernel space - uaddr = VmmManager::findPhysicalAddress(uaddr); - if(uaddr != (uint64_t)(-EFAULT)) - { - uint64_t rc = FutexManager::wait(t,(uint64_t *)uaddr,val); - if (rc != 0) // Can only set rc if we still have control of the task, - // which is only (for certain) on error rc's. - { - TASK_SETRTN(t,rc); - } - } - else + futex = VmmManager::findPhysicalAddress(futex); + if(futex == (static_cast<uint64_t>(-EFAULT))) { printk("Task %d terminated. No physical address found for address 0x%p", - t->tid, (void *) uaddr); + t->tid, + reinterpret_cast<void *>(futex)); + TaskManager::endTask(t, NULL, TASK_STATUS_CRASHED); + return; } - } - /** - * Wake tasks on futex wait queue - * @param[in] t: The current task - */ - void FutexWake(task_t * t) - { - uint64_t uaddr = (uint64_t) TASK_GETARG0(t); - uint64_t count = (uint64_t) TASK_GETARG1(t); + uint64_t * futex_p = reinterpret_cast<uint64_t *>(futex); - // translate uaddr from user space to kernel space - uaddr = VmmManager::findPhysicalAddress(uaddr); - if(uaddr != (uint64_t)(-EFAULT)) + switch(op) { - uint64_t started = FutexManager::wake((uint64_t *)uaddr,count); + case FUTEX_WAIT: // Put task on wait queue based on futex - TASK_SETRTN(t,started); - } - else - { - printk("Task %d terminated. No physical address found for address 0x%p", - t->tid, (void *) uaddr); - TaskManager::endTask(t, NULL, TASK_STATUS_CRASHED); - } + rc = FutexManager::wait(t, futex_p, val); + + // Can only be set rc if control of the task is still had, + // which is only, for certain, on error rc's + if(rc != 0) + { + TASK_SETRTN(t,rc); + } + break; + + case FUTEX_WAKE: // Wake task(s) on the futex wait queue + + rc = FutexManager::wake(futex_p, val); + TASK_SETRTN(t,rc); + break; + + case FUTEX_REQUEUE: + // Wake (val) task(s) on futex && requeue remaining tasks on futex2 + + futex2 = VmmManager::findPhysicalAddress(futex2); + if(futex2 == (static_cast<uint64_t>(-EFAULT))) + { + printk("Task %d terminated. No physical address found for address 0x%p", + t->tid, + reinterpret_cast<void *>(futex2)); + + TaskManager::endTask(t, NULL, TASK_STATUS_CRASHED); + return; + } + + rc = FutexManager::wake(futex_p, val, + reinterpret_cast<uint64_t *>(futex2), + val2); + break; + + default: + printk("ERROR Futex invalid op %ld\n",op); + TASK_SETRTN(t,static_cast<uint64_t>(-EINVAL)); + }; } + /** * Shutdown all CPUs * @param[in] t: The current task diff --git a/src/lib/sync.C b/src/lib/sync.C index 0b57aa0b7..7dce4c62d 100644 --- a/src/lib/sync.C +++ b/src/lib/sync.C @@ -24,6 +24,8 @@ #include <sys/sync.h> #include <sys/syscall.h> #include <assert.h> +#include <errno.h> +#include <kernel/console.H> using namespace Systemcalls; @@ -31,14 +33,39 @@ using namespace Systemcalls; uint64_t futex_wait(uint64_t * i_addr, uint64_t i_val) { - return (uint64_t) _syscall2(FUTEX_WAIT,i_addr, (void *)i_val); + return (uint64_t) _syscall5(SYS_FUTEX, + (void *)FUTEX_WAIT, + i_addr, + (void *)i_val, + NULL, + NULL); } //----------------------------------------------------------------------------- uint64_t futex_wake(uint64_t * i_addr, uint64_t i_count) { - return (uint64_t) _syscall2(FUTEX_WAKE, i_addr, (void *)i_count); + return (uint64_t) _syscall5(SYS_FUTEX, + (void *)FUTEX_WAKE, + i_addr, + (void *)i_count, + NULL, + NULL); +} + +//----------------------------------------------------------------------------- + +uint64_t futex_requeue(uint64_t * i_addr, + uint64_t i_count1, + uint64_t i_count2, + uint64_t * i_futex2) +{ + return (uint64_t) _syscall5(SYS_FUTEX, + (void *)FUTEX_REQUEUE, + i_addr, + (void *)i_count1, + (void *)i_count2, + i_futex2); } //----------------------------------------------------------------------------- @@ -127,7 +154,7 @@ void mutex_lock(mutex_t * i_mutex) { futex_wait( &(i_mutex->iv_val), 2); l_count = __sync_lock_test_and_set(&(i_mutex->iv_val),2); - // if more than one thread gets out - one continues while + // if more than one task gets out - one continues while // the rest get blocked again. } } @@ -151,10 +178,71 @@ void mutex_unlock(mutex_t * i_mutex) if(unlikely(2 <= l_count)) { i_mutex->iv_val = 0; - futex_wake(&(i_mutex->iv_val), 1); // wake one thread + futex_wake(&(i_mutex->iv_val), 1); // wake one task } return; } +void sync_cond_init(sync_cond_t * i_cond) +{ + i_cond->mutex = NULL; + i_cond->sequence = 0; +} + +void sync_cond_destroy(sync_cond_t * i_cond) +{ + // don't need to do anything +} + +int sync_cond_wait(sync_cond_t * i_cond, mutex_t * i_mutex) +{ + uint64_t seq = i_cond->sequence; + if(i_cond->mutex != i_mutex) + { + if(i_cond->mutex) return EINVAL; + + // Atomically set mutex + __sync_bool_compare_and_swap(&i_cond->mutex, NULL, i_mutex); + if(i_cond->mutex != i_mutex) return EINVAL; + } + + mutex_unlock(i_mutex); + + futex_wait( &(i_cond->sequence), seq); + + + // Can't continue until i_mutex lock is obtained. + //mutex_lock(i_mutex); <--- Does not work - Havn't figure out why + // but the followin code does work. + // this code locks the mutex and makes sure it's contended. + while(0 != __sync_lock_test_and_set(&(i_mutex->iv_val), 2)) + { + futex_wait(&(i_mutex->iv_val),2); + } + + return 0; +} + +void sync_cond_signal(sync_cond_t * i_cond) +{ + __sync_fetch_and_add(&(i_cond->sequence),1); + // Wake up one + futex_wake(&(i_cond->sequence), 1); +} + +void sync_cond_broadcast(sync_cond_t * i_cond) +{ + mutex_t * m = i_cond->mutex; + + // no mutex means no waiters + if(!m) return; + + // wake up all + __sync_fetch_and_add(&(i_cond->sequence),1); + + // need to wake up one on the sequence and + // re-queue the rest onto the mutex m; + futex_requeue(&(i_cond->sequence),1,UINT64_MAX,&(m->iv_val)); +} diff --git a/src/usr/testcore/lib/synctest.H b/src/usr/testcore/lib/synctest.H index e1de74843..a5d15b3f2 100644 --- a/src/usr/testcore/lib/synctest.H +++ b/src/usr/testcore/lib/synctest.H @@ -32,8 +32,10 @@ #include <sys/sync.h> #include <sys/task.h> #include <sys/time.h> +#include <utility> #include <kernel/timemgr.H> +#include <kernel/console.H> class SyncTest: public CxxTest::TestSuite { @@ -80,10 +82,79 @@ class SyncTest: public CxxTest::TestSuite barrier_destroy(&barrier); } + void testConditionVariable() + { + + mutex_init(&mutex); + sync_cond_init(&cond_var); + + counter = 0; + + barrier_init(&barrier,4); + + TASK_INFO t1(this,1); + TASK_INFO t2(this,2); + TASK_INFO t3(this,3); + + task_create(watch_counter, &t1); + task_create(increment, &t2); + task_create(increment, &t3); + + barrier_wait(&barrier); + TS_TRACE("Conditional Variable test final count = %ld",counter); + barrier_destroy(&barrier); + sync_cond_destroy(&cond_var); + mutex_destroy(&mutex); + // test is success if it completes w/o hang + } + + void testConditionVariableBroadcast() + { + mutex_init(&mutex); + sync_cond_init(&cond_var); + + counter = 0; + + barrier_init(&barrier,5); + + TASK_INFO t1(this,4); + TASK_INFO t2(this,5); + TASK_INFO t3(this,6); + TASK_INFO t4(this,7); + + task_create(watch_counter, &t1); + task_create(watch_counter, &t2); + task_create(increment1, &t3); + task_create(increment1, &t4); + + barrier_wait(&barrier); + TS_TRACE("Conditional Variable test final count = %ld",counter); + barrier_destroy(&barrier); + sync_cond_destroy(&cond_var); + mutex_destroy(&mutex); + // test is success if it completes w/o hang + } + + + + + private: + enum + { + TO_COUNT = 10, + COUNT_SIGNAL = 13 + }; + + typedef std::pair<SyncTest*, size_t> TASK_INFO; + mutex_t mutex; barrier_t barrier; + sync_cond_t cond_var; + + size_t counter; + static void func1(void * i_p) { @@ -127,6 +198,74 @@ class SyncTest: public CxxTest::TestSuite task_end(); } + static void watch_counter(void * i_p) + { + TASK_INFO * info = (TASK_INFO *) i_p; + SyncTest * my = info->first; + TS_TRACE("CONDVAR task %ld. Start watching counter",info->second); + mutex_lock(&(my->mutex)); + while(my->counter < COUNT_SIGNAL) + { + sync_cond_wait(&(my->cond_var),&(my->mutex)); + TS_TRACE("CONDVAR task %ld. Condition signal received", + info->second); + my->counter += 100; + TS_TRACE("CONDVAR task %ld. Counter = %ld", + info->second,my->counter); + } + mutex_unlock(&(my->mutex)); + barrier_wait(&(my->barrier)); + } + + + static void increment(void * i_p) + { + TASK_INFO * info = (TASK_INFO *) i_p; + SyncTest * my = info->first; + TS_TRACE("CONDVAR task %ld. start Increment counter",info->second); + for(size_t i = 0; i < TO_COUNT; ++i) + { + mutex_lock(&(my->mutex)); + ++(my->counter); + + if(my->counter == COUNT_SIGNAL) + { + sync_cond_signal(&(my->cond_var)); + TS_TRACE("CONDVAR task %ld. INCR counter = %ld Threshold" + " reached",info->second,my->counter); + } + TS_TRACE("CONDVAR task %ld INCR counter = %ld Unlocking mutex", + info->second, my->counter); + mutex_unlock(&(my->mutex)); + nanosleep(0,TEN_CTX_SWITCHES_NS); + } + barrier_wait(&(my->barrier)); + } + + static void increment1(void * i_p) + { + TASK_INFO * info = (TASK_INFO *) i_p; + SyncTest * my = info->first; + TS_TRACE("CONDVAR task %ld. start Increment counter",info->second); + for(size_t i = 0; i < TO_COUNT; ++i) + { + mutex_lock(&(my->mutex)); + ++(my->counter); + + if(my->counter == COUNT_SIGNAL) + { + sync_cond_broadcast(&(my->cond_var)); + TS_TRACE("CONDVAR task %ld. INCR counter = %ld Threshold" + " reached",info->second,my->counter); + } + TS_TRACE("CONDVAR task %ld INCR counter = %ld Unlocking mutex", + info->second, my->counter); + mutex_unlock(&(my->mutex)); + nanosleep(0,TEN_CTX_SWITCHES_NS); + } + barrier_wait(&(my->barrier)); + } + }; #endif |