Skip to content
Navigation Menu
{{ message }}
forked from GraphChi/graphchi-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpthread_tools.hpp
More file actions
348 lines (298 loc) · 8.86 KB
/
Copy pathpthread_tools.hpp
File metadata and controls
348 lines (298 loc) · 8.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
#ifndef DEF_PTHREAD_TOOLS_HPP
#define DEF_PTHREAD_TOOLS_HPP
// Stolen from GraphLab
#include <memory.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <sys/time.h>
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>
#include <vector>
#undef _POSIX_SPIN_LOCKS
#define _POSIX_SPIN_LOCKS -1
/**
* \file pthread_tools.hpp A collection of utilities for threading
*/
namespace graphchi {
/**
 * \class mutex
 *
 * Wrapper around pthread's mutex. On single core systems mutex
 * should be used. On multicore systems, spinlock should be used.
 *
 * Copy semantics: copying a mutex never copies its lock state. A
 * copy-constructed mutex is a brand-new, unlocked mutex, and assignment
 * is a no-op. POSIX leaves the use of a bitwise copy of a
 * pthread_mutex_t undefined. Defining the copy operations this way keeps
 * the class usable inside copyable containers without that undefined
 * behaviour (and without destroying the same pthread mutex twice).
 */
class mutex {
private:
    // mutable because the const lock()/unlock()/try_lock() members must pass
    // a non-const pointer to the pthread API.
    mutable pthread_mutex_t m_mut;
public:
    mutex() {
        int error = pthread_mutex_init(&m_mut, NULL);
        assert(!error);
    }
    /// Creates a fresh, unlocked mutex; the source's state is not copied.
    mutex(const mutex&) {
        int error = pthread_mutex_init(&m_mut, NULL);
        assert(!error);
    }
    /// No-op: each mutex keeps its own underlying pthread mutex.
    mutex& operator=(const mutex&) { return *this; }
    inline void lock() const {
        int error = pthread_mutex_lock( &m_mut );
        assert(!error);
    }
    inline void unlock() const {
        int error = pthread_mutex_unlock( &m_mut );
        assert(!error);
    }
    /// Returns true if the lock was acquired without blocking.
    inline bool try_lock() const {
        return pthread_mutex_trylock( &m_mut ) == 0;
    }
    ~mutex(){
        int error = pthread_mutex_destroy( &m_mut );
        // pthread functions return the error code instead of setting errno,
        // so report it via strerror(error) rather than perror().
        if (error)
            std::fprintf(stderr, "Error: failed to destroy mutex: %s\n",
                         std::strerror(error));
        assert(!error);
    }
    friend class conditional;
}; // End of Mutex
#if _POSIX_SPIN_LOCKS >= 0
// We should change this to use a test for posix_spin_locks eventually
// #ifdef __linux__
/**
* \class spinlock
*
* Wrapper around pthread's spinlock On single core systems mutex
* should be used. On multicore systems, spinlock should be used.
* If pthread_spinlock is not available, the spinlock will be
* typedefed to a mutex
*/
class spinlock {
private:
// mutable not actually needed
mutable pthread_spinlock_t m_spin;
public:
spinlock () {
int error = pthread_spin_init(&m_spin, PTHREAD_PROCESS_PRIVATE);
assert(!error);
}
inline void lock() const {
int error = pthread_spin_lock( &m_spin );
assert(!error);
}
inline void unlock() const {
int error = pthread_spin_unlock( &m_spin );
assert(!error);
}
inline bool try_lock() const {
return pthread_spin_trylock( &m_spin ) == 0;
}
~spinlock(){
int error = pthread_spin_destroy( &m_spin );
assert(!error);
}
friend class conditional;
}; // End of spinlock
#define SPINLOCK_SUPPORTED 1
#else
//! if spinlock not supported, it is typedef it to a mutex.
typedef mutex spinlock;
#define SPINLOCK_SUPPORTED 0
#endif
/**
 * \class conditional
 * Wrapper around pthread's condition variable. It is used together with
 * mutex (it reads the mutex's underlying pthread_mutex_t directly).
 *
 * Copy semantics: a copy-constructed conditional is a fresh condition
 * variable and assignment is a no-op; the pthread_cond_t is never copied
 * bitwise.
 */
class conditional {
private:
    mutable pthread_cond_t m_cond;
public:
    conditional() {
        int error = pthread_cond_init(&m_cond, NULL);
        assert(!error);
    }
    /// Creates a fresh condition variable; nothing is copied from the source.
    conditional(const conditional&) {
        int error = pthread_cond_init(&m_cond, NULL);
        assert(!error);
    }
    /// No-op: each conditional keeps its own underlying condition variable.
    conditional& operator=(const conditional&) { return *this; }
    /// Atomically releases mut and blocks until signalled; mut is re-locked
    /// before returning. The caller must hold mut.
    inline void wait(const mutex& mut) const {
        int error = pthread_cond_wait(&m_cond, &mut.m_mut);
        assert(!error);
    }
    /**
     * Like wait(), but gives up roughly `sec` seconds from now.
     * Returns the pthread_cond_timedwait result (0 when woken, an error
     * code such as ETIMEDOUT otherwise).
     */
    inline int timedwait(const mutex& mut, int sec) const {
        struct timeval now;
        // POSIX specifies that the timezone argument should be NULL.
        gettimeofday(&now, NULL);
        struct timespec deadline;
        deadline.tv_sec = now.tv_sec + sec;
        // Keep the sub-second part of "now"; zeroing it would shorten the
        // wait by up to one second.
        deadline.tv_nsec = now.tv_usec * 1000;
        return pthread_cond_timedwait(&m_cond, &mut.m_mut, &deadline);
    }
    inline void signal() const {
        int error = pthread_cond_signal(&m_cond);
        assert(!error);
    }
    inline void broadcast() const {
        int error = pthread_cond_broadcast(&m_cond);
        assert(!error);
    }
    ~conditional() {
        int error = pthread_cond_destroy(&m_cond);
        assert(!error);
    }
}; // End conditional
#define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P) __sync_add_and_fetch((P), 1)
/**
 * \class spinrwlock
 * Ticket-based reader-writer lock that spins (calling sched_yield) while
 * waiting.
 * source adapted from http://locklessinc.com/articles/locks/
 * "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors"
 * John Mellor-Crummey and Michael Scott
 *
 * The three ticket counters are single bytes packed into one word; the
 * "me >> 16" extraction of the users byte presumably assumes a little-endian
 * layout, and 8-bit tickets limit how many threads may hold/wait at once
 * (NOTE(review): confirm both constraints for target platforms).
 */
class spinrwlock {
    union rwticket {
        unsigned u;
        unsigned short us;
        __extension__ struct {
            unsigned char write;
            unsigned char read;
            unsigned char users;
        } s;
    };
    // True while a writer holds the lock; only the current holder writes it.
    mutable bool writing;
    mutable volatile rwticket l;
public:
    // writing must start false: unlock() after a first readlock() reads it.
    spinrwlock() : writing(false) {
        memset(const_cast<rwticket*>(&l), 0, sizeof(rwticket));
    }
    /// Takes a ticket and spins until it is this writer's turn.
    inline void writelock() const {
        unsigned me = atomic_xadd(&l.u, (1<<16));
        unsigned char val = me >> 16;
        while (val != l.s.write) sched_yield();
        writing = true;
    }
    /// Releases a write lock, admitting the next reader or writer.
    inline void wrunlock() const{
        // Clear the flag while we still own the lock. Clearing it after the
        // release store lets a new writer set it and then have it clobbered,
        // which would make that writer's unlock() take the reader path.
        writing = false;
        rwticket t = *const_cast<rwticket*>(&l);
        t.s.write++;
        t.s.read++;
        // Full compiler + CPU barrier so every write made in the critical
        // section is visible before the release store (portable replacement
        // for the former x86-only "mfence").
        __sync_synchronize();
        *(volatile unsigned short *) (&l) = t.us;
    }
    /// Takes a ticket, waits for this reader's turn, then admits the next one.
    inline void readlock() const {
        unsigned me = atomic_xadd(&l.u, (1<<16));
        unsigned char val = me >> 16;
        while (val != l.s.read) sched_yield();
        l.s.read++;
    }
    /// Releases a read lock.
    inline void rdunlock() const {
        atomic_inc(&l.s.write);
    }
    /// Releases whichever kind of lock the caller holds.
    inline void unlock() const {
        if (!writing) rdunlock();
        else wrunlock();
    }
};
#undef atomic_xadd
#undef cmpxchg
#undef atomic_inc
/**
 * \class rwlock
 * Wrapper around pthread's rwlock.
 *
 * Copy semantics: a copy-constructed rwlock is a fresh, unlocked rwlock and
 * assignment is a no-op; the pthread_rwlock_t is never copied bitwise.
 */
class rwlock {
private:
    mutable pthread_rwlock_t m_rwlock;
public:
    rwlock() {
        int error = pthread_rwlock_init(&m_rwlock, NULL);
        assert(!error);
    }
    /// Creates a fresh, unlocked rwlock; the source's state is not copied.
    rwlock(const rwlock&) {
        int error = pthread_rwlock_init(&m_rwlock, NULL);
        assert(!error);
    }
    /// No-op: each rwlock keeps its own underlying pthread rwlock.
    rwlock& operator=(const rwlock&) { return *this; }
    ~rwlock() {
        int error = pthread_rwlock_destroy(&m_rwlock);
        assert(!error);
    }
    inline void readlock() const {
        int error = pthread_rwlock_rdlock(&m_rwlock);
        assert(!error);
        (void)error;  // silence unused-variable warnings under NDEBUG
    }
    inline void writelock() const {
        int error = pthread_rwlock_wrlock(&m_rwlock);
        assert(!error);
        (void)error;
    }
    /// Releases either a read or a write lock held by the caller.
    inline void unlock() const {
        int error = pthread_rwlock_unlock(&m_rwlock);
        assert(!error);
        (void)error;
    }
    inline void rdunlock() const {
        unlock();
    }
    inline void wrunlock() const {
        unlock();
    }
}; // End rwlock
#ifdef __linux__
/**
 * \class barrier
 * Wrapper around pthread's barrier: wait() blocks until `numthreads`
 * threads have called it, then releases them all.
 */
class barrier {
private:
    mutable pthread_barrier_t m_barrier;
public:
    barrier(size_t numthreads) {
        int error = pthread_barrier_init(&m_barrier, NULL,
                                         static_cast<unsigned>(numthreads));
        assert(!error);
        (void)error;  // silence unused-variable warnings under NDEBUG
    }
    ~barrier() {
        int error = pthread_barrier_destroy(&m_barrier);
        assert(!error);
        (void)error;
    }
    inline void wait() const {
        // Exactly one waiter receives PTHREAD_BARRIER_SERIAL_THREAD, the
        // others 0; anything else is an error.
        int rc = pthread_barrier_wait(&m_barrier);
        assert(rc == 0 || rc == PTHREAD_BARRIER_SERIAL_THREAD);
        (void)rc;
    }
};
#else
/**
 * \class barrier
 * Barrier built from mutex + conditional, for platforms without
 * pthread_barrier_t: wait() blocks until `numthreads` threads have called
 * it, then releases them all.
 *
 * Reusable across rounds: a generation counter (rather than per-slot flags)
 * ensures a thread that races ahead into the next round cannot re-arm the
 * condition that a slower thread from the previous round is still
 * re-checking. The generation check also handles spurious wakeups.
 */
class barrier {
private:
    mutex m;
    int needed;
    mutable int called;
    mutable size_t generation;
    conditional c;
public:
    barrier(size_t numthreads)
        : needed((int)numthreads), called(0), generation(0) {}
    ~barrier() {}
    inline void wait() const {
        m.lock();
        const size_t my_generation = generation;
        called++;
        if (called == needed) {
            // Last arrival: start a new round and wake everyone.
            called = 0;
            generation++;
            c.broadcast();
        }
        else {
            // Sleep until the round we joined has completed.
            while (my_generation == generation) c.wait(m);
        }
        m.unlock();
    }
};
#endif
/**
 * Issues a read-prefetch hint (__builtin_prefetch(p, 0)) every 64 bytes
 * across [addr, addr + len). The step presumably matches a 64-byte cache
 * line (NOTE(review): confirm for target CPUs).
 *
 * Steps by byte offset instead of advancing a pointer, so no pointer beyond
 * one-past-the-end of the range is ever formed (doing so is undefined
 * behaviour).
 */
inline void prefetch_range(void *addr, size_t len) {
    char *base = static_cast<char*>(addr);
    for (size_t offset = 0; offset < len; offset += 64) {
        __builtin_prefetch(base + offset, 0);
    }
}
/**
 * Issues a write-prefetch hint (__builtin_prefetch(p, 1)) every 64 bytes
 * across [addr, addr + len). The step presumably matches a 64-byte cache
 * line (NOTE(review): confirm for target CPUs).
 *
 * Steps by byte offset instead of advancing a pointer, so no pointer beyond
 * one-past-the-end of the range is ever formed (doing so is undefined
 * behaviour).
 */
inline void prefetch_range_write(void *addr, size_t len) {
    char *base = static_cast<char*>(addr);
    for (size_t offset = 0; offset < len; offset += 64) {
        __builtin_prefetch(base + offset, 1);
    }
}
};
#endif
You can’t perform that action at this time.
