/***************************************************************************
 * RCS INFORMATION:
 *
 *      $RCSfile: WKFThreads.C,v $
 *      $Author: johns $        $Locker:  $             $State: Exp $
 *      $Revision: 1.20 $       $Date: 2020/02/26 15:43:31 $
 *
 ***************************************************************************/
/**
 * \file WKFThreads.C
 * \brief Cross-platform APIs for multithreading and atomic ops.
 *
 * Code for spawning threads on various platforms.
 * Code donated by John Stone, john.stone@gmail.com
 * This code was originally written for the
 * Tachyon Parallel/Multiprocessor Ray Tracer.
 * Improvements have been donated by Mr. Stone on an
 * ongoing basis.  This code is provided under the
 * three clause BSD Open Source License.
 * NOTE: The latest version of this source file can be re-generated by
 * running the sequence of 'sed' commands shown at the top of the
 * "threads.c" file within the Tachyon source distribution.
 *
 * $Revision: 1.20 $       $Date: 2020/02/26 15:43:31 $
 *
 * \author John E. Stone - john.stone@gmail.com
 * \copyright
 * This trivial code is made available under the "new" 3-clause BSD license,
 * and/or any of the GPL licenses you prefer.
 * Feel free to use the code and modify as you see fit.
 */

/* Tachyon copyright reproduced below */
/*
 * Copyright (c) 1994-2016 John E. Stone
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/**
 * If compiling on Linux, enable the GNU CPU affinity functions in both
 * libc and the libpthreads
 */
#if defined(__linux)
#define _GNU_SOURCE 1
#include <sched.h>
#endif

#include "WKFThreads.h"

/* needed for CPU info APIs and flag macros */
#if defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1300)
#include <emmintrin.h>
#include <immintrin.h>
#endif

#ifdef _MSC_VER
#if 0
#define WKFUSENEWWIN32APIS 1
#define _WIN32_WINNT 0x0400 /**< needed for TryEnterCriticalSection(), etc */
#define WINVER       0x0400 /**< needed for TryEnterCriticalSection(), etc */
#endif
#include <windows.h>        /**< main Win32 APIs and types */
#include <winbase.h>        /**< system services headers */
#endif

#if defined(_AIX) || defined(_CRAY) || defined(__irix) || defined(__linux) || defined(__osf__) || defined(__sun)
#include <unistd.h>         /**< sysconf() headers, used by most systems */
#endif

#if defined(__APPLE__) && defined(WKFTHREADS)
#if 1
#include <sys/types.h>
#include <sys/sysctl.h>     /**< OSX >= 10.7 queries sysctl() for CPU count */
#else
#include <Carbon/Carbon.h>  /**< Deprecated Carbon APIs for Multiprocessing */
#endif
#endif

#if defined(__hpux)
#include <sys/mpctl.h>      /**< HP-UX Multiprocessing headers */
#endif

#ifdef __cplusplus
extern "C" {
#endif

int wkf_thread_numphysprocessors(void) {
  int a=1;

#ifdef WKFTHREADS
#if defined(__APPLE__)
#if 1
  int rc;
  int mib[2];
  u_int miblen;
  size_t alen = sizeof(a);
  mib[0] = CTL_HW;
  mib[1] = HW_AVAILCPU;
  miblen = 2;
  rc = sysctl(mib, miblen, &a, &alen, NULL, 0); /**< Number of active CPUs */
  if (rc < 0) {
    perror("Error during sysctl() query for CPU count");
    a = 1;
  }
#else
  a = MPProcessorsScheduled();       /**< Number of active/running CPUs */
#endif
#endif

#ifdef _MSC_VER
  struct _SYSTEM_INFO sysinfo;
  GetSystemInfo(&sysinfo);
  a = sysinfo.dwNumberOfProcessors;  /**< total number of CPUs */
#endif /* _MSC_VER */

#if defined(__PARAGON__)
  a=2; /**< Threads-capable Paragons have 2 CPUs for computation */
#endif /* __PARAGON__ */

#if defined(_CRAY)
  a = sysconf(_SC_CRAY_NCPU);        /**< total number of CPUs */
#endif

#if defined(ANDROID) || defined(USEPHYSCPUCOUNT)
  /* Android devices and the NVIDIA/SECO "CARMA" and "Kayla"    */
  /* boards toggle cores on/off according to system activity,   */
  /* thermal management, and battery state.  For now, we will   */
  /* use as many threads as the number of physical cores since  */
  /* the number that are online may vary even over a 2 second   */
  /* time window.  We will likely have this issue on many more  */
  /* platforms as power management becomes more important...    */

  /* use sysconf() for initial guess, although it produces incorrect    */
  /* results on the older android releases due to a bug in the platform */
  a = sysconf(_SC_NPROCESSORS_CONF); /**< Number of physical CPU cores */

  /* check CPU count by parsing /sys/devices/system/cpu/present and use */
  /* whichever result gives the larger CPU count...                     */
  {
    int rc=0, b=1, i=-1, j=-1;
    FILE *ifp;

    ifp = fopen("/sys/devices/system/cpu/present", "r");
    if (ifp != NULL) {
      rc = fscanf(ifp, "%d-%d", &i, &j); /* read and interpret line */
      fclose(ifp);

      if (rc == 2 && i == 0) {
        b = j+1; /* 2 or more cores exist */
      }
    }

    /* return the greater CPU count result... */
    a = (a > b) ? a : b;
  }
#else
#if defined(__sun) || defined(__linux) || defined(__osf__) || defined(_AIX)
  a = sysconf(_SC_NPROCESSORS_ONLN); /**< Number of active/running CPUs */
#endif /* SunOS, and similar... */
#endif /* Android */

#if defined(__irix)
  a = sysconf(_SC_NPROC_ONLN);       /**< Number of active/running CPUs */
#endif /* IRIX */

#if defined(__hpux)
  a = mpctl(MPC_GETNUMSPUS, 0, 0);   /**< total number of CPUs */
#endif /* HPUX */
#endif /* WKFTHREADS */

  return a;
}
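
/* A minimal usage sketch (not part of the library): query the physical  */
/* CPU count and the effective worker count.  The WKFFORCECPUCOUNT and   */
/* VMDFORCECPUCOUNT environment variables honored by                     */
/* wkf_thread_numprocessors() below make it easy to test scalability     */
/* with fewer workers than the hardware provides.                        */
#if 0
static void example_print_cpu_counts(void) {
  /* physical cores detected by the platform-specific queries above */
  int physcpus = wkf_thread_numphysprocessors();

  /* worker count after any environment variable override is applied */
  int workers = wkf_thread_numprocessors();

  printf("physical CPUs: %d, worker threads to use: %d\n",
         physcpus, workers);
}
#endif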


int wkf_thread_numprocessors(void) {
  int a=1;

#ifdef WKFTHREADS
  /* Allow the user to override the number of CPUs for use */
  /* in scalability testing, debugging, etc.               */
  char *forcecount = getenv("WKFFORCECPUCOUNT");

  /* VMD specific env variable */
  if (forcecount == NULL)
    forcecount = getenv("VMDFORCECPUCOUNT");

  if (forcecount != NULL) {
    if (sscanf(forcecount, "%d", &a) == 1) {
      return a; /* if we got a valid count, return it */
    } else {
      a=1;      /* otherwise use the real available hardware CPU count */
    }
  }

  /* otherwise return the number of physical processors currently available */
  a = wkf_thread_numphysprocessors();

  /* XXX we should add checking for the current CPU affinity masks here, */
  /* and return the min of the physical processor count and CPU affinity */
  /* mask enabled CPU count.                                             */
#endif /* WKFTHREADS */

  return a;
}


int wkf_cpu_capability_flags(wkf_cpu_caps_t *cpucaps) {
  int flags=CPU_UNKNOWN;

#if defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1300)
  flags = 0;
  flags |= _may_i_use_cpu_feature(_FEATURE_SSE2)     * CPU_SSE2;
  flags |= _may_i_use_cpu_feature(_FEATURE_AVX)      * CPU_AVX;
  flags |= _may_i_use_cpu_feature(_FEATURE_AVX2)     * CPU_AVX2;
  flags |= _may_i_use_cpu_feature(_FEATURE_FMA)      * CPU_FMA;
  flags |= _may_i_use_cpu_feature(_FEATURE_AVX512F)  * CPU_AVX512F;
  flags |= _may_i_use_cpu_feature(_FEATURE_AVX512CD) * CPU_AVX512CD;
  flags |= _may_i_use_cpu_feature(_FEATURE_AVX512ER) * CPU_AVX512ER;
  flags |= _may_i_use_cpu_feature(_FEATURE_AVX512PF) * CPU_AVX512PF;
#endif

  cpucaps->flags = flags;

  if (flags == CPU_UNKNOWN)
    return 1;

  return 0;
}


int * wkf_cpu_affinitylist(int *cpuaffinitycount) {
  int *affinitylist = NULL;
  *cpuaffinitycount = -1; /* return count -1 if unimplemented or error occurs */

  /* Win32 process affinity mask query */
#if 0 && defined(_MSC_VER)
  /* XXX untested, but based on the linux code, may work with a few tweaks */
  HANDLE myproc = GetCurrentProcess(); /* returns a pseudo-handle */
  DWORD affinitymask, sysaffinitymask;

  if (GetProcessAffinityMask(myproc, &affinitymask, &sysaffinitymask)) {
    /* count length of affinity list */
    int affinitycount=0;
    int i;
    for (i=0; i<31; i++) {
      affinitycount += (affinitymask >> i) & 0x1;
    }

    /* build affinity list */
    if (affinitycount > 0) {
      affinitylist = (int *) malloc(affinitycount * sizeof(int));
      if (affinitylist == NULL)
        return NULL;

      int curcount = 0;
      for (i=0; i<31; i++) {
        if ((affinitymask >> i) & 0x1) {
          affinitylist[curcount] = i;
          curcount++;
        }
      }
    }

    *cpuaffinitycount = affinitycount; /* return final affinity list length */
  }
#endif

  /* Linux process affinity mask query */
#if defined(__linux)
/* protect ourselves from some older Linux distros */
#if defined(CPU_SETSIZE)
  int i;
  cpu_set_t affinitymask;
  int affinitycount=0;

  /* PID 0 refers to the current process */
  if (sched_getaffinity(0, sizeof(affinitymask), &affinitymask) < 0) {
    perror("Error getting CPU affinity mask");
    return NULL;
  }

  /* count length of affinity list */
  for (i=0; i<CPU_SETSIZE; i++) {
    affinitycount += CPU_ISSET(i, &affinitymask);
  }

  /* build affinity list */
  if (affinitycount > 0) {
    affinitylist = (int *) malloc(affinitycount * sizeof(int));
    if (affinitylist == NULL)
      return NULL;

    int curcount = 0;
    for (i=0; i<CPU_SETSIZE; i++) {
      if (CPU_ISSET(i, &affinitymask)) {
        affinitylist[curcount] = i;
        curcount++;
      }
    }
  }

  *cpuaffinitycount = affinitycount; /* return final affinity list length */
#endif
#endif

  return affinitylist;
}


int wkf_thread_create(wkf_thread_t * thr, void * fctn(void *), void * arg) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  DWORD tid; /* thread id, msvc only */
  *thr = CreateThread(NULL, 8192, (LPTHREAD_START_ROUTINE) fctn, arg, 0, &tid);
  if (*thr == NULL) {
    status = -1;
  }
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_create(thr, NULL, fctn, arg);
#endif /* USEPOSIXTHREADS */
#endif /* WKFTHREADS */

  return status;
}


int wkf_thread_join(wkf_thread_t thr, void ** stat) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  DWORD wstatus = WAIT_TIMEOUT;
  while (wstatus != WAIT_OBJECT_0) {
    wstatus = WaitForSingleObject(thr, INFINITE);
  }
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_join(thr, stat);
#endif /* USEPOSIXTHREADS */
#endif /* WKFTHREADS */

  return status;
}


/*
 * Mutexes
 */
int wkf_mutex_init(wkf_mutex_t * mp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  InitializeCriticalSection(mp);
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_mutex_init(mp, 0);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = mutex_init(mp, USYNC_THREAD, NULL);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}


int wkf_mutex_lock(wkf_mutex_t * mp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  EnterCriticalSection(mp);
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_mutex_lock(mp);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = mutex_lock(mp);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}


int wkf_mutex_trylock(wkf_mutex_t * mp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
#if defined(WKFUSENEWWIN32APIS)
  /* TryEnterCriticalSection() is only available on newer */
  /* versions of Win32: _WIN32_WINNT/WINVER >= 0x0400     */
  status = (!(TryEnterCriticalSection(mp)));
#endif
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = (pthread_mutex_trylock(mp) != 0);
#endif /* USEPOSIXTHREADS */
#endif /* WKFTHREADS */

  return status;
}


int wkf_mutex_spin_lock(wkf_mutex_t * mp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
#if defined(WKFUSENEWWIN32APIS)
  /* TryEnterCriticalSection() is only available on newer */
  /* versions of Win32: _WIN32_WINNT/WINVER >= 0x0400     */
  while (!TryEnterCriticalSection(mp));
#else
  EnterCriticalSection(mp);
#endif
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  while ((status = pthread_mutex_trylock(mp)) != 0);
#endif /* USEPOSIXTHREADS */
#endif /* WKFTHREADS */

  return status;
}
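
/* A minimal usage sketch (not part of the library): guard a shared      */
/* counter with a wkf_mutex_t.  wkf_mutex_spin_lock() busy-waits and is  */
/* only preferable for very short critical sections; the blocking        */
/* wkf_mutex_lock() is the safe default.                                 */
#if 0
static wkf_mutex_t example_lock;   /* wkf_mutex_init() before first use */
static int example_counter = 0;

static void example_counter_increment(void) {
  wkf_mutex_lock(&example_lock);    /* enter the critical section */
  example_counter++;                /* modify the shared state    */
  wkf_mutex_unlock(&example_lock);  /* leave the critical section */
}
#endif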

int wkf_mutex_unlock(wkf_mutex_t * mp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  LeaveCriticalSection(mp);
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_mutex_unlock(mp);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = mutex_unlock(mp);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}


int wkf_mutex_destroy(wkf_mutex_t * mp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  DeleteCriticalSection(mp);
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_mutex_destroy(mp);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = mutex_destroy(mp);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}


/*
 * Condition variables
 */
int wkf_cond_init(wkf_cond_t * cvp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
#if defined(WKFUSEWIN2008CONDVARS)
  InitializeConditionVariable(cvp);
#else
  /* XXX not implemented */
  cvp->waiters = 0;

  /* Create an auto-reset event. */
  cvp->events[WKF_COND_SIGNAL] = CreateEvent(NULL,  /* no security */
                                             FALSE, /* auto-reset event */
                                             FALSE, /* non-signaled initially */
                                             NULL); /* unnamed */

  /* Create a manual-reset event. */
  cvp->events[WKF_COND_BROADCAST] = CreateEvent(NULL,  /* no security */
                                                TRUE,  /* manual-reset */
                                                FALSE, /* non-signaled initially */
                                                NULL); /* unnamed */
#endif
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_cond_init(cvp, NULL);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = cond_init(cvp, USYNC_THREAD, NULL);
#endif
#endif /* WKFTHREADS */

  return status;
}


int wkf_cond_destroy(wkf_cond_t * cvp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
#if defined(WKFUSEWIN2008CONDVARS)
  /* XXX not implemented */
#else
  CloseHandle(cvp->events[WKF_COND_SIGNAL]);
  CloseHandle(cvp->events[WKF_COND_BROADCAST]);
#endif
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_cond_destroy(cvp);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = cond_destroy(cvp);
#endif
#endif /* WKFTHREADS */

  return status;
}


int wkf_cond_wait(wkf_cond_t * cvp, wkf_mutex_t * mp) {
  int status=0;
#if defined(WKFTHREADS) && defined(_MSC_VER)
  int result=0;
  LONG last_waiter;
  LONG my_waiter;
#endif

#ifdef WKFTHREADS
#ifdef _MSC_VER
#if defined(WKFUSEWIN2008CONDVARS)
  SleepConditionVariableCS(cvp, mp, INFINITE);
#else
#if !defined(WKFUSEINTERLOCKEDATOMICOPS)
  EnterCriticalSection(&cvp->waiters_lock);
  cvp->waiters++;
  LeaveCriticalSection(&cvp->waiters_lock);
#else
  InterlockedIncrement(&cvp->waiters);
#endif

  LeaveCriticalSection(mp); /* SetEvent() keeps state, avoids lost wakeup */

  /* Wait for either a single or broadcast event to become signalled */
  result = WaitForMultipleObjects(2, cvp->events, FALSE, INFINITE);

#if !defined(WKFUSEINTERLOCKEDATOMICOPS)
  EnterCriticalSection(&cvp->waiters_lock);
  cvp->waiters--;
  last_waiter =
    ((result == (WAIT_OBJECT_0 + WKF_COND_BROADCAST)) && cvp->waiters == 0);
  LeaveCriticalSection(&cvp->waiters_lock);
#else
  my_waiter = InterlockedDecrement(&cvp->waiters);
  last_waiter =
    ((result == (WAIT_OBJECT_0 + WKF_COND_BROADCAST)) && my_waiter == 0);
#endif

  /* Some thread called cond_broadcast() */
  if (last_waiter)
    /* We're the last waiter to be notified or to stop waiting, so */
    /* reset the manual event.                                     */
    ResetEvent(cvp->events[WKF_COND_BROADCAST]);

  EnterCriticalSection(mp);
#endif
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_cond_wait(cvp, mp);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = cond_wait(cvp, mp);
#endif
#endif /* WKFTHREADS */

  return status;
}
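
/* A minimal usage sketch (not part of the library): the standard        */
/* predicate-loop pattern for wkf_cond_wait().  The predicate must be    */
/* re-tested after every wakeup because waits can return spuriously.     */
#if 0
static wkf_mutex_t example_mtx;
static wkf_cond_t  example_cv;
static int example_ready = 0;

static void example_consumer(void) {
  wkf_mutex_lock(&example_mtx);
  while (!example_ready) {              /* re-check predicate on wakeup */
    wkf_cond_wait(&example_cv, &example_mtx);
  }
  wkf_mutex_unlock(&example_mtx);
}

static void example_producer(void) {
  wkf_mutex_lock(&example_mtx);
  example_ready = 1;                    /* update predicate under lock  */
  wkf_mutex_unlock(&example_mtx);
  wkf_cond_signal(&example_cv);         /* wake one waiting consumer    */
}
#endif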

int wkf_cond_signal(wkf_cond_t * cvp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
#if defined(WKFUSEWIN2008CONDVARS)
  WakeConditionVariable(cvp);
#else
#if !defined(WKFUSEINTERLOCKEDATOMICOPS)
  EnterCriticalSection(&cvp->waiters_lock);
  int have_waiters = (cvp->waiters > 0);
  LeaveCriticalSection(&cvp->waiters_lock);
  if (have_waiters)
    SetEvent(cvp->events[WKF_COND_SIGNAL]);
#else
  if (InterlockedExchangeAdd(&cvp->waiters, 0) > 0)
    SetEvent(cvp->events[WKF_COND_SIGNAL]);
#endif
#endif
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_cond_signal(cvp);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = cond_signal(cvp);
#endif
#endif /* WKFTHREADS */

  return status;
}


int wkf_cond_broadcast(wkf_cond_t * cvp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
#if defined(WKFUSEWIN2008CONDVARS)
  WakeAllConditionVariable(cvp);
#else
#if !defined(WKFUSEINTERLOCKEDATOMICOPS)
  EnterCriticalSection(&cvp->waiters_lock);
  int have_waiters = (cvp->waiters > 0);
  LeaveCriticalSection(&cvp->waiters_lock);
  if (have_waiters)
    SetEvent(cvp->events[WKF_COND_BROADCAST]);
#else
  if (InterlockedExchangeAdd(&cvp->waiters, 0) > 0)
    SetEvent(cvp->events[WKF_COND_BROADCAST]);
#endif
#endif
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_cond_broadcast(cvp);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = cond_broadcast(cvp);
#endif
#endif /* WKFTHREADS */

  return status;
}


/*
 * Atomic integer ops -- Ideally implemented by fast machine
 * instruction fetch-and-add operations.  Worst-case implementation
 * based on mutex locks and math ops if no other choice.
 */
int wkf_atomic_int_init(wkf_atomic_int_t * atomp, int val) {
  memset(atomp, 0, sizeof(wkf_atomic_int_t));
#ifdef WKFTHREADS
#if defined(USEGCCATOMICS)
  /* nothing to do here */
#elif defined(USENETBSDATOMICS)
  /* nothing to do here */
#elif defined(USESOLARISATOMICS)
  /* nothing to do here */
#elif defined(USEWIN32ATOMICS)
  /* nothing to do here */
#else  /* use mutexes */
  wkf_mutex_init(&atomp->lock);
#endif
#else
  /* nothing to do for non-threaded builds */
#endif
  atomp->val = val;

  return 0;
}


int wkf_atomic_int_destroy(wkf_atomic_int_t * atomp) {
#ifdef WKFTHREADS
#if defined(USEGCCATOMICS)
  /* nothing to do here */
#elif defined(USENETBSDATOMICS)
  /* nothing to do here */
#elif defined(USESOLARISATOMICS)
  /* nothing to do here */
#elif defined(USEWIN32ATOMICS)
  /* nothing to do here */
#else  /* use mutexes */
  wkf_mutex_destroy(&atomp->lock);
#endif
#else
  /* nothing to do for non-threaded builds */
#endif

  return 0;
}

int wkf_atomic_int_set(wkf_atomic_int_t * atomp, int val) {
  int retval;

#ifdef WKFTHREADS
#if defined(USEGCCATOMICS)
  /* nothing special to do here? */
  atomp->val = val;
  retval = val;
#elif defined(USENETBSDATOMICS)
  /* nothing special to do here? */
  atomp->val = val;
  retval = val;
#elif defined(USESOLARISATOMICS)
  /* nothing special to do here? */
  atomp->val = val;
  retval = val;
#elif defined(USEWIN32ATOMICS)
  /* nothing special to do here? */
  atomp->val = val;
  retval = val;
#else  /* use mutexes */
  wkf_mutex_lock(&atomp->lock);
  atomp->val = val;
  retval = atomp->val;
  wkf_mutex_unlock(&atomp->lock);
#endif
#else
  /* nothing special to do here */
  atomp->val = val;
  retval = atomp->val;
#endif

  return retval;
}


int wkf_atomic_int_get(wkf_atomic_int_t * atomp) {
  int retval;

#ifdef WKFTHREADS
#if defined(USEGCCATOMICS)
  /* nothing special to do here? */
  retval = atomp->val;
#elif defined(USENETBSDATOMICS)
  /* nothing special to do here? */
  retval = atomp->val;
#elif defined(USESOLARISATOMICS)
  /* nothing special to do here? */
  retval = atomp->val;
#elif defined(USEWIN32ATOMICS)
  /* nothing special to do here? */
  retval = atomp->val;
#else  /* use mutexes */
  wkf_mutex_lock(&atomp->lock);
  retval = atomp->val;
  wkf_mutex_unlock(&atomp->lock);
#endif
#else
  /* nothing special to do here */
  retval = atomp->val;
#endif

  return retval;
}


int wkf_atomic_int_fetch_and_add(wkf_atomic_int_t * atomp, int inc) {
#ifdef WKFTHREADS
#if defined(USEGCCATOMICS)
  return __sync_fetch_and_add(&atomp->val, inc);
#elif defined(USENETBSDATOMICS)
  /* value returned is the new value, so we have to subtract it off again */
  return atomic_add_int_nv(&atomp->val, inc) - inc;
#elif defined(USESOLARISATOMICS)
  /* value returned is the new value, so we have to subtract it off again */
  return atomic_add_int_nv(&atomp->val, inc) - inc;
#elif defined(USEWIN32ATOMICS)
  return InterlockedExchangeAdd(&atomp->val, inc);
#else  /* use mutexes */
  int retval;
  wkf_mutex_lock(&atomp->lock);
  retval = atomp->val;
  atomp->val+=inc;
  wkf_mutex_unlock(&atomp->lock);
  return retval;
#endif
#else
  int retval = atomp->val;
  atomp->val+=inc;
  return retval;
#endif
}


int wkf_atomic_int_add_and_fetch(wkf_atomic_int_t * atomp, int inc) {
#ifdef WKFTHREADS
#if defined(USEGCCATOMICS)
  return __sync_add_and_fetch(&atomp->val, inc);
#elif defined(USENETBSDATOMICS)
  return atomic_add_int_nv(&atomp->val, inc);
#elif defined(USESOLARISATOMICS)
  return atomic_add_int_nv(&atomp->val, inc);
#elif defined(USEWIN32ATOMICS)
  /* value returned is the old value, so we have to add it on again */
  return InterlockedExchangeAdd(&atomp->val, inc) + inc;
#else  /* use mutexes */
  int retval;
  wkf_mutex_lock(&atomp->lock);
  atomp->val+=inc;
  retval = atomp->val;
  wkf_mutex_unlock(&atomp->lock);
  return retval;
#endif
#else
  int retval;
  atomp->val+=inc;
  retval = atomp->val;
  return retval;
#endif
}
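
/* A minimal usage sketch (not part of the library): use fetch-and-add   */
/* to hand out unique work-item indices to concurrent callers.  Each     */
/* caller receives the pre-increment value, so no two callers ever see   */
/* the same index.                                                       */
#if 0
static wkf_atomic_int_t example_nextitem; /* wkf_atomic_int_init(.., 0) first */

static int example_claim_next_item(int nitems) {
  int idx = wkf_atomic_int_fetch_and_add(&example_nextitem, 1);
  return (idx < nitems) ? idx : -1;   /* -1 when the work runs out */
}
#endif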

/*
 * Reader/Writer locks -- slower than mutexes but good for some purposes
 */
int wkf_rwlock_init(wkf_rwlock_t * rwp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  wkf_mutex_init(&rwp->lock);
  wkf_cond_init(&rwp->rdrs_ok);
  wkf_cond_init(&rwp->wrtr_ok);
  rwp->rwlock = 0;
  rwp->waiting_writers = 0;
#endif

#ifdef USEPOSIXTHREADS
  pthread_mutex_init(&rwp->lock, NULL);
  pthread_cond_init(&rwp->rdrs_ok, NULL);
  pthread_cond_init(&rwp->wrtr_ok, NULL);
  rwp->rwlock = 0;
  rwp->waiting_writers = 0;
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = rwlock_init(rwp, USYNC_THREAD, NULL);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}


int wkf_rwlock_readlock(wkf_rwlock_t * rwp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  wkf_mutex_lock(&rwp->lock);
  while (rwp->rwlock < 0 || rwp->waiting_writers)
    wkf_cond_wait(&rwp->rdrs_ok, &rwp->lock);
  rwp->rwlock++; /* increment number of readers holding the lock */
  wkf_mutex_unlock(&rwp->lock);
#endif

#ifdef USEPOSIXTHREADS
  pthread_mutex_lock(&rwp->lock);
  while (rwp->rwlock < 0 || rwp->waiting_writers)
    pthread_cond_wait(&rwp->rdrs_ok, &rwp->lock);
  rwp->rwlock++; /* increment number of readers holding the lock */
  pthread_mutex_unlock(&rwp->lock);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = rw_rdlock(rwp);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}


int wkf_rwlock_writelock(wkf_rwlock_t * rwp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  wkf_mutex_lock(&rwp->lock);
  while (rwp->rwlock != 0) {
    rwp->waiting_writers++;
    wkf_cond_wait(&rwp->wrtr_ok, &rwp->lock);
    rwp->waiting_writers--;
  }
  rwp->rwlock=-1;
  wkf_mutex_unlock(&rwp->lock);
#endif

#ifdef USEPOSIXTHREADS
  pthread_mutex_lock(&rwp->lock);
  while (rwp->rwlock != 0) {
    rwp->waiting_writers++;
    pthread_cond_wait(&rwp->wrtr_ok, &rwp->lock);
    rwp->waiting_writers--;
  }
  rwp->rwlock=-1;
  pthread_mutex_unlock(&rwp->lock);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = rw_wrlock(rwp);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}


int wkf_rwlock_unlock(wkf_rwlock_t * rwp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  int ww, wr;
  wkf_mutex_lock(&rwp->lock);
  if (rwp->rwlock > 0) {
    rwp->rwlock--;   /* a reader is releasing the lock */
  } else {
    rwp->rwlock = 0; /* the writer is releasing the lock */
  }
  ww = (rwp->waiting_writers && rwp->rwlock == 0);
  wr = (rwp->waiting_writers == 0);
  wkf_mutex_unlock(&rwp->lock);
  if (ww)
    wkf_cond_signal(&rwp->wrtr_ok);
  else if (wr)
    wkf_cond_signal(&rwp->rdrs_ok);
#endif

#ifdef USEPOSIXTHREADS
  int ww, wr;
  pthread_mutex_lock(&rwp->lock);
  if (rwp->rwlock > 0) {
    rwp->rwlock--;   /* a reader is releasing the lock */
  } else {
    rwp->rwlock = 0; /* the writer is releasing the lock */
  }
  ww = (rwp->waiting_writers && rwp->rwlock == 0);
  wr = (rwp->waiting_writers == 0);
  pthread_mutex_unlock(&rwp->lock);
  if (ww)
    pthread_cond_signal(&rwp->wrtr_ok);
  else if (wr)
    pthread_cond_signal(&rwp->rdrs_ok);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = rw_unlock(rwp);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}
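
/* A minimal usage sketch (not part of the library): reader/writer locks */
/* allow many concurrent readers, or one exclusive writer.  This favors  */
/* workloads where reads greatly outnumber writes; waiting writers are   */
/* given preference over new readers so they cannot starve.              */
#if 0
static wkf_rwlock_t example_rwlock; /* wkf_rwlock_init() before first use */
static int example_table[16];

static int example_read_entry(int i) {
  int v;
  wkf_rwlock_readlock(&example_rwlock);   /* shared access    */
  v = example_table[i];
  wkf_rwlock_unlock(&example_rwlock);
  return v;
}

static void example_write_entry(int i, int v) {
  wkf_rwlock_writelock(&example_rwlock);  /* exclusive access */
  example_table[i] = v;
  wkf_rwlock_unlock(&example_rwlock);
}
#endif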

/*
 * Simple counting barrier primitive
 */
wkf_barrier_t * wkf_thread_barrier_init(int n_clients) {
  wkf_barrier_t *barrier = (wkf_barrier_t *) malloc(sizeof(wkf_barrier_t));

#ifdef WKFTHREADS
  if (barrier != NULL) {
    barrier->n_clients = n_clients;
    barrier->n_waiting = 0;
    barrier->phase = 0;
    barrier->sum = 0;
    wkf_mutex_init(&barrier->lock);
    wkf_cond_init(&barrier->wait_cv);
  }
#endif

  return barrier;
}


/* When rendering in the CAVE we use a special synchronization    */
/* mode so that shared memory mutexes and condition variables     */
/* will work correctly when accessed from multiple processes.     */
/* Inter-process synchronization involves the kernel to a greater */
/* degree, so these barriers are substantially more costly to use */
/* than the ones designed for use within a single process.        */
int wkf_thread_barrier_init_proc_shared(wkf_barrier_t *barrier, int n_clients) {
#ifdef WKFTHREADS
#ifdef USEPOSIXTHREADS
  if (barrier != NULL) {
    barrier->n_clients = n_clients;
    barrier->n_waiting = 0;
    barrier->phase = 0;
    barrier->sum = 0;

    pthread_mutexattr_t mattr;
    pthread_condattr_t  cattr;

    printf("Setting barriers to have system scope...\n");

    pthread_mutexattr_init(&mattr);
    if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) {
      printf("WARNING: could not set mutex to process shared scope\n");
    }

    pthread_condattr_init(&cattr);
    if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED) != 0) {
      printf("WARNING: could not set condition variable to process shared scope\n");
    }

    pthread_mutex_init(&barrier->lock, &mattr);
    pthread_cond_init(&barrier->wait_cv, &cattr);

    pthread_condattr_destroy(&cattr);
    pthread_mutexattr_destroy(&mattr);
  }
#endif
#endif

  return 0;
}


void wkf_thread_barrier_destroy(wkf_barrier_t *barrier) {
#ifdef WKFTHREADS
  wkf_mutex_destroy(&barrier->lock);
  wkf_cond_destroy(&barrier->wait_cv);
#endif
  free(barrier);
}


int wkf_thread_barrier(wkf_barrier_t *barrier, int increment) {
#ifdef WKFTHREADS
  int my_phase;
  int my_result;

  wkf_mutex_lock(&barrier->lock);
  my_phase = barrier->phase;
  barrier->sum += increment;
  barrier->n_waiting++;

  if (barrier->n_waiting == barrier->n_clients) {
    barrier->result = barrier->sum;
    barrier->sum = 0;
    barrier->n_waiting = 0;
    barrier->phase = 1 - my_phase;
    wkf_cond_broadcast(&barrier->wait_cv);
  }

  while (barrier->phase == my_phase) {
    wkf_cond_wait(&barrier->wait_cv, &barrier->lock);
  }

  my_result = barrier->result;
  wkf_mutex_unlock(&barrier->lock);

  return my_result;
#else
  return 0;
#endif
}


/*
 * Barriers used for sleepable thread pools
 */
/* symmetric run barrier for use within a single process */
int wkf_thread_run_barrier_init(wkf_run_barrier_t *barrier, int n_clients) {
#ifdef WKFTHREADS
  if (barrier != NULL) {
    barrier->n_clients = n_clients;
    barrier->n_waiting = 0;
    barrier->phase = 0;
    barrier->fctn = NULL;
    wkf_mutex_init(&barrier->lock);
    wkf_cond_init(&barrier->wait_cv);
  }
#endif

  return 0;
}


void wkf_thread_run_barrier_destroy(wkf_run_barrier_t *barrier) {
#ifdef WKFTHREADS
  wkf_mutex_destroy(&barrier->lock);
  wkf_cond_destroy(&barrier->wait_cv);
#endif
}
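
/* A minimal usage sketch (not part of the library): each worker passes  */
/* an integer contribution into the barrier; every worker leaves the     */
/* barrier with the sum of all contributions for that phase, which is a  */
/* cheap way to combine per-thread status codes at a synchronization     */
/* point.                                                                */
#if 0
static wkf_barrier_t *example_barrier; /* wkf_thread_barrier_init(nthreads) */

static int example_worker_step(int my_error_count) {
  /* blocks until all n_clients threads arrive; returns the summed */
  /* increments, so a nonzero result means some worker had errors  */
  return wkf_thread_barrier(example_barrier, my_error_count);
}
#endif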

/**
 * Wait until all threads reach barrier, and return the function
 * pointer passed in by the master thread.
 */
void * (*wkf_thread_run_barrier(wkf_run_barrier_t *barrier,
                                void * fctn(void*),
                                void * parms,
                                void **rsltparms))(void *) {
#if defined(WKFTHREADS)
  int my_phase;
  void * (*my_result)(void*);

  wkf_mutex_lock(&barrier->lock);
  my_phase = barrier->phase;
  if (fctn != NULL)
    barrier->fctn = fctn;
  if (parms != NULL)
    barrier->parms = parms;
  barrier->n_waiting++;

  if (barrier->n_waiting == barrier->n_clients) {
    barrier->rslt = barrier->fctn;
    barrier->rsltparms = barrier->parms;
    barrier->fctn = NULL;
    barrier->parms = NULL;
    barrier->n_waiting = 0;
    barrier->phase = 1 - my_phase;
    wkf_cond_broadcast(&barrier->wait_cv);
  }

  while (barrier->phase == my_phase) {
    wkf_cond_wait(&barrier->wait_cv, &barrier->lock);
  }

  my_result = barrier->rslt;
  if (rsltparms != NULL)
    *rsltparms = barrier->rsltparms;

  wkf_mutex_unlock(&barrier->lock);
#else
  void * (*my_result)(void*) = fctn;
  if (rsltparms != NULL)
    *rsltparms = parms;
#endif

  return my_result;
}


/** non-blocking poll to see if peers are already at the barrier */
int wkf_thread_run_barrier_poll(wkf_run_barrier_t *barrier) {
  int rc=0;
#if defined(WKFTHREADS)
  wkf_mutex_lock(&barrier->lock);
  if (barrier->n_waiting == (barrier->n_clients-1)) {
    rc=1;
  }
  wkf_mutex_unlock(&barrier->lock);
#endif
  return rc;
}


/*
 * task tile stack
 */
int wkf_tilestack_init(wkf_tilestack_t *s, int size) {
  if (s == NULL)
    return -1;

#if defined(WKFTHREADS)
  wkf_mutex_init(&s->mtx);
#endif

  s->growthrate = 512;
  s->top = -1;

  if (size > 0) {
    s->size = size;
    s->s = (wkf_tasktile_t *) malloc(s->size * sizeof(wkf_tasktile_t));
  } else {
    s->size = 0;
    s->s = NULL;
  }

  return 0;
}


void wkf_tilestack_destroy(wkf_tilestack_t *s) {
#if defined(WKFTHREADS)
  wkf_mutex_destroy(&s->mtx);
#endif
  free(s->s);
  s->s = NULL; /* prevent access after free */
}


int wkf_tilestack_compact(wkf_tilestack_t *s) {
#if defined(WKFTHREADS)
  wkf_mutex_lock(&s->mtx);
#endif
  if (s->size > (s->top + 1)) {
    int newsize = s->top + 1;
    wkf_tasktile_t *tmp = (wkf_tasktile_t *)
      realloc(s->s, newsize * sizeof(wkf_tasktile_t));
    if (tmp == NULL) {
#if defined(WKFTHREADS)
      wkf_mutex_unlock(&s->mtx);
#endif
      return -1; /* out of space! */
    }
    s->s = tmp;
    s->size = newsize;
  }
#if defined(WKFTHREADS)
  wkf_mutex_unlock(&s->mtx);
#endif

  return 0;
}

int wkf_tilestack_push(wkf_tilestack_t *s, const wkf_tasktile_t *t) {
#if defined(WKFTHREADS)
  wkf_mutex_lock(&s->mtx);
#endif
  s->top++;
  if (s->top >= s->size) {
    int newsize = s->size + s->growthrate;
    wkf_tasktile_t *tmp = (wkf_tasktile_t *)
      realloc(s->s, newsize * sizeof(wkf_tasktile_t));
    if (tmp == NULL) {
      s->top--;
#if defined(WKFTHREADS)
      wkf_mutex_unlock(&s->mtx);
#endif
      return -1; /* out of space! */
    }
    s->s = tmp;
    s->size = newsize;
  }

  s->s[s->top] = *t; /* push onto the stack */

#if defined(WKFTHREADS)
  wkf_mutex_unlock(&s->mtx);
#endif
  return 0;
}


int wkf_tilestack_pop(wkf_tilestack_t *s, wkf_tasktile_t *t) {
#if defined(WKFTHREADS)
  wkf_mutex_lock(&s->mtx);
#endif

  if (s->top < 0) {
#if defined(WKFTHREADS)
    wkf_mutex_unlock(&s->mtx);
#endif
    return WKF_TILESTACK_EMPTY; /* empty stack */
  }

  *t = s->s[s->top];
  s->top--;

#if defined(WKFTHREADS)
  wkf_mutex_unlock(&s->mtx);
#endif

  return 0;
}


int wkf_tilestack_popall(wkf_tilestack_t *s) {
#if defined(WKFTHREADS)
  wkf_mutex_lock(&s->mtx);
#endif

  s->top = -1;

#if defined(WKFTHREADS)
  wkf_mutex_unlock(&s->mtx);
#endif

  return 0;
}


int wkf_tilestack_empty(wkf_tilestack_t *s) {
#if defined(WKFTHREADS)
  wkf_mutex_lock(&s->mtx);
#endif

  if (s->top < 0) {
#if defined(WKFTHREADS)
    wkf_mutex_unlock(&s->mtx);
#endif
    return 1;
  }

#if defined(WKFTHREADS)
  wkf_mutex_unlock(&s->mtx);
#endif

  return 0;
}


/*
 * shared iterators
 */

/** initialize a shared iterator */
int wkf_shared_iterator_init(wkf_shared_iterator_t *it) {
  memset(it, 0, sizeof(wkf_shared_iterator_t));
#if defined(WKFTHREADS)
  wkf_mutex_init(&it->mtx);
#endif
  return 0;
}


/** destroy a shared iterator */
int wkf_shared_iterator_destroy(wkf_shared_iterator_t *it) {
#if defined(WKFTHREADS)
  wkf_mutex_destroy(&it->mtx);
#endif
  return 0;
}


/** set shared iterator parameters */
int wkf_shared_iterator_set(wkf_shared_iterator_t *it, wkf_tasktile_t *tile) {
#if defined(WKFTHREADS)
  wkf_mutex_lock(&it->mtx);
#endif
  it->start = tile->start;
  it->current = tile->start;
  it->end = tile->end;
  it->fatalerror = 0;
#if defined(WKFTHREADS)
  wkf_mutex_unlock(&it->mtx);
#endif
  return 0;
}


/** iterate the shared iterator over a requested half-open interval */
int wkf_shared_iterator_next_tile(wkf_shared_iterator_t *it, int reqsize,
                                  wkf_tasktile_t *tile) {
  int rc=WKF_SCHED_CONTINUE;

#if defined(WKFTHREADS)
  wkf_mutex_spin_lock(&it->mtx);
#endif
  if (!it->fatalerror) {
    tile->start=it->current; /* set start to the current work unit   */
    it->current+=reqsize;    /* increment by the requested tile size */
    tile->end=it->current;   /* set the (exclusive) endpoint         */

    /* if start is beyond the last work unit, we're done */
    if (tile->start >= it->end) {
      tile->start=0;
      tile->end=0;
      rc = WKF_SCHED_DONE;
    }

    /* if the endpoint (exclusive) for the requested tile size */
    /* is beyond the last work unit, roll it back as needed    */
    if (tile->end > it->end) {
      tile->end = it->end;
    }
  } else {
    rc = WKF_SCHED_DONE;
  }
#if defined(WKFTHREADS)
  wkf_mutex_unlock(&it->mtx);
#endif

  return rc;
}


/** worker thread calls this to indicate a fatal error */
int wkf_shared_iterator_setfatalerror(wkf_shared_iterator_t *it) {
#if defined(WKFTHREADS)
  wkf_mutex_spin_lock(&it->mtx);
#endif
  it->fatalerror=1;
#if defined(WKFTHREADS)
  wkf_mutex_unlock(&it->mtx);
#endif
  return 0;
}


/** master thread calls this to query for fatal errors */
int wkf_shared_iterator_getfatalerror(wkf_shared_iterator_t *it) {
  int rc=0;
#if defined(WKFTHREADS)
  wkf_mutex_lock(&it->mtx);
#endif
  if (it->fatalerror)
    rc = -1;
#if defined(WKFTHREADS)
  wkf_mutex_unlock(&it->mtx);
#endif
  return rc;
}
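
/* A minimal usage sketch (not part of the library): dynamic load        */
/* balancing with a shared iterator.  The master sets the full work      */
/* range once; each worker then repeatedly grabs the next tile of work   */
/* until the iterator reports WKF_SCHED_DONE.  Fast workers simply come  */
/* back for more tiles than slow ones.                                   */
#if 0
static void example_worker_loop(wkf_shared_iterator_t *it) {
  wkf_tasktile_t tile;
  /* claim tiles of 8 work units at a time until the range is exhausted */
  while (wkf_shared_iterator_next_tile(it, 8, &tile) != WKF_SCHED_DONE) {
    int i;
    for (i=tile.start; i<tile.end; i++) {
      /* ... process work unit i ... */
    }
  }
}

static void example_master_setup(wkf_shared_iterator_t *it, int nitems) {
  wkf_tasktile_t range;
  range.start = 0;        /* half-open interval [0, nitems) */
  range.end   = nitems;
  wkf_shared_iterator_init(it);
  wkf_shared_iterator_set(it, &range);
}
#endif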

#if defined(WKFTHREADS)
/*
 * Thread pool.
 */
static void * wkf_threadpool_workerproc(void *voidparms) {
  void *(*fctn)(void*);
  wkf_threadpool_workerdata_t *workerdata = (wkf_threadpool_workerdata_t *) voidparms;
  wkf_threadpool_t *thrpool = (wkf_threadpool_t *) workerdata->thrpool;

  while ((fctn = wkf_thread_run_barrier(&thrpool->runbar, NULL, NULL, &workerdata->parms)) != NULL) {
    (*fctn)(workerdata);
  }

  return NULL;
}


static void * wkf_threadpool_workersync(void *voidparms) {
  return NULL;
}
#endif


wkf_threadpool_t * wkf_threadpool_create(int workercount, int *devlist) {
  int i;
  wkf_threadpool_t *thrpool = NULL;
  thrpool = (wkf_threadpool_t *) malloc(sizeof(wkf_threadpool_t));
  if (thrpool == NULL)
    return NULL;

  memset(thrpool, 0, sizeof(wkf_threadpool_t));

#if !defined(WKFTHREADS)
  workercount=1;
#endif

  /* if caller provides a device list, use it, otherwise we assume */
  /* all workers are CPU cores                                     */
  thrpool->devlist = (int *) malloc(sizeof(int) * workercount);
  if (devlist == NULL) {
    for (i=0; i<workercount; i++)
      thrpool->devlist[i] = -1; /* mark as a CPU core */
  } else {
    memcpy(thrpool->devlist, devlist, sizeof(int) * workercount);
  }

  /* initialize shared iterator */
  wkf_shared_iterator_init(&thrpool->iter);

  /* initialize tile stack for error handling */
  wkf_tilestack_init(&thrpool->errorstack, 64);

  /* create a run barrier with N+1 threads: N workers, 1 master */
  thrpool->workercount = workercount;
  wkf_thread_run_barrier_init(&thrpool->runbar, workercount+1);

  /* allocate and initialize thread pool */
  thrpool->threads = (wkf_thread_t *) malloc(sizeof(wkf_thread_t) * workercount);
  thrpool->workerdata = (wkf_threadpool_workerdata_t *) malloc(sizeof(wkf_threadpool_workerdata_t) * workercount);
  memset(thrpool->workerdata, 0, sizeof(wkf_threadpool_workerdata_t) * workercount);

  /* setup per-worker data */
  for (i=0; i<workercount; i++) {
    thrpool->workerdata[i].iter=&thrpool->iter;
    thrpool->workerdata[i].errorstack=&thrpool->errorstack;
    thrpool->workerdata[i].threadid=i;
    thrpool->workerdata[i].threadcount=workercount;
    thrpool->workerdata[i].devid=thrpool->devlist[i];
    thrpool->workerdata[i].devspeed=1.0f; /* must be reset by dev setup code */
    thrpool->workerdata[i].thrpool=thrpool;
  }

#if defined(WKFTHREADS)
  /* launch thread pool */
  for (i=0; i<workercount; i++) {
    wkf_thread_create(&thrpool->threads[i], wkf_threadpool_workerproc, &thrpool->workerdata[i]);
  }
#endif

  return thrpool;
}


int wkf_threadpool_launch(wkf_threadpool_t *thrpool,
                          void *fctn(void *), void *parms, int blocking) {
  if (thrpool == NULL)
    return -1;

#if defined(WKFTHREADS)
  /* wake sleeping threads to run fctn(parms) */
  wkf_thread_run_barrier(&thrpool->runbar, fctn, parms, NULL);
  if (blocking)
    wkf_thread_run_barrier(&thrpool->runbar, wkf_threadpool_workersync, NULL, NULL);
#else
  thrpool->workerdata[0].parms = parms;
  (*fctn)(&thrpool->workerdata[0]);
#endif
  return 0;
}


int wkf_threadpool_wait(wkf_threadpool_t *thrpool) {
#if defined(WKFTHREADS)
  wkf_thread_run_barrier(&thrpool->runbar, wkf_threadpool_workersync, NULL, NULL);
#endif
  return 0;
}


int wkf_threadpool_poll(wkf_threadpool_t *thrpool) {
#if defined(WKFTHREADS)
  return wkf_thread_run_barrier_poll(&thrpool->runbar);
#else
  return 1;
#endif
}

int wkf_threadpool_destroy(wkf_threadpool_t *thrpool) {
#if defined(WKFTHREADS)
  int i;
#endif

  /* wake threads and tell them to shutdown */
  wkf_thread_run_barrier(&thrpool->runbar, NULL, NULL, NULL);

#if defined(WKFTHREADS)
  /* join the pool of worker threads */
  for (i=0; i<thrpool->workercount; i++) {
    wkf_thread_join(thrpool->threads[i], NULL);
  }
#endif

  /* destroy the thread barrier */
  wkf_thread_run_barrier_destroy(&thrpool->runbar);

  /* destroy the shared iterator */
  wkf_shared_iterator_destroy(&thrpool->iter);

  /* destroy tile stack for error handling */
  wkf_tilestack_destroy(&thrpool->errorstack);

  free(thrpool->devlist);
  free(thrpool->threads);
  free(thrpool->workerdata);
  free(thrpool);

  return 0;
}


/** return the number of worker threads currently in the pool */
int wkf_threadpool_get_workercount(wkf_threadpool_t *thrpool) {
  return thrpool->workercount;
}


/** worker thread can call this to get its ID and number of peers */
int wkf_threadpool_worker_getid(void *voiddata, int *threadid, int *threadcount) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
  if (threadid != NULL)
    *threadid = worker->threadid;

  if (threadcount != NULL)
    *threadcount = worker->threadcount;

  return 0;
}


/** worker thread can call this to get its CPU/GPU device ID */
int wkf_threadpool_worker_getdevid(void *voiddata, int *devid) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
  if (devid != NULL)
    *devid = worker->devid;

  return 0;
}


/**
 * worker thread calls this to set relative speed of this device
 * as determined by the SM/core count and clock rate
 * Note: this should only be called once, during the worker's
 * device initialization process
 */
int wkf_threadpool_worker_setdevspeed(void *voiddata, float speed) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
  worker->devspeed = speed;
  return 0;
}


/**
 * worker thread calls this to get relative speed of this device
 * as determined by the SM/core count and clock rate
 */
int wkf_threadpool_worker_getdevspeed(void *voiddata, float *speed) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
  if (speed != NULL)
    *speed = worker->devspeed;
  return 0;
}


/**
 * worker thread calls this to scale max tile size by worker speed
 * as determined by the SM/core count and clock rate
 */
int wkf_threadpool_worker_devscaletile(void *voiddata, int *tilesize) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
  if (tilesize != NULL) {
    int scaledtilesize;
    scaledtilesize = (int) (worker->devspeed * ((float) (*tilesize)));
    if (scaledtilesize < 1)
      scaledtilesize = 1;

    *tilesize = scaledtilesize;
  }

  return 0;
}


/** worker thread can call this to get its client data pointer */
int wkf_threadpool_worker_getdata(void *voiddata, void **clientdata) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata;
  if (clientdata != NULL)
    *clientdata = worker->parms;

  return 0;
}


/** Set shared iterator state to half-open interval defined by tile */
int wkf_threadpool_sched_dynamic(wkf_threadpool_t *thrpool, wkf_tasktile_t *tile) {
  if (thrpool == NULL)
    return -1;
  return wkf_shared_iterator_set(&thrpool->iter, tile);
}


/** iterate the shared iterator over the requested half-open interval */
int wkf_threadpool_next_tile(void *voidparms, int reqsize,
                             wkf_tasktile_t *tile) {
  int rc;
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms;
  rc = wkf_shared_iterator_next_tile(worker->iter, reqsize, tile);
  if (rc == WKF_SCHED_DONE) {
    /* if the error stack is empty, then we're done, otherwise pop */
    /* a tile off of the error stack and retry it                  */
    if (wkf_tilestack_pop(worker->errorstack, tile) != WKF_TILESTACK_EMPTY)
      return WKF_SCHED_CONTINUE;
  }

  return rc;
}
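
/* A minimal usage sketch (not part of the library): create a pool,      */
/* schedule a range of work units, launch a worker function on all       */
/* workers, and tear the pool down.  The worker receives its             */
/* wkf_threadpool_workerdata_t pointer and pulls tiles via               */
/* wkf_threadpool_next_tile() until the range is exhausted.              */
#if 0
static void * example_pool_worker(void *voidparms) {
  wkf_tasktile_t tile;
  while (wkf_threadpool_next_tile(voidparms, 16, &tile) != WKF_SCHED_DONE) {
    int i;
    for (i=tile.start; i<tile.end; i++) {
      /* ... process work unit i ... */
    }
  }
  return NULL;
}

static int example_run_pool(int nitems) {
  wkf_tasktile_t range;
  wkf_threadpool_t *pool;

  /* one worker per available CPU, all workers on CPU cores */
  pool = wkf_threadpool_create(wkf_thread_numprocessors(), NULL);
  if (pool == NULL)
    return -1;

  range.start = 0;       /* half-open interval [0, nitems) */
  range.end   = nitems;
  wkf_threadpool_sched_dynamic(pool, &range);

  /* blocking launch: returns when all workers have finished */
  wkf_threadpool_launch(pool, example_pool_worker, NULL, 1);

  wkf_threadpool_destroy(pool);
  return 0;
}
#endif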

/**
 * worker thread calls this when a failure occurs on a tile it has
 * already taken from the scheduler
 */
int wkf_threadpool_tile_failed(void *voidparms, wkf_tasktile_t *tile) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms;
  return wkf_tilestack_push(worker->errorstack, tile);
}


/* worker thread calls this to indicate that an unrecoverable error occurred */
int wkf_threadpool_setfatalerror(void *voidparms) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms;
  wkf_shared_iterator_setfatalerror(worker->iter);
  return 0;
}


/* worker thread calls this to query whether an unrecoverable error occurred */
int wkf_threadpool_getfatalerror(void *voidparms) {
  wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms;
  /* query error status for return to caller */
  return wkf_shared_iterator_getfatalerror(worker->iter);
}


/* launch up to numprocs threads using shared iterator as a load balancer */
int wkf_threadlaunch(int numprocs, void *clientdata, void * fctn(void *),
                     wkf_tasktile_t *tile) {
  wkf_shared_iterator_t iter;
  wkf_threadlaunch_t *parms=NULL;
  wkf_thread_t * threads=NULL;
  int i, rc;

  /* XXX have to ponder what the right thing to do is here */
#if !defined(WKFTHREADS)
  numprocs=1;
#endif

  /* initialize shared iterator and set the iteration and range */
  wkf_shared_iterator_init(&iter);
  if (wkf_shared_iterator_set(&iter, tile))
    return -1;

  /* allocate array of threads */
  threads = (wkf_thread_t *) calloc(numprocs * sizeof(wkf_thread_t), 1);
  if (threads == NULL)
    return -1;

  /* allocate and initialize array of thread parameters */
  parms = (wkf_threadlaunch_t *) malloc(numprocs * sizeof(wkf_threadlaunch_t));
  if (parms == NULL) {
    free(threads);
    return -1;
  }
  for (i=0; i<numprocs; i++) {
    parms[i].iter = &iter;
    parms[i].threadid = i;
    parms[i].threadcount = numprocs;
    parms[i].clientdata = clientdata;
  }

#ifdef WKFTHREADS
  /* spawn child threads to do the work */
  for (i=0; i<numprocs; i++) {
    wkf_thread_create(&threads[i], fctn, &parms[i]);
  }

  /* join the threads after work is done */
  for (i=0; i<numprocs; i++) {
    wkf_thread_join(threads[i], NULL);
  }
#else
  /* non-threaded builds run the work function directly */
  fctn((void *) &parms[0]);
#endif

  /* free threads/parms */
  free(parms);
  free(threads);

  /* query error status for return to caller */
  rc = wkf_shared_iterator_getfatalerror(&iter);

  /* destroy the shared iterator */
  wkf_shared_iterator_destroy(&iter);

  return rc;
}


/** worker thread can call this to get its ID and number of peers */
int wkf_threadlaunch_getid(void *voidparms, int *threadid, int *threadcount) {
  wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms;
  if (threadid != NULL)
    *threadid = worker->threadid;

  if (threadcount != NULL)
    *threadcount = worker->threadcount;

  return 0;
}


/** worker thread can call this to get its client data pointer */
int wkf_threadlaunch_getdata(void *voidparms, void **clientdata) {
  wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms;
  if (clientdata != NULL)
    *clientdata = worker->clientdata;

  return 0;
}


/** iterate the shared iterator over the requested half-open interval */
int wkf_threadlaunch_next_tile(void *voidparms, int reqsize,
                               wkf_tasktile_t *tile) {
  wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms;
  return wkf_shared_iterator_next_tile(worker->iter, reqsize, tile);
}


/** worker thread calls this to indicate that an unrecoverable error occurred */
int wkf_threadlaunch_setfatalerror(void *voidparms) {
  wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms;
  return wkf_shared_iterator_setfatalerror(worker->iter);
}

#ifdef __cplusplus
}
#endif
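
/* A minimal usage sketch (not part of the library): wkf_threadlaunch()  */
/* spawns short-lived workers over a half-open range and joins them      */
/* before returning, so no persistent pool is needed.  Each worker uses  */
/* wkf_threadlaunch_next_tile() to pull work from the shared iterator.   */
#if 0
static void * example_launch_worker(void *voidparms) {
  wkf_tasktile_t tile;
  while (wkf_threadlaunch_next_tile(voidparms, 4, &tile) != WKF_SCHED_DONE) {
    int i;
    for (i=tile.start; i<tile.end; i++) {
      /* ... process work unit i ... */
    }
  }
  return NULL;
}

static int example_launch(int nitems) {
  wkf_tasktile_t range;
  range.start = 0;  /* half-open interval [0, nitems) */
  range.end   = nitems;
  /* returns nonzero if any worker flagged a fatal error */
  return wkf_threadlaunch(wkf_thread_numprocessors(), NULL,
                          example_launch_worker, &range);
}
#endif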