FD.io VPP  v18.11-rc0-18-g2a3fb1a
Vector Packet Processing
message_queue.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <svm/message_queue.h>
17 #include <vppinfra/mem.h>
18 
19 static inline svm_msg_q_ring_t *
21 {
22  return vec_elt_at_index (mq->rings, ring_index);
23 }
24 
26 svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index)
27 {
28  return svm_msg_q_ring_inline (mq, ring_index);
29 }
30 
31 static inline void *
33 {
34  ASSERT (elt_index < ring->nitems);
35  return (ring->data + elt_index * ring->elsize);
36 }
37 
40 {
41  svm_msg_q_ring_t *ring;
42  svm_msg_q_t *mq;
43  uword size;
44  int i;
45 
46  if (!cfg)
47  return 0;
48 
50  memset (mq, 0, sizeof (*mq));
51  mq->q = svm_queue_init (cfg->q_nitems, sizeof (svm_msg_q_msg_t),
52  cfg->consumer_pid, 0);
53  vec_validate (mq->rings, cfg->n_rings - 1);
54  for (i = 0; i < cfg->n_rings; i++)
55  {
56  ring = &mq->rings[i];
57  ring->elsize = cfg->ring_cfgs[i].elsize;
58  ring->nitems = cfg->ring_cfgs[i].nitems;
59  if (cfg->ring_cfgs[i].data)
60  ring->data = cfg->ring_cfgs[i].data;
61  else
62  {
63  size = (uword) ring->nitems * ring->elsize;
65  }
66  }
67 
68  return mq;
69 }
70 
71 void
73 {
74  svm_msg_q_ring_t *ring;
75 
76  vec_foreach (ring, mq->rings)
77  {
78  clib_mem_free (ring->data);
79  }
80  vec_free (mq->rings);
81  clib_mem_free (mq);
82 }
83 
86 {
87  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
88  svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
89 
90  ASSERT (ring->cursize != ring->nitems);
91  msg.ring_index = ring - mq->rings;
92  msg.elt_index = ring->tail;
93  ring->tail = (ring->tail + 1) % ring->nitems;
94  __sync_fetch_and_add (&ring->cursize, 1);
95  return msg;
96 }
97 
98 int
100  u8 noblock, svm_msg_q_msg_t * msg)
101 {
102  if (noblock)
103  {
104  if (svm_msg_q_try_lock (mq))
105  return -1;
106  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, ring_index)))
107  {
108  svm_msg_q_unlock (mq);
109  return -2;
110  }
111  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
113  {
114  svm_msg_q_unlock (mq);
115  return -2;
116  }
117  }
118  else
119  {
120  svm_msg_q_lock (mq);
121  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
122  while (svm_msg_q_msg_is_invalid (msg))
123  {
124  svm_msg_q_wait (mq);
125  *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
126  }
127  }
128  return 0;
129 }
130 
133 {
134  svm_msg_q_msg_t msg = {.as_u64 = ~0 };
135  svm_msg_q_ring_t *ring;
136 
137  vec_foreach (ring, mq->rings)
138  {
139  if (ring->elsize < nbytes || ring->cursize == ring->nitems)
140  continue;
141  msg.ring_index = ring - mq->rings;
142  msg.elt_index = ring->tail;
143  ring->tail = (ring->tail + 1) % ring->nitems;
144  __sync_fetch_and_add (&ring->cursize, 1);
145  break;
146  }
147  return msg;
148 }
149 
150 void *
152 {
154  return svm_msg_q_ring_data (ring, msg->elt_index);
155 }
156 
157 void
159 {
160  svm_msg_q_ring_t *ring;
161 
162  if (vec_len (mq->rings) <= msg->ring_index)
163  return;
164  ring = &mq->rings[msg->ring_index];
165  if (msg->elt_index == ring->head)
166  {
167  ring->head = (ring->head + 1) % ring->nitems;
168  }
169  else
170  {
171  /* for now, expect messages to be processed in order */
172  ASSERT (0);
173  }
174  __sync_fetch_and_sub (&ring->cursize, 1);
175 }
176 
177 static int
179 {
180  svm_msg_q_ring_t *ring;
181  u32 dist1, dist2;
182 
183  if (vec_len (mq->rings) <= msg->ring_index)
184  return 0;
185  ring = &mq->rings[msg->ring_index];
186 
187  dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems;
188  if (ring->tail == ring->head)
189  dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
190  else
191  dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems;
192  return (dist1 < dist2);
193 }
194 
195 int
196 svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait)
197 {
198  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
199  return svm_queue_add (mq->q, (u8 *) msg, nowait);
200 }
201 
202 void
204 {
205  ASSERT (svm_msq_q_msg_is_valid (mq, msg));
206  svm_queue_add_raw (mq->q, (u8 *) msg);
207  svm_msg_q_unlock (mq);
208 }
209 
210 int
212  svm_q_conditional_wait_t cond, u32 time)
213 {
214  return svm_queue_sub (mq->q, (u8 *) msg, cond, time);
215 }
216 
217 void
219 {
220  svm_queue_sub_raw (mq->q, (u8 *) msg);
221 }
222 
223 /*
224  * fd.io coding-style-patch-verification: ON
225  *
226  * Local Variables:
227  * eval: (c-set-style "gnu")
228  * End:
229  */
#define vec_validate(V, I)
Make sure vector is long enough for given index (no header, unspecified alignment) ...
Definition: vec.h:437
void svm_queue_add_raw(svm_queue_t *q, u8 *elem)
Add element to queue with mutex held.
Definition: queue.c:158
static u8 svm_msg_q_msg_is_invalid(svm_msg_q_msg_t *msg)
Check if message is invalid.
svm_msg_q_ring_t * rings
rings with message data
Definition: message_queue.h:40
void * svm_msg_q_msg_data(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Get data for message in queue.
static u8 svm_msg_q_ring_is_full(svm_msg_q_t *mq, u32 ring_index)
int svm_queue_add(svm_queue_t *q, u8 *elem, int nowait)
Definition: queue.c:174
svm_msg_q_msg_t svm_msg_q_alloc_msg(svm_msg_q_t *mq, u32 nbytes)
Allocate message buffer.
int i
volatile u32 head
current head (for dequeue)
Definition: message_queue.h:31
unsigned char u8
Definition: types.h:56
static svm_msg_q_ring_t * svm_msg_q_ring_inline(svm_msg_q_t *mq, u32 ring_index)
Definition: message_queue.c:20
svm_queue_t * svm_queue_init(int nels, int elsize, int consumer_pid, int signal_when_queue_non_empty)
Definition: queue.c:51
int svm_msg_q_lock_and_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index, u8 noblock, svm_msg_q_msg_t *msg)
Lock message queue and allocate message buffer on ring.
Definition: message_queue.c:99
int svm_queue_sub(svm_queue_t *q, u8 *elem, svm_q_conditional_wait_t cond, u32 time)
Definition: queue.c:293
svm_msg_q_t * svm_msg_q_alloc(svm_msg_q_cfg_t *cfg)
Allocate message queue.
Definition: message_queue.c:39
#define vec_elt_at_index(v, i)
Get vector value at index i checking that i is in bounds.
volatile u32 tail
current tail (for enqueue)
Definition: message_queue.h:32
unsigned int u32
Definition: types.h:88
int svm_msg_q_sub(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, svm_q_conditional_wait_t cond, u32 time)
Consumer dequeue one message from queue.
void svm_msg_q_free(svm_msg_q_t *mq)
Free message queue.
Definition: message_queue.c:72
uword size
#define PREDICT_FALSE(x)
Definition: clib.h:105
svm_msg_q_ring_t * svm_msg_q_ring(svm_msg_q_t *mq, u32 ring_index)
Get message queue ring.
Definition: message_queue.c:26
static int svm_msq_q_msg_is_valid(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
static void svm_msg_q_wait(svm_msg_q_t *mq)
volatile u32 cursize
current size of the ring
Definition: message_queue.h:29
u32 n_rings
number of msg rings
Definition: message_queue.h:54
svm_q_conditional_wait_t
Definition: queue.h:39
Unidirectional shared-memory multi-ring message queue.
static void svm_msg_q_unlock(svm_msg_q_t *mq)
Unlock message queue.
#define vec_free(V)
Free vector's memory (no header).
Definition: vec.h:339
svm_queue_t * q
queue for exchanging messages
Definition: message_queue.h:39
int svm_msg_q_add(svm_msg_q_t *mq, svm_msg_q_msg_t *msg, int nowait)
Producer enqueue one message to queue.
u32 elt_index
index in ring
Definition: message_queue.h:63
static int svm_msg_q_try_lock(svm_msg_q_t *mq)
Try locking message queue.
u32 ring_index
ring index, could be u8
Definition: message_queue.h:62
#define ASSERT(truth)
void svm_msg_q_add_and_unlock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Producer enqueue one message to queue with mutex held.
static void clib_mem_free(void *p)
Definition: mem.h:179
u8 * data
chunk of memory for msg data
Definition: message_queue.h:34
int svm_queue_sub_raw(svm_queue_t *q, u8 *elem)
Definition: queue.c:394
svm_msg_q_ring_cfg_t * ring_cfgs
array of ring cfgs
Definition: message_queue.h:55
u32 q_nitems
msg queue size (not rings)
Definition: message_queue.h:53
void svm_msg_q_free_msg(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Free message buffer.
#define vec_len(v)
Number of elements in vector (rvalue-only, NULL tolerant)
u64 uword
Definition: types.h:112
u32 elsize
size of an element
Definition: message_queue.h:33
static void * clib_mem_alloc_aligned(uword size, uword align)
Definition: mem.h:120
#define vec_foreach(var, vec)
Vector iterator.
int consumer_pid
pid of msg consumer
Definition: message_queue.h:52
#define CLIB_CACHE_LINE_BYTES
Definition: cache.h:62
static int svm_msg_q_lock(svm_msg_q_t *mq)
Lock, or block trying, the message queue.
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring(svm_msg_q_t *mq, u32 ring_index)
Allocate message buffer on ring.
Definition: message_queue.c:85
static void * svm_msg_q_ring_data(svm_msg_q_ring_t *ring, u32 elt_index)
Definition: message_queue.c:32
void svm_msg_q_sub_w_lock(svm_msg_q_t *mq, svm_msg_q_msg_t *msg)
Consumer dequeue one message from queue with mutex held.
u32 nitems
max size of the ring
Definition: message_queue.h:30