1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2016-2017 Intel Corporation 3 */ 4 5 #include <rte_atomic.h> 6 #include <rte_cycles.h> 7 #include <rte_event_ring.h> 8 9 #include "sw_evdev.h" 10 11 #define PORT_ENQUEUE_MAX_BURST_SIZE 64 12 13 static inline void 14 sw_event_release(struct sw_port *p, uint8_t index) 15 { 16 /* 17 * Drops the next outstanding event in our history. Used on dequeue 18 * to clear any history before dequeuing more events. 19 */ 20 RTE_SET_USED(index); 21 22 /* create drop message */ 23 struct rte_event ev; 24 ev.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE]; 25 26 uint16_t free_count; 27 rte_event_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count); 28 29 /* each release returns one credit */ 30 p->outstanding_releases--; 31 p->inflight_credits++; 32 } 33 34 /* 35 * special-case of rte_event_ring enqueue, with overriding the ops member on 36 * the events that get written to the ring. 37 */ 38 static inline unsigned int 39 enqueue_burst_with_ops(struct rte_event_ring *r, const struct rte_event *events, 40 unsigned int n, uint8_t *ops) 41 { 42 struct rte_event tmp_evs[PORT_ENQUEUE_MAX_BURST_SIZE]; 43 unsigned int i; 44 45 memcpy(tmp_evs, events, n * sizeof(events[0])); 46 for (i = 0; i < n; i++) 47 tmp_evs[i].op = ops[i]; 48 49 return rte_event_ring_enqueue_burst(r, tmp_evs, n, NULL); 50 } 51 52 uint16_t 53 sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) 54 { 55 int32_t i; 56 uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE]; 57 struct sw_port *p = port; 58 struct sw_evdev *sw = (void *)p->sw; 59 uint32_t sw_inflights = rte_atomic32_read(&sw->inflights); 60 uint32_t credit_update_quanta = sw->credit_update_quanta; 61 int new = 0; 62 63 if (num > PORT_ENQUEUE_MAX_BURST_SIZE) 64 num = PORT_ENQUEUE_MAX_BURST_SIZE; 65 66 for (i = 0; i < num; i++) 67 new += (ev[i].op == RTE_EVENT_OP_NEW); 68 69 if (unlikely(new > 0 && p->inflight_max < sw_inflights)) 70 return 0; 71 72 if (p->inflight_credits < new) { 73 /* check if event enqueue brings port over max threshold */ 74 if (sw_inflights + credit_update_quanta > sw->nb_events_limit) 75 return 0; 76 77 rte_atomic32_add(&sw->inflights, credit_update_quanta); 78 p->inflight_credits += (credit_update_quanta); 79 80 if (p->inflight_credits < new) 81 return 0; 82 } 83 84 for (i = 0; i < num; i++) { 85 int op = ev[i].op; 86 int outstanding = p->outstanding_releases > 0; 87 const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count); 88 89 p->inflight_credits -= (op == RTE_EVENT_OP_NEW); 90 p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) * 91 outstanding; 92 93 new_ops[i] = sw_qe_flag_map[op]; 94 new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT); 95 96 /* FWD and RELEASE packets will both resolve to taken (assuming 97 * correct usage of the API), providing very high correct 98 * prediction rate. 99 */ 100 if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) 101 p->outstanding_releases--; 102 103 /* error case: branch to avoid touching p->stats */ 104 if (unlikely(invalid_qid && op != RTE_EVENT_OP_RELEASE)) { 105 p->stats.rx_dropped++; 106 p->inflight_credits++; 107 } 108 } 109 110 /* returns number of events actually enqueued */ 111 uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i, 112 new_ops); 113 if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) { 114 uint64_t burst_ticks = rte_get_timer_cycles() - 115 p->last_dequeue_ticks; 116 uint64_t burst_pkt_ticks = 117 burst_ticks / p->last_dequeue_burst_sz; 118 p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES; 119 p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES; 120 p->last_dequeue_ticks = 0; 121 } 122 123 /* Replenish credits if enough releases are performed */ 124 if (p->inflight_credits >= credit_update_quanta * 2) { 125 rte_atomic32_sub(&sw->inflights, credit_update_quanta); 126 p->inflight_credits -= credit_update_quanta; 127 } 128 129 return enq; 130 } 131 132 uint16_t 133 sw_event_enqueue(void *port, const struct rte_event *ev) 134 { 135 return sw_event_enqueue_burst(port, ev, 1); 136 } 137 138 uint16_t 139 sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, 140 uint64_t wait) 141 { 142 RTE_SET_USED(wait); 143 struct sw_port *p = (void *)port; 144 struct rte_event_ring *ring = p->cq_worker_ring; 145 146 /* check that all previous dequeues have been released */ 147 if (p->implicit_release) { 148 struct sw_evdev *sw = (void *)p->sw; 149 uint32_t credit_update_quanta = sw->credit_update_quanta; 150 uint16_t out_rels = p->outstanding_releases; 151 uint16_t i; 152 for (i = 0; i < out_rels; i++) 153 sw_event_release(p, i); 154 155 /* Replenish credits if enough releases are performed */ 156 if (p->inflight_credits >= credit_update_quanta * 2) { 157 rte_atomic32_sub(&sw->inflights, credit_update_quanta); 158 p->inflight_credits -= credit_update_quanta; 159 } 160 } 161 162 /* returns number of events actually dequeued */ 163 uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL); 164 if (unlikely(ndeq == 0)) { 165 p->zero_polls++; 166 p->total_polls++; 167 goto end; 168 } 169 170 p->outstanding_releases += ndeq; 171 p->last_dequeue_burst_sz = ndeq; 172 p->last_dequeue_ticks = rte_get_timer_cycles(); 173 p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++; 174 p->total_polls++; 175 176 end: 177 return ndeq; 178 } 179 180 uint16_t 181 sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait) 182 { 183 return sw_event_dequeue_burst(port, ev, 1, wait); 184 } 185