// SPDX-License-Identifier: GPL-2.0-or-later
/* Processing of received RxRPC packets
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells ([email protected])
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

/* Override priority when generating ACKs for received DATA */
static const u8 rxrpc_ack_priority[RXRPC_ACK__INVALID] = {
	[RXRPC_ACK_IDLE]		= 1,
	[RXRPC_ACK_DELAY]		= 2,
	[RXRPC_ACK_REQUESTED]		= 3,
	[RXRPC_ACK_DUPLICATE]		= 4,
	[RXRPC_ACK_EXCEEDS_WINDOW]	= 5,
	[RXRPC_ACK_NOSPACE]		= 6,
	[RXRPC_ACK_OUT_OF_SEQUENCE]	= 7,
};

static void rxrpc_proto_abort(struct rxrpc_call *call, rxrpc_seq_t seq,
			      enum rxrpc_abort_reason why)
{
	rxrpc_abort_call(call, seq, RX_PROTOCOL_ERROR, -EBADMSG, why);
}

/*
 * Do TCP-style congestion management [RFC 5681].
 */
static void rxrpc_congestion_management(struct rxrpc_call *call,
					struct sk_buff *skb,
					struct rxrpc_ack_summary *summary,
					rxrpc_serial_t acked_serial)
{
	enum rxrpc_congest_change change = rxrpc_cong_no_change;
	unsigned int cumulative_acks = call->cong_cumul_acks;
	unsigned int cwnd = call->cong_cwnd;
	bool resend = false;

	summary->flight_size =
		(call->tx_top - call->acks_hard_ack) - summary->nr_acks;

	if (test_and_clear_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags)) {
		summary->retrans_timeo = true;
		call->cong_ssthresh = umax(summary->flight_size / 2, 2);
		cwnd = 1;
		if (cwnd >= call->cong_ssthresh &&
		    call->cong_mode == RXRPC_CALL_SLOW_START) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
			cumulative_acks = 0;
		}
	}

	cumulative_acks += summary->nr_new_acks;
	if (cumulative_acks > 255)
		cumulative_acks = 255;

	summary->cwnd = call->cong_cwnd;
	summary->ssthresh = call->cong_ssthresh;
	summary->cumulative_acks = cumulative_acks;
	summary->dup_acks = call->cong_dup_acks;

	switch (call->cong_mode) {
	case RXRPC_CALL_SLOW_START:
		if (summary->saw_nacks)
			goto packet_loss_detected;
		if (summary->cumulative_acks > 0)
			cwnd += 1;
		if (cwnd >= call->cong_ssthresh) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
		}
		goto out;

	case RXRPC_CALL_CONGEST_AVOIDANCE:
		if (summary->saw_nacks)
			goto packet_loss_detected;

		/* We analyse the number of packets that get ACK'd per RTT
		 * period and increase the window if we managed to fill it.
		 */
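		/* Assumption (not stated here): srtt_us is kept scaled by 8,
		 * like TCP's smoothed RTT, so srtt_us >> 3 below is the
		 * smoothed RTT in microseconds.  With no RTT sample yet
		 * (rtt_count == 0) there is no RTT period to measure against,
		 * so the window is left alone.
		 */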
		if (call->peer->rtt_count == 0)
			goto out;
		if (ktime_before(skb->tstamp,
				 ktime_add_us(call->cong_tstamp,
					      call->peer->srtt_us >> 3)))
			goto out_no_clear_ca;
		change = rxrpc_cong_rtt_window_end;
		call->cong_tstamp = skb->tstamp;
		if (cumulative_acks >= cwnd)
			cwnd++;
		goto out;

	case RXRPC_CALL_PACKET_LOSS:
		if (!summary->saw_nacks)
			goto resume_normality;

		if (summary->new_low_nack) {
			change = rxrpc_cong_new_low_nack;
			call->cong_dup_acks = 1;
			if (call->cong_extra > 1)
				call->cong_extra = 1;
			goto send_extra_data;
		}

		call->cong_dup_acks++;
		if (call->cong_dup_acks < 3)
			goto send_extra_data;

		change = rxrpc_cong_begin_retransmission;
		call->cong_mode = RXRPC_CALL_FAST_RETRANSMIT;
		call->cong_ssthresh = umax(summary->flight_size / 2, 2);
		cwnd = call->cong_ssthresh + 3;
		call->cong_extra = 0;
		call->cong_dup_acks = 0;
		resend = true;
		goto out;

	case RXRPC_CALL_FAST_RETRANSMIT:
		if (!summary->new_low_nack) {
			if (summary->nr_new_acks == 0)
				cwnd += 1;
			call->cong_dup_acks++;
			if (call->cong_dup_acks == 2) {
				change = rxrpc_cong_retransmit_again;
				call->cong_dup_acks = 0;
				resend = true;
			}
		} else {
			change = rxrpc_cong_progress;
			cwnd = call->cong_ssthresh;
			if (!summary->saw_nacks)
				goto resume_normality;
		}
		goto out;

	default:
		BUG();
		goto out;
	}

resume_normality:
	change = rxrpc_cong_cleared_nacks;
	call->cong_dup_acks = 0;
	call->cong_extra = 0;
	call->cong_tstamp = skb->tstamp;
	if (cwnd < call->cong_ssthresh)
		call->cong_mode = RXRPC_CALL_SLOW_START;
	else
		call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
out:
	cumulative_acks = 0;
out_no_clear_ca:
	if (cwnd >= RXRPC_TX_MAX_WINDOW)
		cwnd = RXRPC_TX_MAX_WINDOW;
	call->cong_cwnd = cwnd;
	call->cong_cumul_acks = cumulative_acks;
	summary->mode = call->cong_mode;
	trace_rxrpc_congest(call, summary, acked_serial, change);
	if (resend)
		rxrpc_resend(call, skb);
	return;

packet_loss_detected:
	change = rxrpc_cong_saw_nack;
	call->cong_mode = RXRPC_CALL_PACKET_LOSS;
	call->cong_dup_acks = 0;
	goto send_extra_data;

send_extra_data:
	/* Send some previously unsent DATA if we have some to advance the ACK
	 * state.
	 */
	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ||
	    summary->nr_acks != call->tx_top - call->acks_hard_ack) {
		call->cong_extra++;
		wake_up(&call->waitq);
	}
	goto out_no_clear_ca;
}

/*
 * Degrade the congestion window if we haven't transmitted a packet for >1RTT.
 */
void rxrpc_congestion_degrade(struct rxrpc_call *call)
{
	ktime_t rtt, now;

	if (call->cong_mode != RXRPC_CALL_SLOW_START &&
	    call->cong_mode != RXRPC_CALL_CONGEST_AVOIDANCE)
		return;
	if (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_REPLY)
		return;

	rtt = ns_to_ktime(call->peer->srtt_us * (1000 / 8));
	now = ktime_get_real();
	if (!ktime_before(ktime_add(call->tx_last_sent, rtt), now))
		return;

	trace_rxrpc_reset_cwnd(call, now);
	rxrpc_inc_stat(call->rxnet, stat_tx_data_cwnd_reset);
	call->tx_last_sent = now;
	call->cong_mode = RXRPC_CALL_SLOW_START;
	call->cong_ssthresh = umax(call->cong_ssthresh, call->cong_cwnd * 3 / 4);
	call->cong_cwnd = umax(call->cong_cwnd / 2, RXRPC_MIN_CWND);
}
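
/* Worked example of the degrade step above (illustrative only): with
 * cong_cwnd == 10 and cong_ssthresh == 4, ssthresh becomes
 * umax(4, 10 * 3 / 4) == 7 and cwnd becomes umax(10 / 2, RXRPC_MIN_CWND),
 * i.e. 5 provided RXRPC_MIN_CWND <= 5, so after a >1RTT transmission gap the
 * call restarts slow-start from roughly half its previous window.
 */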

/*
 * Apply a hard ACK by advancing the Tx window.
 */
static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
				   struct rxrpc_ack_summary *summary)
{
	struct rxrpc_txqueue *tq = call->tx_queue;
	rxrpc_seq_t seq = call->tx_bottom + 1;
	bool rot_last = false;

	_enter("%x,%x,%x", call->tx_bottom, call->acks_hard_ack, to);

	trace_rxrpc_tx_rotate(call, seq, to);
	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate);

	/* We may have a left over fully-consumed buffer at the front that we
	 * couldn't drop before (rotate_and_keep below).
	 */
	if (seq == call->tx_qbase + RXRPC_NR_TXQUEUE) {
		call->tx_qbase += RXRPC_NR_TXQUEUE;
		call->tx_queue = tq->next;
		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
		kfree(tq);
		tq = call->tx_queue;
	}

	do {
		unsigned int ix = seq - call->tx_qbase;

		_debug("tq=%x seq=%x i=%d f=%x", tq->qbase, seq, ix, tq->bufs[ix]->flags);
		if (tq->bufs[ix]->flags & RXRPC_LAST_PACKET) {
			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
			rot_last = true;
		}
		rxrpc_put_txbuf(tq->bufs[ix], rxrpc_txbuf_put_rotated);
		tq->bufs[ix] = NULL;

		WRITE_ONCE(call->tx_bottom, seq);
		WRITE_ONCE(call->acks_hard_ack, seq);
		trace_rxrpc_txqueue(call, (rot_last ?
					   rxrpc_txqueue_rotate_last :
					   rxrpc_txqueue_rotate));

		seq++;
		if (!(seq & RXRPC_TXQ_MASK)) {
			prefetch(tq->next);
			if (tq != call->tx_qtail) {
				call->tx_qbase += RXRPC_NR_TXQUEUE;
				call->tx_queue = tq->next;
				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
				kfree(tq);
				tq = call->tx_queue;
			} else {
				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_keep);
				tq = NULL;
				break;
			}
		}

	} while (before_eq(seq, to));

	if (rot_last) {
		set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);
		if (tq) {
			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
			kfree(tq);
			call->tx_queue = NULL;
		}
	}

	_debug("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);

	if (call->acks_lowest_nak == call->acks_hard_ack) {
		call->acks_lowest_nak = to;
	} else if (after(to, call->acks_lowest_nak)) {
		summary->new_low_nack = true;
		call->acks_lowest_nak = to;
	}

	wake_up(&call->waitq);
	return rot_last;
}
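
/* Layout assumed by the rotation above (a sketch inferred from the code, not
 * from the original comments): the Tx buffers live in a linked list of
 * struct rxrpc_txqueue segments, each holding RXRPC_NR_TXQUEUE bufs, with
 * call->tx_qbase naming the sequence number of slot 0 of the head segment.
 * When seq crosses a segment boundary (seq & RXRPC_TXQ_MASK wraps to 0), the
 * drained head segment is freed and tx_qbase advances by RXRPC_NR_TXQUEUE.
 */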

/*
 * End the transmission phase of a call.
 *
 * This occurs when we get an ACKALL packet, the first DATA packet of a reply,
 * or a final ACK packet.
 */
static void rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
			       enum rxrpc_abort_reason abort_why)
{
	ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));

	call->resend_at = KTIME_MAX;
	trace_rxrpc_timer_can(call, rxrpc_timer_trace_resend);

	if (unlikely(call->cong_last_nack)) {
		rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
		call->cong_last_nack = NULL;
	}

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		if (reply_begun) {
			rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_RECV_REPLY);
			trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
			break;
		}

		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_await_reply);
		break;

	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
		break;

	default:
		kdebug("end_tx %s", rxrpc_call_states[__rxrpc_call_state(call)]);
		rxrpc_proto_abort(call, call->tx_top, abort_why);
		break;
	}
}

/*
 * Begin the reply reception phase of a call.
 */
static bool rxrpc_receiving_reply(struct rxrpc_call *call)
{
	struct rxrpc_ack_summary summary = { 0 };
	rxrpc_seq_t top = READ_ONCE(call->tx_top);

	if (call->ackr_reason) {
		call->delay_ack_at = KTIME_MAX;
		trace_rxrpc_timer_can(call, rxrpc_timer_trace_delayed_ack);
	}

	if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
		if (!rxrpc_rotate_tx_window(call, top, &summary)) {
			rxrpc_proto_abort(call, top, rxrpc_eproto_early_reply);
			return false;
		}
	}

	rxrpc_end_tx_phase(call, true, rxrpc_eproto_unexpected_reply);
	return true;
}

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
		rxrpc_call_completed(call);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_ACK_REQUEST);
		call->expect_req_by = KTIME_MAX;
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_processing_op);
		break;

	default:
		break;
	}
}

static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
					  rxrpc_seq_t window, rxrpc_seq_t wtop)
{
	call->ackr_window = window;
	call->ackr_wtop = wtop;
}

/*
 * Push a DATA packet onto the Rx queue.
 */
static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
				   rxrpc_seq_t window, rxrpc_seq_t wtop,
				   enum rxrpc_receive_trace why)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;

	__skb_queue_tail(&call->recvmsg_queue, skb);
	rxrpc_input_update_ack_window(call, window, wtop);
	trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
	if (last)
		rxrpc_end_rx_phase(call, sp->hdr.serial);
}
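
/* Window terminology used below (a reading of the code, not original
 * documentation): ackr_window is the sequence number of the next in-order
 * DATA packet we expect to queue for recvmsg, while ackr_wtop is one past the
 * highest sequence number seen so far (including out-of-sequence packets
 * parked on rx_oos_queue), so window == wtop means no packets are pending
 * out of order.
 */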

/*
 * Process a DATA packet.
 */
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
				 bool *_notify, rxrpc_serial_t *_ack_serial, int *_ack_reason)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct sk_buff *oos;
	rxrpc_serial_t serial = sp->hdr.serial;
	unsigned int sack = call->ackr_sack_base;
	rxrpc_seq_t window = call->ackr_window;
	rxrpc_seq_t wtop = call->ackr_wtop;
	rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
	rxrpc_seq_t seq = sp->hdr.seq;
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
	int ack_reason = -1;

	rxrpc_inc_stat(call->rxnet, stat_rx_data);
	if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack);
	if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);

	if (last) {
		if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    seq + 1 != wtop)
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_different_last);
	} else {
		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    after_eq(seq, wtop)) {
			pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
				call->debug_id, seq, window, wtop, wlimit);
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_data_after_last);
		}
	}

	if (after(seq, call->rx_highest_seq))
		call->rx_highest_seq = seq;

	trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);

	if (before(seq, window)) {
		ack_reason = RXRPC_ACK_DUPLICATE;
		goto send_ack;
	}
	if (after(seq, wlimit)) {
		ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
		goto send_ack;
	}

	/* Queue the packet. */
	if (seq == window) {
		if (sp->hdr.flags & RXRPC_REQUEST_ACK)
			ack_reason = RXRPC_ACK_REQUESTED;
		/* Send an immediate ACK if we fill in a hole */
		else if (!skb_queue_empty(&call->rx_oos_queue))
			ack_reason = RXRPC_ACK_DELAY;

		window++;
		if (after(window, wtop)) {
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_none);
			wtop = window;
		} else {
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_advance);
			sack = (sack + 1) % RXRPC_SACK_SIZE;
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);

		spin_lock(&call->recvmsg_queue.lock);
		rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
		*_notify = true;

		while ((oos = skb_peek(&call->rx_oos_queue))) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, window))
				break;

			__skb_unlink(oos, &call->rx_oos_queue);
			last = osp->hdr.flags & RXRPC_LAST_PACKET;
			seq = osp->hdr.seq;
			call->ackr_sack_table[sack] = 0;
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_fill);
			sack = (sack + 1) % RXRPC_SACK_SIZE;

			window++;
			rxrpc_input_queue_data(call, oos, window, wtop,
					       rxrpc_receive_queue_oos);
		}

		spin_unlock(&call->recvmsg_queue.lock);

		call->ackr_sack_base = sack;
	} else {
		unsigned int slot;

		ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;

		slot = seq - window;
		sack = (sack + slot) % RXRPC_SACK_SIZE;

		if (call->ackr_sack_table[sack % RXRPC_SACK_SIZE]) {
			ack_reason = RXRPC_ACK_DUPLICATE;
			goto send_ack;
		}

		call->ackr_sack_table[sack % RXRPC_SACK_SIZE] |= 1;
		trace_rxrpc_sack(call, seq, sack, rxrpc_sack_oos);

		if (after(seq + 1, wtop)) {
			wtop = seq + 1;
			rxrpc_input_update_ack_window(call, window, wtop);
		}

		skb_queue_walk(&call->rx_oos_queue, oos) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, seq)) {
				rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
				__skb_queue_before(&call->rx_oos_queue, oos, skb);
				goto oos_queued;
			}
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
		__skb_queue_tail(&call->rx_oos_queue, skb);
	oos_queued:
		trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
				    sp->hdr.serial, sp->hdr.seq);
	}

send_ack:
	if (ack_reason >= 0) {
		if (rxrpc_ack_priority[ack_reason] > rxrpc_ack_priority[*_ack_reason]) {
			*_ack_serial = serial;
			*_ack_reason = ack_reason;
		} else if (rxrpc_ack_priority[ack_reason] == rxrpc_ack_priority[*_ack_reason] &&
			   ack_reason == RXRPC_ACK_REQUESTED) {
			*_ack_serial = serial;
			*_ack_reason = ack_reason;
		}
	}
}

/*
 * Split a jumbo packet and file the bits separately.
 */
static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_jumbo_header jhdr;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb), *jsp;
	struct sk_buff *jskb;
	rxrpc_serial_t ack_serial = 0;
	unsigned int offset = sizeof(struct rxrpc_wire_header);
	unsigned int len = skb->len - offset;
	bool notify = false;
	int ack_reason = 0;

	while (sp->hdr.flags & RXRPC_JUMBO_PACKET) {
		if (len < RXRPC_JUMBO_SUBPKTLEN)
			goto protocol_error;
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			goto protocol_error;
		if (skb_copy_bits(skb, offset + RXRPC_JUMBO_DATALEN,
				  &jhdr, sizeof(jhdr)) < 0)
			goto protocol_error;

		jskb = skb_clone(skb, GFP_NOFS);
		if (!jskb) {
			kdebug("couldn't clone");
			return false;
		}
		rxrpc_new_skb(jskb, rxrpc_skb_new_jumbo_subpacket);
		jsp = rxrpc_skb(jskb);
		jsp->offset = offset;
		jsp->len = RXRPC_JUMBO_DATALEN;
		rxrpc_input_data_one(call, jskb, &notify, &ack_serial, &ack_reason);
		rxrpc_free_skb(jskb, rxrpc_skb_put_jumbo_subpacket);

		sp->hdr.flags = jhdr.flags;
		sp->hdr._rsvd = ntohs(jhdr._rsvd);
		sp->hdr.seq++;
		sp->hdr.serial++;
		offset += RXRPC_JUMBO_SUBPKTLEN;
		len -= RXRPC_JUMBO_SUBPKTLEN;
	}

	sp->offset = offset;
	sp->len = len;
	rxrpc_input_data_one(call, skb, &notify, &ack_serial, &ack_reason);

	if (ack_reason > 0) {
		rxrpc_send_ACK(call, ack_reason, ack_serial,
			       rxrpc_propose_ack_input_data);
	} else {
		call->ackr_nr_unacked++;
		rxrpc_propose_delay_ACK(call, sp->hdr.serial,
					rxrpc_propose_ack_input_data);
	}
	if (notify) {
		trace_rxrpc_notify_socket(call->debug_id, sp->hdr.serial);
		rxrpc_notify_socket(call);
	}
	return true;

protocol_error:
	return false;
}
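
/* Jumbo layout handled above (a sketch from the constants used, not original
 * documentation): each non-final segment is RXRPC_JUMBO_DATALEN bytes of data
 * followed by a small rxrpc_jumbo_header giving the flags and _rsvd for the
 * next segment, so the loop consumes RXRPC_JUMBO_SUBPKTLEN per segment and
 * synthesises seq/serial for segment N as the first segment's values plus N.
 */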

/*
 * Process a DATA packet, adding the packet to the Rx ring.  The caller's
 * packet ref must be passed on or discarded.
 */
static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	rxrpc_serial_t serial = sp->hdr.serial;
	rxrpc_seq_t seq0 = sp->hdr.seq;

	_enter("{%x,%x,%x},{%u,%x}",
	       call->ackr_window, call->ackr_wtop, call->rx_highest_seq,
	       skb->len, seq0);

	if (__rxrpc_call_is_complete(call))
		return;

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		/* Received data implicitly ACKs all of the request
		 * packets we sent when we're acting as a client.
		 */
		if (!rxrpc_receiving_reply(call))
			goto out_notify;
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST: {
		unsigned long timo = READ_ONCE(call->next_req_timo);

		if (timo) {
			ktime_t delay = ms_to_ktime(timo);

			call->expect_req_by = ktime_add(ktime_get_real(), delay);
			trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_idle);
		}
		break;
	}

	default:
		break;
	}

	if (!rxrpc_input_split_jumbo(call, skb)) {
		rxrpc_proto_abort(call, sp->hdr.seq, rxrpc_badmsg_bad_jumbo);
		goto out_notify;
	}
	return;

out_notify:
	trace_rxrpc_notify_socket(call->debug_id, serial);
	rxrpc_notify_socket(call);
	_leave(" [queued]");
}

/*
 * See if there's a cached RTT probe to complete.
 */
static void rxrpc_complete_rtt_probe(struct rxrpc_call *call,
				     ktime_t resp_time,
				     rxrpc_serial_t acked_serial,
				     rxrpc_serial_t ack_serial,
				     enum rxrpc_rtt_rx_trace type)
{
	rxrpc_serial_t orig_serial;
	unsigned long avail;
	ktime_t sent_at;
	bool matched = false;
	int i;

	avail = READ_ONCE(call->rtt_avail);
	smp_rmb(); /* Read avail bits before accessing data. */

	for (i = 0; i < ARRAY_SIZE(call->rtt_serial); i++) {
		if (!test_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &avail))
			continue;

		sent_at = call->rtt_sent_at[i];
		orig_serial = call->rtt_serial[i];

		if (orig_serial == acked_serial) {
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_mb(); /* Read data before setting avail bit */
			set_bit(i, &call->rtt_avail);
			rxrpc_peer_add_rtt(call, type, i, acked_serial, ack_serial,
					   sent_at, resp_time);
			matched = true;
		}

		/* If a later serial is being acked, then mark this slot as
		 * being available.
		 */
		if (after(acked_serial, orig_serial)) {
			trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_obsolete, i,
					   orig_serial, acked_serial, 0, 0);
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_wmb();
			set_bit(i, &call->rtt_avail);
		}
	}

	if (!matched)
		trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_lost, 9, 0, acked_serial, 0, 0);
}

/*
 * Process the extra information that may be appended to an ACK packet
 */
static void rxrpc_input_ack_trailer(struct rxrpc_call *call, struct sk_buff *skb,
				    struct rxrpc_acktrailer *trailer)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_peer *peer = call->peer;
	unsigned int max_data, capacity;
	bool wake = false;
	u32 max_mtu = ntohl(trailer->maxMTU);
	//u32 if_mtu = ntohl(trailer->ifMTU);
	u32 rwind = ntohl(trailer->rwind);
	u32 jumbo_max = ntohl(trailer->jumbo_max);

	if (rwind > RXRPC_TX_MAX_WINDOW)
		rwind = RXRPC_TX_MAX_WINDOW;
	if (call->tx_winsize != rwind) {
		if (rwind > call->tx_winsize)
			wake = true;
		trace_rxrpc_rx_rwind_change(call, sp->hdr.serial, rwind, wake);
		call->tx_winsize = rwind;
	}

	max_mtu = clamp(max_mtu, 500, 65535);
	peer->ackr_max_data = max_mtu;

	if (max_mtu < peer->max_data) {
		trace_rxrpc_pmtud_reduce(peer, sp->hdr.serial, max_mtu,
					 rxrpc_pmtud_reduce_ack);
		write_seqcount_begin(&peer->mtu_lock);
		peer->max_data = max_mtu;
		write_seqcount_end(&peer->mtu_lock);
	}

	max_data = umin(max_mtu, peer->max_data);
	capacity = max_data;
	capacity += sizeof(struct rxrpc_jumbo_header); /* First subpacket has main hdr, not jumbo */
	capacity /= sizeof(struct rxrpc_jumbo_header) + RXRPC_JUMBO_DATALEN;

	if (jumbo_max == 0) {
		/* The peer says it supports pmtu discovery */
		peer->ackr_adv_pmtud = true;
	} else {
		peer->ackr_adv_pmtud = false;
	}

	if (wake)
		wake_up(&call->waitq);
}

/*
 * Determine how many nacks from the previous ACK have now been satisfied.
 */
static rxrpc_seq_t rxrpc_input_check_prev_ack(struct rxrpc_call *call,
					      struct rxrpc_ack_summary *summary,
					      rxrpc_seq_t seq)
{
	struct sk_buff *skb = call->cong_last_nack;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int i, new_acks = 0, retained_nacks = 0;
	rxrpc_seq_t old_seq = sp->ack.first_ack;
	u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	if (after_eq(seq, old_seq + sp->ack.nr_acks)) {
		summary->nr_new_acks += sp->ack.nr_nacks;
		summary->nr_new_acks += seq - (old_seq + sp->ack.nr_acks);
		summary->nr_retained_nacks = 0;
	} else if (seq == old_seq) {
		summary->nr_retained_nacks = sp->ack.nr_nacks;
	} else {
		for (i = 0; i < sp->ack.nr_acks; i++) {
			if (acks[i] == RXRPC_ACK_TYPE_NACK) {
				if (before(old_seq + i, seq))
					new_acks++;
				else
					retained_nacks++;
			}
		}

		summary->nr_new_acks += new_acks;
		summary->nr_retained_nacks = retained_nacks;
	}

	return old_seq + sp->ack.nr_acks;
}
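
/* Worked example for the comparison above (illustrative only): if the
 * previous soft-ACK ran from firstPacket 5 with 4 entries (covering 5-8) and
 * the new ACK's firstPacket is 7, then any NACKs recorded for 5 or 6 have
 * since been hard-ACK'd and count as newly ACK'd, while NACKs for 7 or 8 are
 * retained and carried forward into this summary.
 */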

/*
 * Process individual soft ACKs.
 *
 * Each ACK in the array corresponds to one packet and can be either an ACK or
 * a NAK.  If we find an explicitly NAK'd packet, we resend it immediately;
 * packets that lie beyond the end of the ACK list are scheduled for resend by
 * the timer on the basis that the peer might just not have processed them at
 * the time the ACK was sent.
 */
static void rxrpc_input_soft_acks(struct rxrpc_call *call,
				  struct rxrpc_ack_summary *summary,
				  struct sk_buff *skb,
				  rxrpc_seq_t seq,
				  rxrpc_seq_t since)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int i, old_nacks = 0;
	rxrpc_seq_t lowest_nak = seq + sp->ack.nr_acks;
	u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	for (i = 0; i < sp->ack.nr_acks; i++) {
		if (acks[i] == RXRPC_ACK_TYPE_ACK) {
			summary->nr_acks++;
			if (after_eq(seq, since))
				summary->nr_new_acks++;
		} else {
			summary->saw_nacks = true;
			if (before(seq, since)) {
				/* Overlap with previous ACK */
				old_nacks++;
			} else {
				summary->nr_new_nacks++;
				sp->ack.nr_nacks++;
			}

			if (before(seq, lowest_nak))
				lowest_nak = seq;
		}
		seq++;
	}

	if (lowest_nak != call->acks_lowest_nak) {
		call->acks_lowest_nak = lowest_nak;
		summary->new_low_nack = true;
	}

	/* We *can* have more nacks than we did - the peer is permitted to drop
	 * packets it has soft-acked and re-request them.  Further, it is
	 * possible for the nack distribution to change whilst the number of
	 * nacks stays the same or goes down.
	 */
	if (old_nacks < summary->nr_retained_nacks)
		summary->nr_new_acks += summary->nr_retained_nacks - old_nacks;
	summary->nr_retained_nacks = old_nacks;
}

/*
 * Return true if the ACK is valid - i.e. it doesn't appear to have regressed
 * with respect to the ack state conveyed by preceding ACKs.
 */
static bool rxrpc_is_ack_valid(struct rxrpc_call *call,
			       rxrpc_seq_t first_pkt, rxrpc_seq_t prev_pkt)
{
	rxrpc_seq_t base = READ_ONCE(call->acks_first_seq);

	if (after(first_pkt, base))
		return true; /* The window advanced */

	if (before(first_pkt, base))
		return false; /* firstPacket regressed */

	if (after_eq(prev_pkt, call->acks_prev_seq))
		return true; /* previousPacket hasn't regressed. */

	/* Some rx implementations put a serial number in previousPacket. */
	if (after_eq(prev_pkt, base + call->tx_winsize))
		return false;
	return true;
}

/*
 * Process an ACK packet.
 *
 * ack.firstPacket is the sequence number of the first soft-ACK'd/NAK'd packet
 * in the ACK array.  Anything before that is hard-ACK'd and may be discarded.
 *
 * A hard-ACK means that a packet has been processed and may be discarded; a
 * soft-ACK means that the packet has been received, but may yet be discarded
 * by the peer and its retransmission requested.  A phase is complete when all
 * packets are hard-ACK'd.
 */
static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_acktrailer trailer;
	rxrpc_serial_t ack_serial, acked_serial;
	rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt, since;
	int nr_acks, offset, ioffset;

	_enter("");

	offset = sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	ack_serial = sp->hdr.serial;
	acked_serial = sp->ack.acked_serial;
	first_soft_ack = sp->ack.first_ack;
	prev_pkt = sp->ack.prev_ack;
	nr_acks = sp->ack.nr_acks;
	hard_ack = first_soft_ack - 1;
	summary.ack_reason = (sp->ack.reason < RXRPC_ACK__INVALID ?
			      sp->ack.reason : RXRPC_ACK__INVALID);

	trace_rxrpc_rx_ack(call, ack_serial, acked_serial,
			   first_soft_ack, prev_pkt,
			   summary.ack_reason, nr_acks);
	rxrpc_inc_stat(call->rxnet, stat_rx_acks[summary.ack_reason]);

	if (acked_serial != 0) {
		switch (summary.ack_reason) {
		case RXRPC_ACK_PING_RESPONSE:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_ping_response);
			break;
		case RXRPC_ACK_REQUESTED:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_requested_ack);
			break;
		default:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_other_ack);
			break;
		}
	}

	/* If we get an EXCEEDS_WINDOW ACK from the server, it probably
	 * indicates that the client address changed due to NAT.  The server
	 * lost the call because it switched to a different peer.
	 */
	if (unlikely(summary.ack_reason == RXRPC_ACK_EXCEEDS_WINDOW) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		goto send_response;
	}

	/* If we get an OUT_OF_SEQUENCE ACK from the server, that can also
	 * indicate a change of address.  However, we can retransmit the call
	 * if we still have it buffered to the beginning.
	 */
	if (unlikely(summary.ack_reason == RXRPC_ACK_OUT_OF_SEQUENCE) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    call->acks_hard_ack == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		goto send_response;
	}

	/* Discard any out-of-order or duplicate ACKs (outside lock). */
	if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
		trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
					   first_soft_ack, call->acks_first_seq,
					   prev_pkt, call->acks_prev_seq);
		goto send_response;
	}

	trailer.maxMTU = 0;
	ioffset = offset + nr_acks + 3;
	if (skb->len >= ioffset + sizeof(trailer) &&
	    skb_copy_bits(skb, ioffset, &trailer, sizeof(trailer)) < 0)
		return rxrpc_proto_abort(call, 0, rxrpc_badmsg_short_ack_trailer);

	if (nr_acks > 0)
		skb_condense(skb);

	if (call->cong_last_nack) {
		since = rxrpc_input_check_prev_ack(call, &summary, first_soft_ack);
		rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
		call->cong_last_nack = NULL;
	} else {
		summary.nr_new_acks = first_soft_ack - call->acks_first_seq;
		call->acks_lowest_nak = first_soft_ack + nr_acks;
		since = first_soft_ack;
	}

	call->acks_latest_ts = skb->tstamp;
	call->acks_first_seq = first_soft_ack;
	call->acks_prev_seq = prev_pkt;

	switch (summary.ack_reason) {
	case RXRPC_ACK_PING:
		break;
	default:
		if (acked_serial && after(acked_serial, call->acks_highest_serial))
			call->acks_highest_serial = acked_serial;
		break;
	}

	/* Parse rwind and mtu sizes if provided. */
	if (trailer.maxMTU)
		rxrpc_input_ack_trailer(call, skb, &trailer);

	if (first_soft_ack == 0)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_zero);
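
	/* Numbering sketch (illustrative, not from the original comments): if
	 * firstPacket is 5 with nr_acks 3, then packets 1-4 are hard-ACK'd
	 * (hard_ack == 4) and the three soft-ACK/NAK entries describe packets
	 * 5, 6 and 7; firstPacket 0 is impossible since DATA sequence numbers
	 * start at 1, hence the abort above.
	 */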

	/* Ignore ACKs unless we are or have just been transmitting. */
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
	case RXRPC_CALL_SERVER_SEND_REPLY:
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		break;
	default:
		goto send_response;
	}

	if (before(hard_ack, call->acks_hard_ack) ||
	    after(hard_ack, call->tx_top))
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_outside_window);
	if (nr_acks > call->tx_top - hard_ack)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_sack_overflow);

	if (after(hard_ack, call->acks_hard_ack)) {
		if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
			rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ack);
			goto send_response;
		}
	}

	if (nr_acks > 0) {
		if (offset > (int)skb->len - nr_acks)
			return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_short_sack);
		rxrpc_input_soft_acks(call, &summary, skb, first_soft_ack, since);
		rxrpc_get_skb(skb, rxrpc_skb_get_last_nack);
		call->cong_last_nack = skb;
	}

	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) &&
	    summary.nr_acks == call->tx_top - hard_ack &&
	    rxrpc_is_client_call(call))
		rxrpc_propose_ping(call, ack_serial,
				   rxrpc_propose_ack_ping_for_lost_reply);

	rxrpc_congestion_management(call, skb, &summary, acked_serial);

send_response:
	if (summary.ack_reason == RXRPC_ACK_PING)
		rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial,
			       rxrpc_propose_ack_respond_to_ping);
	else if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial,
			       rxrpc_propose_ack_respond_to_ack);
}

/*
 * Process an ACKALL packet.
 */
static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };

	if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
		rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ackall);
}

/*
 * Process an ABORT packet directed at a call.
 */
static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	trace_rxrpc_rx_abort(call, sp->hdr.serial, skb->priority);

	rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
				  skb->priority, -ECONNABORTED);
}

/*
 * Process an incoming call packet.
 */
void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned long timo;

	_enter("%p,%p", call, skb);

	if (sp->hdr.serviceId != call->dest_srx.srx_service)
		call->dest_srx.srx_service = sp->hdr.serviceId;
	if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
		call->rx_serial = sp->hdr.serial;
	if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
		set_bit(RXRPC_CALL_RX_HEARD, &call->flags);

	timo = READ_ONCE(call->next_rx_timo);
	if (timo) {
		ktime_t delay = ms_to_ktime(timo);

		call->expect_rx_by = ktime_add(ktime_get_real(), delay);
		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
	}

	switch (sp->hdr.type) {
	case RXRPC_PACKET_TYPE_DATA:
		return rxrpc_input_data(call, skb);

	case RXRPC_PACKET_TYPE_ACK:
		return rxrpc_input_ack(call, skb);

	case RXRPC_PACKET_TYPE_BUSY:
		/* Just ignore BUSY packets from the server; the retry and
		 * lifespan timers will take care of business.  BUSY packets
		 * from the client don't make sense.
		 */
		return;

	case RXRPC_PACKET_TYPE_ABORT:
		return rxrpc_input_abort(call, skb);

	case RXRPC_PACKET_TYPE_ACKALL:
		return rxrpc_input_ackall(call, skb);

	default:
		break;
	}
}

/*
 * Handle a new service call on a channel implicitly completing the preceding
 * call on that channel.  This does not apply to client conns.
 *
 * TODO: If callNumber > call_id + 1, renegotiate security.
 */
void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		fallthrough;
	case RXRPC_CALL_COMPLETE:
		break;
	default:
		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ESHUTDOWN,
				 rxrpc_eproto_improper_term);
		trace_rxrpc_improper_term(call);
		break;
	}

	rxrpc_input_call_event(call);
}