1 /* SPDX-License-Identifier: BSD-3-Clause 2 * Copyright(c) 2018 Ericsson AB 3 */ 4 5 #include <stdbool.h> 6 7 #include <rte_cycles.h> 8 #include <rte_eventdev_pmd.h> 9 #include <rte_eventdev_pmd_vdev.h> 10 #include <rte_random.h> 11 12 #include "dsw_evdev.h" 13 14 #define EVENTDEV_NAME_DSW_PMD event_dsw 15 16 static int 17 dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id, 18 const struct rte_event_port_conf *conf) 19 { 20 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 21 struct dsw_port *port; 22 struct rte_event_ring *in_ring; 23 struct rte_ring *ctl_in_ring; 24 char ring_name[RTE_RING_NAMESIZE]; 25 26 port = &dsw->ports[port_id]; 27 28 *port = (struct dsw_port) { 29 .id = port_id, 30 .dsw = dsw, 31 .dequeue_depth = conf->dequeue_depth, 32 .enqueue_depth = conf->enqueue_depth, 33 .new_event_threshold = conf->new_event_threshold 34 }; 35 36 snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id, 37 port_id); 38 39 in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE, 40 dev->data->socket_id, 41 RING_F_SC_DEQ|RING_F_EXACT_SZ); 42 43 if (in_ring == NULL) 44 return -ENOMEM; 45 46 snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u", 47 dev->data->dev_id, port_id); 48 49 ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE, 50 dev->data->socket_id, 51 RING_F_SC_DEQ|RING_F_EXACT_SZ); 52 53 if (ctl_in_ring == NULL) { 54 rte_event_ring_free(in_ring); 55 return -ENOMEM; 56 } 57 58 port->in_ring = in_ring; 59 port->ctl_in_ring = ctl_in_ring; 60 61 rte_atomic16_init(&port->load); 62 63 port->load_update_interval = 64 (DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S; 65 66 port->migration_interval = 67 (DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S; 68 69 dev->data->ports[port_id] = port; 70 71 return 0; 72 } 73 74 static void 75 dsw_port_def_conf(struct rte_eventdev *dev __rte_unused, 76 uint8_t port_id __rte_unused, 77 struct rte_event_port_conf *port_conf) 78 { 79 *port_conf = (struct rte_event_port_conf) { 80 .new_event_threshold = 1024, 81 .dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4, 82 .enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4 83 }; 84 } 85 86 static void 87 dsw_port_release(void *p) 88 { 89 struct dsw_port *port = p; 90 91 rte_event_ring_free(port->in_ring); 92 rte_ring_free(port->ctl_in_ring); 93 } 94 95 static int 96 dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id, 97 const struct rte_event_queue_conf *conf) 98 { 99 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 100 struct dsw_queue *queue = &dsw->queues[queue_id]; 101 102 if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg) 103 return -ENOTSUP; 104 105 /* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it 106 * avoid the "fake" TYPE_PARALLEL flow_id assignment. Since 107 * the queue will only have a single serving port, no 108 * migration will ever happen, so the extra TYPE_ATOMIC 109 * migration overhead is avoided. 110 */ 111 if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) 112 queue->schedule_type = RTE_SCHED_TYPE_ATOMIC; 113 else { 114 if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED) 115 return -ENOTSUP; 116 /* atomic or parallel */ 117 queue->schedule_type = conf->schedule_type; 118 } 119 120 queue->num_serving_ports = 0; 121 122 return 0; 123 } 124 125 static void 126 dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused, 127 uint8_t queue_id __rte_unused, 128 struct rte_event_queue_conf *queue_conf) 129 { 130 *queue_conf = (struct rte_event_queue_conf) { 131 .nb_atomic_flows = 4096, 132 .schedule_type = RTE_SCHED_TYPE_ATOMIC, 133 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL 134 }; 135 } 136 137 static void 138 dsw_queue_release(struct rte_eventdev *dev __rte_unused, 139 uint8_t queue_id __rte_unused) 140 { 141 } 142 143 static void 144 queue_add_port(struct dsw_queue *queue, uint16_t port_id) 145 { 146 queue->serving_ports[queue->num_serving_ports] = port_id; 147 queue->num_serving_ports++; 148 } 149 150 static bool 151 queue_remove_port(struct dsw_queue *queue, uint16_t port_id) 152 { 153 uint16_t i; 154 155 for (i = 0; i < queue->num_serving_ports; i++) 156 if (queue->serving_ports[i] == port_id) { 157 uint16_t last_idx = queue->num_serving_ports - 1; 158 if (i != last_idx) 159 queue->serving_ports[i] = 160 queue->serving_ports[last_idx]; 161 queue->num_serving_ports--; 162 return true; 163 } 164 return false; 165 } 166 167 static int 168 dsw_port_link_unlink(struct rte_eventdev *dev, void *port, 169 const uint8_t queues[], uint16_t num, bool link) 170 { 171 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 172 struct dsw_port *p = port; 173 uint16_t i; 174 uint16_t count = 0; 175 176 for (i = 0; i < num; i++) { 177 uint8_t qid = queues[i]; 178 struct dsw_queue *q = &dsw->queues[qid]; 179 if (link) { 180 queue_add_port(q, p->id); 181 count++; 182 } else { 183 bool removed = queue_remove_port(q, p->id); 184 if (removed) 185 count++; 186 } 187 } 188 189 return count; 190 } 191 192 static int 193 dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[], 194 const uint8_t priorities[] __rte_unused, uint16_t num) 195 { 196 return dsw_port_link_unlink(dev, port, queues, num, true); 197 } 198 199 static int 200 dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[], 201 uint16_t num) 202 { 203 return dsw_port_link_unlink(dev, port, queues, num, false); 204 } 205 206 static void 207 dsw_info_get(struct rte_eventdev *dev __rte_unused, 208 struct rte_event_dev_info *info) 209 { 210 *info = (struct rte_event_dev_info) { 211 .driver_name = DSW_PMD_NAME, 212 .max_event_queues = DSW_MAX_QUEUES, 213 .max_event_queue_flows = DSW_MAX_FLOWS, 214 .max_event_queue_priority_levels = 1, 215 .max_event_priority_levels = 1, 216 .max_event_ports = DSW_MAX_PORTS, 217 .max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH, 218 .max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH, 219 .max_num_events = DSW_MAX_EVENTS, 220 .event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE| 221 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED| 222 RTE_EVENT_DEV_CAP_NONSEQ_MODE| 223 RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT 224 }; 225 } 226 227 static int 228 dsw_configure(const struct rte_eventdev *dev) 229 { 230 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 231 const struct rte_event_dev_config *conf = &dev->data->dev_conf; 232 int32_t min_max_in_flight; 233 234 dsw->num_ports = conf->nb_event_ports; 235 dsw->num_queues = conf->nb_event_queues; 236 237 /* Avoid a situation where consumer ports are holding all the 238 * credits, without making use of them. 239 */ 240 min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS; 241 242 dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight); 243 244 return 0; 245 } 246 247 248 static void 249 initial_flow_to_port_assignment(struct dsw_evdev *dsw) 250 { 251 uint8_t queue_id; 252 for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) { 253 struct dsw_queue *queue = &dsw->queues[queue_id]; 254 uint16_t flow_hash; 255 for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) { 256 uint8_t port_idx = 257 rte_rand() % queue->num_serving_ports; 258 uint8_t port_id = 259 queue->serving_ports[port_idx]; 260 dsw->queues[queue_id].flow_to_port_map[flow_hash] = 261 port_id; 262 } 263 } 264 } 265 266 static int 267 dsw_start(struct rte_eventdev *dev) 268 { 269 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 270 uint16_t i; 271 uint64_t now; 272 273 rte_atomic32_init(&dsw->credits_on_loan); 274 275 initial_flow_to_port_assignment(dsw); 276 277 now = rte_get_timer_cycles(); 278 for (i = 0; i < dsw->num_ports; i++) { 279 dsw->ports[i].measurement_start = now; 280 dsw->ports[i].busy_start = now; 281 } 282 283 return 0; 284 } 285 286 static void 287 dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len, 288 eventdev_stop_flush_t flush, void *flush_arg) 289 { 290 uint16_t i; 291 292 for (i = 0; i < buf_len; i++) 293 flush(dev_id, buf[i], flush_arg); 294 } 295 296 static void 297 dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port, 298 eventdev_stop_flush_t flush, void *flush_arg) 299 { 300 dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len, 301 flush, flush_arg); 302 } 303 304 static void 305 dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port, 306 eventdev_stop_flush_t flush, void *flush_arg) 307 { 308 uint16_t dport_id; 309 310 for (dport_id = 0; dport_id < dsw->num_ports; dport_id++) 311 if (dport_id != port->id) 312 dsw_port_drain_buf(dev_id, port->out_buffer[dport_id], 313 port->out_buffer_len[dport_id], 314 flush, flush_arg); 315 } 316 317 static void 318 dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port, 319 eventdev_stop_flush_t flush, void *flush_arg) 320 { 321 struct rte_event ev; 322 323 while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL)) 324 flush(dev_id, ev, flush_arg); 325 } 326 327 static void 328 dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw, 329 eventdev_stop_flush_t flush, void *flush_arg) 330 { 331 uint16_t port_id; 332 333 if (flush == NULL) 334 return; 335 336 for (port_id = 0; port_id < dsw->num_ports; port_id++) { 337 struct dsw_port *port = &dsw->ports[port_id]; 338 339 dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg); 340 dsw_port_drain_paused(dev_id, port, flush, flush_arg); 341 dsw_port_drain_in_ring(dev_id, port, flush, flush_arg); 342 } 343 } 344 345 static void 346 dsw_stop(struct rte_eventdev *dev) 347 { 348 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 349 uint8_t dev_id; 350 eventdev_stop_flush_t flush; 351 void *flush_arg; 352 353 dev_id = dev->data->dev_id; 354 flush = dev->dev_ops->dev_stop_flush; 355 flush_arg = dev->data->dev_stop_flush_arg; 356 357 dsw_drain(dev_id, dsw, flush, flush_arg); 358 } 359 360 static int 361 dsw_close(struct rte_eventdev *dev) 362 { 363 struct dsw_evdev *dsw = dsw_pmd_priv(dev); 364 365 dsw->num_ports = 0; 366 dsw->num_queues = 0; 367 368 return 0; 369 } 370 371 static struct rte_eventdev_ops dsw_evdev_ops = { 372 .port_setup = dsw_port_setup, 373 .port_def_conf = dsw_port_def_conf, 374 .port_release = dsw_port_release, 375 .queue_setup = dsw_queue_setup, 376 .queue_def_conf = dsw_queue_def_conf, 377 .queue_release = dsw_queue_release, 378 .port_link = dsw_port_link, 379 .port_unlink = dsw_port_unlink, 380 .dev_infos_get = dsw_info_get, 381 .dev_configure = dsw_configure, 382 .dev_start = dsw_start, 383 .dev_stop = dsw_stop, 384 .dev_close = dsw_close, 385 .xstats_get = dsw_xstats_get, 386 .xstats_get_names = dsw_xstats_get_names, 387 .xstats_get_by_name = dsw_xstats_get_by_name 388 }; 389 390 static int 391 dsw_probe(struct rte_vdev_device *vdev) 392 { 393 const char *name; 394 struct rte_eventdev *dev; 395 struct dsw_evdev *dsw; 396 397 name = rte_vdev_device_name(vdev); 398 399 dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev), 400 rte_socket_id()); 401 if (dev == NULL) 402 return -EFAULT; 403 404 dev->dev_ops = &dsw_evdev_ops; 405 dev->enqueue = dsw_event_enqueue; 406 dev->enqueue_burst = dsw_event_enqueue_burst; 407 dev->enqueue_new_burst = dsw_event_enqueue_new_burst; 408 dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst; 409 dev->dequeue = dsw_event_dequeue; 410 dev->dequeue_burst = dsw_event_dequeue_burst; 411 412 if (rte_eal_process_type() != RTE_PROC_PRIMARY) 413 return 0; 414 415 dsw = dev->data->dev_private; 416 dsw->data = dev->data; 417 418 return 0; 419 } 420 421 static int 422 dsw_remove(struct rte_vdev_device *vdev) 423 { 424 const char *name; 425 426 name = rte_vdev_device_name(vdev); 427 if (name == NULL) 428 return -EINVAL; 429 430 return rte_event_pmd_vdev_uninit(name); 431 } 432 433 static struct rte_vdev_driver evdev_dsw_pmd_drv = { 434 .probe = dsw_probe, 435 .remove = dsw_remove 436 }; 437 438 RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv); 439