OpenMPI  0.1.1
condition.h
1 /*
2  * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
3  * University Research and Technology
4  * Corporation. All rights reserved.
5  * Copyright (c) 2004-2005 The University of Tennessee and The University
6  * of Tennessee Research Foundation. All rights
7  * reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  * University of Stuttgart. All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  * All rights reserved.
12  * Copyright (c) 2007 Los Alamos National Security, LLC. All rights
13  * reserved.
14  * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
15  * $COPYRIGHT$
16  *
17  * Additional copyrights may follow
18  *
19  * $HEADER$
20  */
21 #ifndef ORTE_CONDITION_H
22 #define ORTE_CONDITION_H
23 
24 #include "orte_config.h"
25 #ifdef HAVE_SYS_TIME_H
26 #include <sys/time.h>
27 #endif
28 #ifdef HAVE_TIME_H
29 #include <time.h>
30 #endif
31 #if OPAL_HAVE_POSIX_THREADS
32 #include <pthread.h>
33 #elif OPAL_HAVE_SOLARIS_THREADS
34 #include <thread.h>
35 #include <synch.h>
36 #endif
37 
38 #include "opal/threads/threads.h"
39 #include "opal/runtime/opal_cr.h"
40 
42 
43 BEGIN_C_DECLS
44 
45 static inline int orte_condition_wait(opal_condition_t *c, opal_mutex_t *m)
46 {
47  int rc = 0;
48  c->c_waiting++;
49 
50 /*
51 #if OPAL_HAVE_POSIX_THREADS
52  if (orte_progress_threads_enabled) {
53  rc = pthread_cond_wait(&c->c_cond, &m->m_lock_pthread);
54  } else {
55 #endif
56 */
57  if (c->c_signaled) {
58  c->c_waiting--;
60  opal_progress();
61  OPAL_CR_TEST_CHECKPOINT_READY_STALL();
62  opal_mutex_lock(m);
63  return 0;
64  }
65  while (c->c_signaled == 0) {
67  opal_progress();
68  OPAL_CR_TEST_CHECKPOINT_READY_STALL();
69  opal_mutex_lock(m);
70  }
71 /*
72 #if OPAL_HAVE_POSIX_THREADS
73  }
74 #endif
75  */
76 
77  c->c_signaled--;
78  c->c_waiting--;
79  return rc;
80 }
81 
/*
 * ORTE_CONDITION_WAIT(x, y): wait on condition x with mutex y.
 *
 * In debug builds the wait is logged through opal_output() when
 * opal_debug_threads is set.  The macro deliberately does NOT end in a
 * semicolon: the caller supplies it, so the macro behaves as a single
 * statement (e.g. in an unbraced if/else) in both build flavours.
 *
 * The debug variant is a statement, not an expression, so portable callers
 * must not use the macro's value.
 */
#if OPAL_ENABLE_DEBUG
#define ORTE_CONDITION_WAIT(x, y)                                       \
    do {                                                                \
        if (opal_debug_threads) {                                       \
            opal_output(0, "Entering condition wait for %s at %s:%d",   \
                        (NULL == (x)->name) ? "NULL" : (x)->name,       \
                        __FILE__, __LINE__);                            \
        }                                                               \
        orte_condition_wait((x), (y));                                  \
    } while (0)
#else
#define ORTE_CONDITION_WAIT(x, y) orte_condition_wait((x), (y))
#endif
95 
96 static inline int orte_condition_timedwait(opal_condition_t *c,
97  opal_mutex_t *m,
98  const struct timespec *abstime)
99 {
100  int rc = 0;
101  struct timeval tv;
102  struct timeval absolute;
103 
104  c->c_waiting++;
105 
106 /*
107 #if OPAL_HAVE_POSIX_THREADS
108  if (orte_progress_threads_enabled) {
109  rc = pthread_cond_timedwait(&c->c_cond, &m->m_lock_pthread, abstime);
110  } else {
111 #endif
112 */
113  absolute.tv_sec = abstime->tv_sec;
114  absolute.tv_usec = abstime->tv_nsec * 1000;
115  gettimeofday(&tv,NULL);
116  if (c->c_signaled == 0) {
117  do {
119  opal_progress();
120  gettimeofday(&tv,NULL);
121  opal_mutex_lock(m);
122  } while (c->c_signaled == 0 &&
123  (tv.tv_sec <= absolute.tv_sec ||
124  (tv.tv_sec == absolute.tv_sec && tv.tv_usec < absolute.tv_usec)));
125  }
126 /*
127 #if OPAL_HAVE_POSIX_THREADS
128  }
129 #endif
130 */
131  if (c->c_signaled != 0) c->c_signaled--;
132  c->c_waiting--;
133  return rc;
134 }
135 
136 static inline int orte_condition_signal(opal_condition_t *c)
137 {
138  if (c->c_waiting) {
139  c->c_signaled++;
140 /*
141 #if OPAL_HAVE_POSIX_THREADS
142  if (orte_progress_threads_enabled) {
143  pthread_cond_signal(&c->c_cond);
144  }
145 #endif
146 */
147  }
148  return 0;
149 }
150 
/*
 * ORTE_CONDITION_SIGNAL(x): signal condition x.
 *
 * In debug builds the signal is logged through opal_output() when
 * opal_debug_threads is set.  The macro deliberately does NOT end in a
 * semicolon: the caller supplies it, so the macro behaves as a single
 * statement (e.g. in an unbraced if/else) in both build flavours.
 *
 * The debug variant is a statement, not an expression, so portable callers
 * must not use the macro's value.
 */
#if OPAL_ENABLE_DEBUG
#define ORTE_CONDITION_SIGNAL(x)                                        \
    do {                                                                \
        if (opal_debug_threads) {                                       \
            opal_output(0, "Signaling condition %s at %s:%d",           \
                        (NULL == (x)->name) ? "NULL" : (x)->name,       \
                        __FILE__, __LINE__);                            \
        }                                                               \
        orte_condition_signal((x));                                     \
    } while (0)
#else
#define ORTE_CONDITION_SIGNAL(x) orte_condition_signal((x))
#endif
164 
165 static inline int orte_condition_broadcast(opal_condition_t *c)
166 {
167  c->c_signaled = c->c_waiting;
168  /*
169 #if OPAL_HAVE_POSIX_THREADS
170  if (orte_progress_threads_enabled) {
171  if( 1 == c->c_waiting ) {
172  pthread_cond_signal(&c->c_cond);
173  } else {
174  pthread_cond_broadcast(&c->c_cond);
175  }
176  }
177 #endif
178  */
179  return 0;
180 }
181 
/*
 * ORTE_CONDITION_BROADCAST(x): broadcast on condition x.
 *
 * In debug builds the broadcast is logged through opal_output() when
 * opal_debug_threads is set.  The macro deliberately does NOT end in a
 * semicolon: the caller supplies it, so the macro behaves as a single
 * statement (e.g. in an unbraced if/else) in both build flavours.
 *
 * The debug variant is a statement, not an expression, so portable callers
 * must not use the macro's value.
 */
#if OPAL_ENABLE_DEBUG
#define ORTE_CONDITION_BROADCAST(x)                                     \
    do {                                                                \
        if (opal_debug_threads) {                                       \
            opal_output(0, "Broadcasting condition %s at %s:%d",        \
                        (NULL == (x)->name) ? "NULL" : (x)->name,       \
                        __FILE__, __LINE__);                            \
        }                                                               \
        orte_condition_broadcast((x));                                  \
    } while (0)
#else
#define ORTE_CONDITION_BROADCAST(x) orte_condition_broadcast((x))
#endif
195 
196 END_C_DECLS
197 
198 #endif
199 
static void opal_mutex_lock(opal_mutex_t *mutex)
Acquire a mutex.
Definition: condition.h:49
Definition: mutex_unix.h:53
Functions for multi-threaded applications using Libevent.
static void opal_mutex_unlock(opal_mutex_t *mutex)
Release a mutex.
Global params for OpenRTE.
Definition: ompi_time.h:160
OPAL_DECLSPEC void opal_progress(void)
Progress all pending events.
Definition: opal_progress.c:165
Checkpoint functionality for Open MPI.