/* *- * BSD LICENSE * * Copyright(c) 2015 Intel Corporation. All rights reserved. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * Neither the name of Intel Corporation nor the names of its * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /* * Some portions of this software is derived from the producer * consumer queues described by Dmitry Vyukov and published here * http://www.1024cores.net * * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * The views and conclusions contained in the software and documentation are * those of the authors and should not be interpreted as representing official * policies, either expressed or implied, of Dmitry Vyukov. */ #ifndef LTHREAD_POOL_H_ #define LTHREAD_POOL_H_ #include #include #include #include "lthread_int.h" #include "lthread_diag.h" /* * This file implements pool of queue nodes used by the queue implemented * in lthread_queue.h. * * The pool is an intrusive lock free MPSC queue. * * The pool is created empty and populated lazily, i.e. on first attempt to * allocate a the pool. * * Whenever the pool is empty more nodes are added to the pool * The number of nodes preallocated in this way is a parameter of * _qnode_pool_create. Freeing an object returns it to the pool. * * Each lthread scheduler maintains its own pool of nodes. L-threads must always * allocate from this local pool ( because it is a single consumer queue ). * L-threads can free nodes to any pool (because it is a multi producer queue) * This enables threads that have affined to a different scheduler to free * nodes safely. */ struct qnode; struct qnode_cache; /* * define intermediate node */ struct qnode { struct qnode *next; void *data; struct qnode_pool *pool; } __rte_cache_aligned; /* * a pool structure */ struct qnode_pool { struct qnode *head; struct qnode *stub; struct qnode *fast_alloc; struct qnode *tail __rte_cache_aligned; int pre_alloc; char name[LT_MAX_NAME_SIZE]; DIAG_COUNT_DEFINE(rd); DIAG_COUNT_DEFINE(wr); DIAG_COUNT_DEFINE(available); DIAG_COUNT_DEFINE(prealloc); DIAG_COUNT_DEFINE(capacity); } __rte_cache_aligned; /* * Create a pool of qnodes */ static inline struct qnode_pool * _qnode_pool_create(const char *name, int prealloc_size) { struct qnode_pool *p = rte_malloc_socket(NULL, sizeof(struct qnode_pool), RTE_CACHE_LINE_SIZE, rte_socket_id()); LTHREAD_ASSERT(p); p->stub = rte_malloc_socket(NULL, sizeof(struct qnode), RTE_CACHE_LINE_SIZE, rte_socket_id()); LTHREAD_ASSERT(p->stub); if (name != NULL) strncpy(p->name, name, LT_MAX_NAME_SIZE); p->name[sizeof(p->name)-1] = 0; p->stub->pool = p; p->stub->next = NULL; p->tail = p->stub; p->head = p->stub; p->pre_alloc = prealloc_size; DIAG_COUNT_INIT(p, rd); DIAG_COUNT_INIT(p, wr); DIAG_COUNT_INIT(p, available); DIAG_COUNT_INIT(p, prealloc); DIAG_COUNT_INIT(p, capacity); return p; } /* * Insert a node into the pool */ static inline void __attribute__ ((always_inline)) _qnode_pool_insert(struct qnode_pool *p, struct qnode *n) { n->next = NULL; struct qnode *prev = n; /* We insert at the head */ prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head, (uint64_t) prev); /* there is a window of inconsistency until prev next is set */ /* which is why remove must retry */ prev->next = (n); } /* * Remove a node from the pool * * There is a race with _qnode_pool_insert() whereby the queue could appear * empty during a concurrent insert, this is handled by retrying * * The queue uses a stub node, which must be swung as the queue becomes * empty, this requires an insert of the stub, which means that removing the * last item from the queue incurs the penalty of an atomic exchange. Since the * pool is maintained with a bulk pre-allocation the cost of this is amortised. */ static inline struct qnode *__attribute__ ((always_inline)) _pool_remove(struct qnode_pool *p) { struct qnode *head; struct qnode *tail = p->tail; struct qnode *next = tail->next; /* we remove from the tail */ if (tail == p->stub) { if (next == NULL) return NULL; /* advance the tail */ p->tail = next; tail = next; next = next->next; } if (likely(next != NULL)) { p->tail = next; return tail; } head = p->head; if (tail == head) return NULL; /* swing stub node */ _qnode_pool_insert(p, p->stub); next = tail->next; if (next) { p->tail = next; return tail; } return NULL; } /* * This adds a retry to the _pool_remove function * defined above */ static inline struct qnode *__attribute__ ((always_inline)) _qnode_pool_remove(struct qnode_pool *p) { struct qnode *n; do { n = _pool_remove(p); if (likely(n != NULL)) return n; rte_compiler_barrier(); } while ((p->head != p->tail) && (p->tail != p->stub)); return NULL; } /* * Allocate a node from the pool * If the pool is empty add mode nodes */ static inline struct qnode *__attribute__ ((always_inline)) _qnode_alloc(void) { struct qnode_pool *p = (THIS_SCHED)->qnode_pool; int prealloc_size = p->pre_alloc; struct qnode *n; int i; if (likely(p->fast_alloc != NULL)) { n = p->fast_alloc; p->fast_alloc = NULL; return n; } n = _qnode_pool_remove(p); if (unlikely(n == NULL)) { DIAG_COUNT_INC(p, prealloc); for (i = 0; i < prealloc_size; i++) { n = rte_malloc_socket(NULL, sizeof(struct qnode), RTE_CACHE_LINE_SIZE, rte_socket_id()); if (n == NULL) return NULL; DIAG_COUNT_INC(p, available); DIAG_COUNT_INC(p, capacity); n->pool = p; _qnode_pool_insert(p, n); } n = _qnode_pool_remove(p); } n->pool = p; DIAG_COUNT_INC(p, rd); DIAG_COUNT_DEC(p, available); return n; } /* * free a queue node to the per scheduler pool from which it came */ static inline void __attribute__ ((always_inline)) _qnode_free(struct qnode *n) { struct qnode_pool *p = n->pool; if (unlikely(p->fast_alloc != NULL) || unlikely(n->pool != (THIS_SCHED)->qnode_pool)) { DIAG_COUNT_INC(p, wr); DIAG_COUNT_INC(p, available); _qnode_pool_insert(p, n); return; } p->fast_alloc = n; } /* * Destroy an qnode pool * queue must be empty when this is called */ static inline int _qnode_pool_destroy(struct qnode_pool *p) { rte_free(p->stub); rte_free(p); return 0; } #endif /* LTHREAD_POOL_H_ */