DPDK 22.11.4
rte_rcu_qsbr.h
Go to the documentation of this file.
1/* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright (c) 2018-2020 Arm Limited
3 */
4
5#ifndef _RTE_RCU_QSBR_H_
6#define _RTE_RCU_QSBR_H_
7
28#ifdef __cplusplus
29extern "C" {
30#endif
31
32#include <inttypes.h>
33#include <stdbool.h>
34#include <stdio.h>
35#include <stdint.h>
36#include <rte_compat.h>
37#include <rte_common.h>
38#include <rte_debug.h>
39#include <rte_atomic.h>
40#include <rte_ring.h>
41
42extern int rte_rcu_log_type;
43
44#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
45#define __RTE_RCU_DP_LOG(level, fmt, args...) \
46 rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
47 "%s(): " fmt "\n", __func__, ## args)
48#else
49#define __RTE_RCU_DP_LOG(level, fmt, args...)
50#endif
51
52#if defined(RTE_LIBRTE_RCU_DEBUG)
53#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
54 if (v->qsbr_cnt[thread_id].lock_cnt) \
55 rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
56 "%s(): " fmt "\n", __func__, ## args); \
57} while (0)
58#else
59#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
60#endif
61
62/* Registered thread IDs are stored as a bitmap of 64b element array.
63 * Given thread id needs to be converted to index into the array and
64 * the id within the array element.
65 */
66#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
67#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
68 RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
69 __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
70#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
71 ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
72#define __RTE_QSBR_THRID_INDEX_SHIFT 6
73#define __RTE_QSBR_THRID_MASK 0x3f
74#define RTE_QSBR_THRID_INVALID 0xffffffff
75
76/* Worker thread counter */
77struct rte_rcu_qsbr_cnt {
78 uint64_t cnt;
84 uint32_t lock_cnt;
87
88#define __RTE_QSBR_CNT_THR_OFFLINE 0
89#define __RTE_QSBR_CNT_INIT 1
90#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
91#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
92
93/* RTE Quiescent State variable structure.
94 * This structure has two elements that vary in size based on the
95 * 'max_threads' parameter.
96 * 1) Quiescent state counter array
97 * 2) Register thread ID array
98 */
99struct rte_rcu_qsbr {
100 uint64_t token __rte_cache_aligned;
102 uint64_t acked_token;
107 uint32_t num_elems __rte_cache_aligned;
109 uint32_t num_threads;
111 uint32_t max_threads;
114 struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
121
135typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);
136
137#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
138
147#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
148
153 const char *name;
155 uint32_t flags;
157 uint32_t size;
164 uint32_t esize;
186 void *p;
191 struct rte_rcu_qsbr *v;
193};
194
195/* RTE defer queue structure.
196 * This structure holds the defer queue. The defer queue is used to
197 * hold the deleted entries from the data structure that are not
198 * yet freed.
199 */
200struct rte_rcu_qsbr_dq;
201
213size_t
214rte_rcu_qsbr_get_memsize(uint32_t max_threads);
215
231int
232rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
233
254int
255rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
256
272int
273rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
274
300static __rte_always_inline void
301rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
302{
303 uint64_t t;
304
305 RTE_ASSERT(v != NULL && thread_id < v->max_threads);
306
307 __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
308 v->qsbr_cnt[thread_id].lock_cnt);
309
310 /* Copy the current value of token.
311 * The fence at the end of the function will ensure that
312 * the following will not move down after the load of any shared
313 * data structure.
314 */
315 t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
316
317 /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
318 * 'cnt' (64b) is accessed atomically.
319 */
320 __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
321 t, __ATOMIC_RELAXED);
322
323 /* The subsequent load of the data structure should not
324 * move above the store. Hence a store-load barrier
325 * is required.
326 * If the load of the data structure moves above the store,
327 * writer might not see that the reader is online, even though
328 * the reader is referencing the shared data structure.
329 */
330 rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
331}
332
353static __rte_always_inline void
354rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
355{
356 RTE_ASSERT(v != NULL && thread_id < v->max_threads);
357
358 __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
359 v->qsbr_cnt[thread_id].lock_cnt);
360
361 /* The reader can go offline only after the load of the
362 * data structure is completed. i.e. any load of the
363 * data structure can not move after this store.
364 */
365
366 __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
367 __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
368}
369
390static __rte_always_inline void
391rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
392 __rte_unused unsigned int thread_id)
393{
394 RTE_ASSERT(v != NULL && thread_id < v->max_threads);
395
396#if defined(RTE_LIBRTE_RCU_DEBUG)
397 /* Increment the lock counter */
398 __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
399 1, __ATOMIC_ACQUIRE);
400#endif
401}
402
423static __rte_always_inline void
424rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
425 __rte_unused unsigned int thread_id)
426{
427 RTE_ASSERT(v != NULL && thread_id < v->max_threads);
428
429#if defined(RTE_LIBRTE_RCU_DEBUG)
430 /* Decrement the lock counter */
431 __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
432 1, __ATOMIC_RELEASE);
433
434 __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
435 "Lock counter %u. Nested locks?\n",
436 v->qsbr_cnt[thread_id].lock_cnt);
437#endif
438}
439
453static __rte_always_inline uint64_t
454rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
455{
456 uint64_t t;
457
458 RTE_ASSERT(v != NULL);
459
460 /* Release the changes to the shared data structure.
461 * This store release will ensure that changes to any data
462 * structure are visible to the workers before the token
463 * update is visible.
464 */
465 t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
466
467 return t;
468}
469
482static __rte_always_inline void
483rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
484{
485 uint64_t t;
486
487 RTE_ASSERT(v != NULL && thread_id < v->max_threads);
488
489 __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
490 v->qsbr_cnt[thread_id].lock_cnt);
491
492 /* Acquire the changes to the shared data structure released
493 * by rte_rcu_qsbr_start.
494 * Later loads of the shared data structure should not move
495 * above this load. Hence, use load-acquire.
496 */
497 t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
498
499 /* Check if there are updates available from the writer.
500 * Inform the writer that updates are visible to this reader.
501 * Prior loads of the shared data structure should not move
502 * beyond this store. Hence use store-release.
503 */
504 if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
505 __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
506 t, __ATOMIC_RELEASE);
507
508 __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
509 __func__, t, thread_id);
510}
511
512/* Check the quiescent state counter for registered threads only, assuming
513 * that not all threads have registered.
514 */
515static __rte_always_inline int
516__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
517{
518 uint32_t i, j, id;
519 uint64_t bmap;
520 uint64_t c;
521 uint64_t *reg_thread_id;
522 uint64_t acked_token = __RTE_QSBR_CNT_MAX;
523
524 for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
525 i < v->num_elems;
526 i++, reg_thread_id++) {
527 /* Load the current registered thread bit map before
528 * loading the reader thread quiescent state counters.
529 */
530 bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
531 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
532
533 while (bmap) {
534 j = __builtin_ctzl(bmap);
535 __RTE_RCU_DP_LOG(DEBUG,
536 "%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
537 __func__, t, wait, bmap, id + j);
538 c = __atomic_load_n(
539 &v->qsbr_cnt[id + j].cnt,
540 __ATOMIC_ACQUIRE);
541 __RTE_RCU_DP_LOG(DEBUG,
542 "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
543 __func__, t, wait, c, id+j);
544
545 /* Counter is not checked for wrap-around condition
546 * as it is a 64b counter.
547 */
548 if (unlikely(c !=
549 __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
550 /* This thread is not in quiescent state */
551 if (!wait)
552 return 0;
553
554 rte_pause();
555 /* This thread might have unregistered.
556 * Re-read the bitmap.
557 */
558 bmap = __atomic_load_n(reg_thread_id,
559 __ATOMIC_ACQUIRE);
560
561 continue;
562 }
563
564 /* This thread is in quiescent state. Use the counter
565 * to find the least acknowledged token among all the
566 * readers.
567 */
568 if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
569 acked_token = c;
570
571 bmap &= ~(1UL << j);
572 }
573 }
574
575 /* All readers are checked, update least acknowledged token.
576 * There might be multiple writers trying to update this. There is
577 * no need to update this very accurately using compare-and-swap.
578 */
579 if (acked_token != __RTE_QSBR_CNT_MAX)
580 __atomic_store_n(&v->acked_token, acked_token,
581 __ATOMIC_RELAXED);
582
583 return 1;
584}
585
586/* Check the quiescent state counter for all threads, assuming that
587 * all the threads have registered.
588 */
589static __rte_always_inline int
590__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
591{
592 uint32_t i;
593 struct rte_rcu_qsbr_cnt *cnt;
594 uint64_t c;
595 uint64_t acked_token = __RTE_QSBR_CNT_MAX;
596
597 for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
598 __RTE_RCU_DP_LOG(DEBUG,
599 "%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
600 __func__, t, wait, i);
601 while (1) {
602 c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
603 __RTE_RCU_DP_LOG(DEBUG,
604 "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
605 __func__, t, wait, c, i);
606
607 /* Counter is not checked for wrap-around condition
608 * as it is a 64b counter.
609 */
610 if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
611 break;
612
613 /* This thread is not in quiescent state */
614 if (!wait)
615 return 0;
616
617 rte_pause();
618 }
619
620 /* This thread is in quiescent state. Use the counter to find
621 * the least acknowledged token among all the readers.
622 */
623 if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
624 acked_token = c;
625 }
626
627 /* All readers are checked, update least acknowledged token.
628 * There might be multiple writers trying to update this. There is
629 * no need to update this very accurately using compare-and-swap.
630 */
631 if (acked_token != __RTE_QSBR_CNT_MAX)
632 __atomic_store_n(&v->acked_token, acked_token,
633 __ATOMIC_RELAXED);
634
635 return 1;
636}
637
669static __rte_always_inline int
670rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
671{
672 RTE_ASSERT(v != NULL);
673
674 /* Check if all the readers have already acknowledged this token */
675 if (likely(t <= v->acked_token)) {
676 __RTE_RCU_DP_LOG(DEBUG,
677 "%s: check: token = %" PRIu64 ", wait = %d",
678 __func__, t, wait);
679 __RTE_RCU_DP_LOG(DEBUG,
680 "%s: status: least acked token = %" PRIu64,
681 __func__, v->acked_token);
682 return 1;
683 }
684
685 if (likely(v->num_threads == v->max_threads))
686 return __rte_rcu_qsbr_check_all(v, t, wait);
687 else
688 return __rte_rcu_qsbr_check_selective(v, t, wait);
689}
690
709void
710rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
711
727int
728rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
729
746__rte_experimental
747struct rte_rcu_qsbr_dq *
749
781__rte_experimental
782int
783rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
784
810__rte_experimental
811int
812rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
813 unsigned int *freed, unsigned int *pending, unsigned int *available);
814
836__rte_experimental
837int
838rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
839
840#ifdef __cplusplus
841}
842#endif
843
844#endif /* _RTE_RCU_QSBR_H_ */
static void rte_atomic_thread_fence(int memorder)
#define likely(x)
#define unlikely(x)
#define __rte_cache_aligned
Definition: rte_common.h:440
#define __rte_unused
Definition: rte_common.h:120
#define __rte_always_inline
Definition: rte_common.h:255
static void rte_pause(void)
static __rte_always_inline uint64_t rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
Definition: rte_rcu_qsbr.h:454
__rte_experimental struct rte_rcu_qsbr_dq * rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
static __rte_always_inline void rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v, __rte_unused unsigned int thread_id)
Definition: rte_rcu_qsbr.h:424
int rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
static __rte_always_inline void rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
Definition: rte_rcu_qsbr.h:301
int rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
static __rte_always_inline void rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v, __rte_unused unsigned int thread_id)
Definition: rte_rcu_qsbr.h:391
__rte_experimental int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
int rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
static __rte_always_inline void rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
Definition: rte_rcu_qsbr.h:354
__rte_experimental int rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
__rte_experimental int rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n, unsigned int *freed, unsigned int *pending, unsigned int *available)
void(* rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n)
Definition: rte_rcu_qsbr.h:135
void rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
int rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
static __rte_always_inline int rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
Definition: rte_rcu_qsbr.h:670
size_t rte_rcu_qsbr_get_memsize(uint32_t max_threads)
static __rte_always_inline void rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
Definition: rte_rcu_qsbr.h:483
struct rte_rcu_qsbr * v
Definition: rte_rcu_qsbr.h:191
rte_rcu_qsbr_free_resource_t free_fn
Definition: rte_rcu_qsbr.h:184