// Wrapper class around a <pthread_mutex_t>.  Only the declarations
// are shown here; the method bodies are not part of this text.
class Mutex
{
public:
// NOTE(review): presumably locks <mutex_>, with the int result being
// a status code -- confirm against the implementation.
int acquire (void);
// NOTE(review): presumably unlocks <mutex_> -- confirm against the
// implementation.
int release (void);
// ...
private:
// The underlying mutex object.
pthread_mutex_t mutex_;
};
// Wrapper class around a <pthread_cond_t> that also holds a reference
// to a <Mutex>.  Only the declarations are shown here; the method
// bodies are not part of this text.
class Condition
{
public:
// Construct a condition associated with the mutex <m>.
// NOTE(review): presumably binds <mutex_> to <m>; since <mutex_> is a
// reference, <m> would have to outlive this object -- confirm.
Condition (Mutex &m);
~Condition (void);
// Wait using the caller-supplied mutex <m>.
// NOTE(review): how this overload relates to the stored <mutex_> is
// not visible here -- confirm.
int wait (Mutex &m);
// Wait without naming a mutex.
// NOTE(review): presumably waits using the stored <mutex_> -- confirm.
int wait (void);
// NOTE(review): presumably wakes one waiter (signal) or all waiters
// (broadcast), as the pthread_cond_* names suggest -- confirm.
int signal (void);
int broadcast (void);
private:
// Reference to the mutex supplied at construction.
Mutex &mutex_;
// The underlying condition variable object.
pthread_cond_t cond_;
};
First, we'll change the pthread_mutex_t
typedef from a HANDLE back to a
CRITICAL_SECTION:
typedef CRITICAL_SECTION pthread_mutex_t;
Then, we'll revise the implementation of our
pthread_cond_wait function, as follows:
// Block the calling thread on <cv> until it is released through the
// semaphore <cv->sema_>.  <external_mutex> is released before waiting
// and reacquired before returning, so the caller is expected to hold
// it on entry.  Always returns 0; the results of the Win32 calls are
// not checked.
int
pthread_cond_wait (pthread_cond_t *cv,
                   pthread_mutex_t *external_mutex)
{
  // Keep the internal lock held just long enough to increment the
  // count of waiters by one.  We can't keep it held across the call
  // to <WaitForSingleObject> since that will deadlock other calls to
  // <pthread_cond_signal> and <pthread_cond_broadcast>.
  EnterCriticalSection (&cv->waiters_count_lock_);
  cv->waiters_count_++;
  LeaveCriticalSection (&cv->waiters_count_lock_);

  // The following two function calls are used in place of
  // <SignalObjectAndWait> for versions of Win32 that lack this
  // function.  Note that the Win32 call that releases a
  // CRITICAL_SECTION is <LeaveCriticalSection>.
  LeaveCriticalSection (external_mutex);

  // Wait to be awakened by a <pthread_cond_signal> or
  // <pthread_cond_broadcast>.
  WaitForSingleObject (cv->sema_, INFINITE);

  // Reacquire the internal lock to avoid race conditions on the
  // waiter count.
  EnterCriticalSection (&cv->waiters_count_lock_);

  // We're no longer waiting...
  cv->waiters_count_--;

  // Check to see if we're the last waiter after a
  // <pthread_cond_broadcast> call.
  int last_waiter =
    cv->was_broadcast_ && cv->waiters_count_ == 0;

  LeaveCriticalSection (&cv->waiters_count_lock_);

  // If we're the last waiter thread during this particular broadcast
  // then let all the other threads proceed.
  if (last_waiter)
    SetEvent (cv->waiters_done_);

  // Always regain the external mutex since that's the guarantee that
  // we give to our callers.
  EnterCriticalSection (external_mutex);

  // The function is declared to return int, so report success
  // explicitly instead of falling off the end.
  return 0;
}
The pthread_cond_broadcast
implementation waits on the waiters_done_ manual-reset
event until all waiters are finished. However, the last waiter
thread, which signals this event, may not get the chance to wait on
the external_mutex before another waiter gets it,
performs some processing, and then releases and immediately reacquires
the external_mutex. Implementing a portable ``fair''
condition variable without using SignalObjectAndWait
therefore requires yet another approach, which is described next.
This approach uses the Specific Notification pattern
[Cargill].
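The condition variable for this approach is declared as follows (the
member list was lost when this text was extracted; only the leading
std::deque member survives):
typedef struct
{
// std::deque<...> ...;  -- element type and remaining members missing
} pthread_cond_t;
pthread_cond_init initializes this implementation: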
// Initialize the condition variable <cv> by creating its broadcast
// semaphore and its mutex.  The attribute argument is ignored.
// Returns 0 on success and -1 if either Win32 object could not be
// created (in which case no handle is left open).
//
// NOTE(review): assumes <broadcast_> and <mutex_> are HANDLE members
// of pthread_cond_t; the struct definition is not visible in this
// text -- confirm.
int
pthread_cond_init (pthread_cond_t *cv,
                   const pthread_condattr_t *)
{
  cv->broadcast_ = CreateSemaphore (NULL,       // no security
                                    0,          // initially 0
                                    0x7fffffff, // max
                                    NULL);      // unnamed
  if (cv->broadcast_ == NULL)
    return -1;

  cv->mutex_ = CreateMutex (NULL,  // no security
                            FALSE, // not initially owned
                            NULL); // unnamed
  if (cv->mutex_ == NULL)
    {
      // Don't leak the semaphore created above.
      CloseHandle (cv->broadcast_);
      cv->broadcast_ = NULL;
      return -1;
    }

  return 0;
}
// NOTE(review): the body of this function is empty in this text, so
// nothing is released here and, although the return type is int, no
// value is returned.  The implementation still has to be supplied.
int
pthread_cond_destroy (pthread_cond_t *cv)
{
}
// NOTE(review): the body of this function is empty in this text, so
// the caller is never blocked and <external_mutex> is never touched;
// although the return type is int, no value is returned.  The
// implementation still has to be supplied.
int
pthread_cond_wait (pthread_cond_t *cv,
pthread_mutex_t *external_mutex)
{
}
// NOTE(review): the body of this function is empty in this text, so
// no waiter is awakened and, although the return type is int, no
// value is returned.  The implementation still has to be supplied.
int
pthread_cond_signal (pthread_cond_t *cv)
{
}
// NOTE(review): the body of this function is empty in this text, so
// no waiters are awakened and, although the return type is int, no
// value is returned.  The implementation still has to be supplied.
int
pthread_cond_broadcast (pthread_cond_t *cv)
{
}
pthread_cond_broadcast wakes up all threads currently
blocked on the cv so they can recheck the condition
expression before any other thread can successfully complete the wait.
pthread_cond_wait. However, it is portable to all
versions of Win32.
event_wait (event_t *event) -- This function waits
for event to be signaled. This function emulates a call
to WaitForSingleObject where the event is
the waitable object and a timeout of INFINITE is
specified.
event_timedwait (event_t *event, const timespec
*timeout) -- This function is the same as
event_wait, except that the caller waits up to
timeout in WaitForSingleObject for the
event to become signaled.
event_signal (event_t *event) -- If the
event is a manual-reset event, it remains signaled until
it is set explicitly to the nonsignaled state by the
event_reset function. Any number of waiting threads, or
threads that subsequently begin wait operations for the specified
event object by calling one of the wait functions, can be
released while the event's state is signaled.
If the event is an auto-reset event, it remains signaled
until a single waiting thread is released, at which time the system
automatically sets the state to nonsignaled. If no threads are
waiting, the event's state remains signaled.
event_pulse (event_t *event) -- If event
is a manual-reset event, all waiting threads that can be released
immediately are released. The function then resets the
event's state to nonsignaled and returns.
If event is an auto-reset event, the function resets the
state to nonsignaled and returns after releasing a single waiting
thread, even if multiple threads are waiting.
If no threads are waiting, or if no thread can be released
immediately, event_pulse simply sets the
event's state to nonsignaled and returns.
event_reset (event_t *event) -- The
event_reset function is used primarily for manual-reset
events, which must be set explicitly to the nonsignaled state.
// State of one emulated event object.
struct event_t
{
mutex_t lock_;
// Lock held by event_wait, event_signal, event_pulse and
// event_reset while they access the fields below.
cond_t condition_;
// Condition that event_wait blocks on and that event_signal and
// event_pulse signal or broadcast.
int manual_reset_;
// 1 for a manual-reset event, 0 for an auto-reset event.
int is_signaled_;
// 1 while the event is signaled, 0 otherwise.
u_long waiting_threads_;
// Number of threads currently blocked inside event_wait.
};
event_init is used to initialize an event structure:
// Initialize <event>.
//
// manual_reset   nonzero for a manual-reset event, 0 for auto-reset.
// initial_state  nonzero if the event starts out signaled.
// type, name, arg  passed through unchanged to cond_init and
//                  mutex_init.
//
// The results of cond_init and mutex_init are not checked.
void
event_init (event_t *event,
            int manual_reset,
            int initial_state,
            int type,
            char *name,
            void *arg)
{
  // Store the flags as exactly 0 or 1.  event_wait, event_signal and
  // event_pulse compare these fields against 0 or 1 explicitly, so
  // any other nonzero value would make them disagree about the kind
  // or state of the event.
  event->manual_reset_ = (manual_reset != 0);
  event->is_signaled_ = (initial_state != 0);
  event->waiting_threads_ = 0;

  cond_init (&event->condition_,
             type,
             name,
             arg);
  mutex_init (&event->lock_,
              type,
              name,
              arg);
}
event_destroy is used to destroy an event:
// Release the resources held by <event>: its lock is destroyed
// first, followed by its condition.
void
event_destroy (event_t *event)
{
  mutex_destroy (&event->lock_);
  cond_destroy (&event->condition_);
}
event_wait is used to wait on an event:
// Wait on <event>.  If the event is already signaled, return at once
// (consuming the signal when the event is auto-reset); otherwise
// block on the event's condition until it is signaled or broadcast.
//
// NOTE(review): cond_wait is called once rather than in a loop that
// rechecks state, so if cond_wait can return spuriously (as POSIX
// permits for pthread_cond_wait) this function would return without
// the event having been signaled -- confirm cond_wait's contract.
void
event_wait (event_t *event)
{
  // All state is examined and updated under the event's lock.
  mutex_lock (&event->lock_);

  if (event->is_signaled_ != 1)
    {
      // Not signaled: register as a waiter and block.  cond_wait is
      // handed the lock we hold.
      event->waiting_threads_++;
      cond_wait (&event->condition_,
                 &event->lock_);
      event->waiting_threads_--;
    }
  else if (event->manual_reset_ == 0)
    {
      // Signaled auto-reset event: this caller consumes the signal.
      event->is_signaled_ = 0;
    }

  mutex_unlock (&event->lock_);
}
event_signal is used to signal an event:
// Signal <event>.
//
// Manual-reset: mark the event signaled and wake every waiter.
// Auto-reset: if somebody is waiting, wake exactly one waiter and
// leave the state untouched; if nobody is waiting, mark the event
// signaled so that the next event_wait returns immediately.
void
event_signal (event_t *event)
{
  mutex_lock (&event->lock_);

  if (event->manual_reset_ != 1)
    {
      // Auto-reset event.
      if (event->waiting_threads_ != 0)
        cond_signal (&event->condition_);
      else
        event->is_signaled_ = 1;
    }
  else
    {
      // Manual-reset event.
      event->is_signaled_ = 1;
      cond_broadcast (&event->condition_);
    }

  mutex_unlock (&event->lock_);
}
event_pulse is used to pulse an event:
// Pulse <event>: wake the threads that are waiting right now and
// leave the event nonsignaled.
//
// Manual-reset: every current waiter is awakened.
// Auto-reset: a single waiter is awakened.
//
// In both cases the event's state is nonsignaled when this function
// returns, including the case of a manual-reset event that was
// signaled beforehand.
void
event_pulse (event_t *event)
{
  // grab the lock first
  mutex_lock (&event->lock_);

  if (event->manual_reset_ == 1)
    // Manual-reset event: wakeup all waiters.
    cond_broadcast (&event->condition_);
  else
    // Auto-reset event: wakeup one waiter.
    cond_signal (&event->condition_);

  // Reset the event for both kinds.  A pulse never leaves the event
  // signaled, so a thread that calls event_wait afterwards must
  // block.
  event->is_signaled_ = 0;

  // Now we can let go of the lock.
  mutex_unlock (&event->lock_);
}
event_reset resets an event:
// Put <event> into the nonsignaled state.  The update is made while
// holding the event's lock; no waiter is awakened.
void
event_reset (event_t *event)
{
  mutex_lock (&event->lock_);
  event->is_signaled_ = 0;
  mutex_unlock (&event->lock_);
}
[GoF] Erich Gamma, Richard Helm, Ralph Johnson, and John Vlissides, Design Patterns: Elements of Reusable Object-Oriented Software, Addison-Wesley, 1995.
[Pthreads] ``Threads Extension for Portable Operating Systems,'' IEEE P1003.4a, February, 1996.
[Richter] Jeffrey Richter, Advanced Windows, 1997, Microsoft Press, Redmond, WA.