52 int elsize,
int consumer_pid,
int signal_when_queue_non_empty)
55 pthread_mutexattr_t attr;
56 pthread_condattr_t cattr;
60 memset (q, 0,
sizeof (*q));
64 q->consumer_pid = consumer_pid;
65 q->signal_when_queue_non_empty = signal_when_queue_non_empty;
67 memset (&attr, 0,
sizeof (attr));
68 memset (&cattr, 0,
sizeof (cattr));
70 if (pthread_mutexattr_init (&attr))
72 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
74 if (pthread_mutex_init (&q->mutex, &attr))
76 if (pthread_mutexattr_destroy (&attr))
78 if (pthread_condattr_init (&cattr))
81 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
83 if (pthread_cond_init (&q->condvar, &cattr))
85 if (pthread_condattr_destroy (&cattr))
97 (void) pthread_mutex_destroy (&q->mutex);
98 (void) pthread_cond_destroy (&q->condvar);
105 pthread_mutex_lock (&q->mutex);
111 pthread_mutex_unlock (&q->mutex);
117 return q->cursize == q->maxsize;
127 int need_broadcast = 0;
131 while (q->cursize == q->maxsize)
133 (void) pthread_cond_wait (&q->condvar, &q->mutex);
137 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
143 need_broadcast = (q->cursize == 1);
145 if (q->tail == q->maxsize)
150 (void) pthread_cond_broadcast (&q->condvar);
151 if (q->signal_when_queue_non_empty)
152 kill (q->consumer_pid, q->signal_when_queue_non_empty);
164 while (q->cursize == q->maxsize)
168 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
174 if (q->tail == q->maxsize)
187 int need_broadcast = 0;
192 if (pthread_mutex_trylock (&q->mutex))
198 pthread_mutex_lock (&q->mutex);
204 pthread_mutex_unlock (&q->mutex);
207 while (q->cursize == q->maxsize)
209 (void) pthread_cond_wait (&q->condvar, &q->mutex);
213 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
219 need_broadcast = (q->cursize == 1);
221 if (q->tail == q->maxsize)
226 (void) pthread_cond_broadcast (&q->condvar);
227 if (q->signal_when_queue_non_empty)
228 kill (q->consumer_pid, q->signal_when_queue_non_empty);
230 pthread_mutex_unlock (&q->mutex);
242 int need_broadcast = 0;
247 if (pthread_mutex_trylock (&q->mutex))
253 pthread_mutex_lock (&q->mutex);
259 pthread_mutex_unlock (&q->mutex);
262 while (q->cursize + 1 == q->maxsize)
264 (void) pthread_cond_wait (&q->condvar, &q->mutex);
268 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
274 if (q->tail == q->maxsize)
277 need_broadcast = (q->cursize == 1);
279 tailp = (
i8 *) (&q->data[0] + q->elsize * q->tail);
285 if (q->tail == q->maxsize)
290 (void) pthread_cond_broadcast (&q->condvar);
291 if (q->signal_when_queue_non_empty)
292 kill (q->consumer_pid, q->signal_when_queue_non_empty);
294 pthread_mutex_unlock (&q->mutex);
307 int need_broadcast = 0;
313 if (pthread_mutex_trylock (&q->mutex))
319 pthread_mutex_lock (&q->mutex);
325 pthread_mutex_unlock (&q->mutex);
333 while (q->cursize == 0 && rc == 0)
335 rc = pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
339 pthread_mutex_unlock (&q->mutex);
345 while (q->cursize == 0)
347 (void) pthread_cond_wait (&q->condvar, &q->mutex);
352 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
357 if (q->cursize == q->maxsize)
362 if (q->head == q->maxsize)
366 (void) pthread_cond_broadcast (&q->condvar);
368 pthread_mutex_unlock (&q->mutex);
379 pthread_mutex_lock (&q->mutex);
382 pthread_mutex_unlock (&q->mutex);
386 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
390 need_broadcast = (q->cursize == q->maxsize / 2);
395 pthread_mutex_unlock (&q->mutex);
398 (void) pthread_cond_broadcast (&q->condvar);
410 while (q->cursize == 0)
414 headp = (
i8 *) (&q->data[0] + q->elsize * q->head);
420 if (q->head == q->maxsize)
int svm_queue_is_full(svm_queue_t *q)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
int svm_queue_sub2(svm_queue_t *q, u8 *elem)
int svm_queue_add_raw(svm_queue_t *q, u8 *elem)
void svm_queue_unlock(svm_queue_t *q)
svm_queue_t * svm_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
blocking call, return on signal or time-out
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
static f64 unix_time_now(void)
int svm_queue_add2(svm_queue_t *q, u8 *elem, u8 *elem2, int nowait)
void svm_queue_lock(svm_queue_t *q)
#define clib_memcpy(a, b, c)
void svm_queue_free(svm_queue_t *q)
static void clib_mem_free(void *p)
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
#define clib_unix_warning(format, args...)
int svm_queue_add_nolock(svm_queue_t *q, u8 *elem)
struct _svm_queue svm_queue_t
static void * clib_mem_alloc_aligned(uword size, uword align)
#define CLIB_CACHE_LINE_BYTES