1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @filename mt_cache.cpp 22 */ 23 24 #include <stdlib.h> 25 #include <errno.h> 26 #include <sys/types.h> 27 #include <sys/socket.h> 28 #include <string.h> 29 #include "mt_incl.h" 30 #include "kqueue_proxy.h" 31 #include "micro_thread.h" 32 #include "mt_sys_hook.h" 33 #include "ff_hook.h" 34 35 #include "mt_cache.h" 36 37 namespace NS_MICRO_THREAD { 38 39 TSkBuffer* new_sk_buffer(uint32_t size) 40 { 41 uint32_t total = sizeof(TSkBuffer) + size; 42 total = (total + SK_DFLT_ALIGN_SIZE - 1) / SK_DFLT_ALIGN_SIZE * SK_DFLT_ALIGN_SIZE; 43 TSkBuffer* block = (TSkBuffer*)malloc(total); 44 if (block == NULL) 45 { 46 MTLOG_ERROR("malloc failed, no more memory[%u]", total); 47 return NULL; 48 } 49 50 block->last_time = 0; 51 block->size = size; 52 block->head = block->buff; 53 block->end = block->buff + size; 54 55 block->data = block->head; 56 block->data_len = 0; 57 58 return block; 59 } 60 61 void delete_sk_buffer(TSkBuffer* block) 62 { 63 if (NULL == block) { 64 return; 65 } 66 67 free(block); 68 } 69 70 TSkBuffer* reserve_sk_buffer(TSkBuffer* buff, uint32_t size) 71 { 72 if (NULL == buff) { 73 return new_sk_buffer(size); 74 } 75 76 if (buff->size >= size) { 77 return buff; 78 } 79 80 TSkBuffer* new_buff = new_sk_buffer(size); 81 if (NULL == new_buff) { 82 return buff; 83 } 84 memcpy(new_buff->data, buff->data, buff->data_len); 85 new_buff->data_len = buff->data_len; 86 delete_sk_buffer(buff); 87 88 return new_buff; 89 } 90 91 void sk_buffer_mng_init(TSkBuffMng* mng, uint32_t expired, uint32_t size) 92 { 93 TAILQ_INIT(&mng->free_list); 94 mng->expired = expired; 95 mng->count = 0; 96 mng->size = size; 97 } 98 99 void sk_buffer_mng_destroy(TSkBuffMng * mng) 100 { 101 TSkBuffer* item = NULL; 102 TSkBuffer* tmp = NULL; 103 TAILQ_FOREACH_SAFE(item, &mng->free_list, entry, tmp) 104 { 105 TAILQ_REMOVE(&mng->free_list, item, entry); 106 delete_sk_buffer(item); 107 } 108 mng->count = 0; 109 } 110 111 TSkBuffer* alloc_sk_buffer(TSkBuffMng* mng) 112 { 113 if (NULL == mng) { 114 return NULL; 115 } 116 117 TSkBuffer* item = TAILQ_FIRST(&mng->free_list); 118 if (item != NULL) 119 { 120 TAILQ_REMOVE(&mng->free_list, item, entry); 121 mng->count--; 122 return item; 123 } 124 125 item = new_sk_buffer(mng->size); 126 if (NULL == item) 127 { 128 return NULL; 129 } 130 131 return item; 132 } 133 134 void free_sk_buffer(TSkBuffMng* mng, TSkBuffer* buff) 135 { 136 if ((NULL == mng) || (NULL == buff)) { 137 return; 138 } 139 140 TAILQ_INSERT_TAIL(&mng->free_list, buff, entry); 141 mng->count++; 142 143 buff->last_time = (uint32_t)(mt_time_ms() / 1000); 144 buff->data = buff->head; 145 buff->data_len = 0; 146 } 147 148 void recycle_sk_buffer(TSkBuffMng* mng, uint32_t now) 149 { 150 TSkBuffer* item = NULL; 151 TSkBuffer* tmp = NULL; 152 TAILQ_FOREACH_SAFE(item, &mng->free_list, entry, tmp) 153 { 154 if ((now - item->last_time) < mng->expired) 155 { 156 break; 157 } 158 159 TAILQ_REMOVE(&mng->free_list, item, entry); 160 delete_sk_buffer(item); 161 mng->count--; 162 } 163 } 164 165 void rw_cache_init(TRWCache* cache, TSkBuffMng* pool) 166 { 167 TAILQ_INIT(&cache->list); 168 cache->len = 0; 169 cache->count = 0; 170 cache->pool = pool; 171 } 172 173 void rw_cache_destroy(TRWCache* cache) 174 { 175 if ((cache == NULL) || (cache->pool == NULL)) { 176 return; 177 } 178 179 TSkBuffer* item = NULL; 180 TSkBuffer* tmp = NULL; 181 TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp) 182 { 183 TAILQ_REMOVE(&cache->list, item, entry); 184 free_sk_buffer(cache->pool, item); 185 } 186 cache->count = 0; 187 cache->len = 0; 188 cache->pool = NULL; 189 } 190 191 uint32_t cache_copy_out(TRWCache* cache, void* buff, uint32_t len) 192 { 193 if ((cache == NULL) || (cache->pool == NULL)) { 194 return 0; 195 } 196 197 char* out_buff = (char*)buff; 198 uint32_t left = len, skip_len = 0; 199 TSkBuffer* item = NULL; 200 TSkBuffer* tmp = NULL; 201 TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp) 202 { 203 skip_len = (item->data_len > left) ? left : item->data_len; 204 if (out_buff != NULL) 205 { 206 memcpy(out_buff, item->data, skip_len); 207 out_buff += skip_len; 208 } 209 210 left -= skip_len; 211 item->data_len -= skip_len; 212 item->data += skip_len; 213 if (item->data_len > 0) 214 { 215 break; 216 } 217 218 if (cache->count > 0) { 219 cache->count--; 220 } 221 TAILQ_REMOVE(&cache->list, item, entry); 222 free_sk_buffer(cache->pool, item); 223 224 if (left == 0) 225 { 226 break; 227 } 228 } 229 230 skip_len = len - left; 231 if (cache->len > skip_len) 232 { 233 cache->len -= skip_len; 234 } 235 else 236 { 237 cache->len = 0; 238 } 239 240 return skip_len; 241 } 242 243 void cache_skip_data(TRWCache* cache, uint32_t len) 244 { 245 cache_copy_out(cache, NULL, len); 246 } 247 248 void cache_append_buffer(TRWCache* cache, TSkBuffer* buff) 249 { 250 if ((NULL == cache) || (NULL == buff)) 251 { 252 return; 253 } 254 255 TAILQ_INSERT_TAIL(&cache->list, buff, entry); 256 cache->len += buff->data_len; 257 cache->count++; 258 } 259 260 TSkBuffer* cache_skip_first_buffer(TRWCache* cache) 261 { 262 TSkBuffer* buff = TAILQ_FIRST(&cache->list); 263 if ((NULL == cache) || (NULL == buff)) 264 { 265 return NULL; 266 } 267 268 TAILQ_REMOVE(&cache->list, buff, entry); 269 if (cache->len >= buff->data_len) 270 { 271 cache->len -= buff->data_len; 272 } 273 274 if (cache->count > 0) 275 { 276 cache->count--; 277 } 278 279 return buff; 280 } 281 282 int32_t cache_append_data(TRWCache* cache, const void* data, uint32_t len) 283 { 284 if ((NULL == data) || (NULL == cache) || (NULL == cache->pool)) 285 { 286 return -1; 287 } 288 289 if (len == 0) 290 { 291 return 0; 292 } 293 294 uint32_t left = len; 295 uint32_t remain = 0; 296 297 TSkBuffer* tail = TAILQ_LAST(&cache->list, __sk_buff_list); 298 if (tail != NULL) 299 { 300 if (tail->end > (tail->data + tail->data_len)) 301 { 302 remain = tail->end - tail->data - tail->data_len; 303 } 304 305 if (remain >= len) 306 { 307 memcpy(tail->data + tail->data_len, data, len); 308 tail->data_len += len; 309 cache->len += len; 310 return (int32_t)len; 311 } 312 } 313 314 TRWCache keep_list; 315 rw_cache_init(&keep_list, cache->pool); 316 left -= remain; 317 while (left > 0) 318 { 319 TSkBuffer* item = alloc_sk_buffer(cache->pool); 320 if (item == NULL) 321 { 322 rw_cache_destroy(&keep_list); 323 return -2; 324 } 325 cache_append_buffer(&keep_list, item); 326 327 if (left <= item->size) 328 { 329 memcpy(item->head, (char*)data + len - left, left); 330 item->data_len = left; 331 break; 332 } 333 334 memcpy(item->head, (char*)data + len - left, item->size); 335 item->data_len = item->size; 336 left -= item->size; 337 } 338 339 if ((tail != NULL) && (remain > 0)) 340 { 341 memcpy(tail->data + tail->data_len, data, remain); 342 tail->data_len += remain; 343 } 344 345 cache->len += len; 346 cache->count += keep_list.count; 347 TAILQ_CONCAT(&cache->list, &keep_list.list, entry); 348 349 return (int32_t)len; 350 } 351 352 int32_t cache_udp_recv(TRWCache* cache, uint32_t fd, struct sockaddr_in* remote_addr) 353 { 354 if (NULL == cache) 355 { 356 return -1; 357 } 358 359 int32_t total = 0; 360 for (uint32_t i = 0; i < 100; i++) 361 { 362 TSkBuffer* item = alloc_sk_buffer(cache->pool); 363 if (NULL == item) 364 { 365 return -2; 366 } 367 368 socklen_t addr_len = sizeof(*remote_addr); 369 mt_hook_syscall(recvfrom); 370 int32_t rc = ff_hook_recvfrom(fd, item->data, item->size, 0, (struct sockaddr*)remote_addr, &addr_len); 371 if (rc <= 0) 372 { 373 free_sk_buffer(cache->pool, item); 374 375 if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) 376 { 377 break; 378 } 379 else 380 { 381 MTLOG_ERROR("recvfrom failed, fd[%d] ret %d[%m]", fd, rc); 382 return -3; 383 } 384 } 385 386 item->data_len += rc; 387 cache_append_buffer(cache, item); 388 total += rc; 389 } 390 391 return total; 392 } 393 394 int32_t cache_tcp_recv(TRWCache* cache, uint32_t fd) 395 { 396 if (NULL == cache) 397 { 398 return -1; 399 } 400 401 int32_t total = 0; 402 for (uint32_t i = 0; i < 100; i++) 403 { 404 TSkBuffer* item = TAILQ_LAST(&cache->list, __sk_buff_list); 405 if ((NULL == item) 406 || ((item->data_len + item->data) >= item->end)) 407 { 408 item = alloc_sk_buffer(cache->pool); 409 if (item == NULL) 410 { 411 return -2; 412 } 413 cache_append_buffer(cache, item); 414 } 415 416 uint8_t* buff = item->data + item->data_len; 417 uint32_t remain = item->end - item->data - item->data_len; 418 mt_hook_syscall(recv); 419 int32_t recvd_len = ff_hook_recv(fd, buff, remain, 0); 420 if (recvd_len == 0) 421 { 422 MTLOG_DEBUG("remote close, socket: %d", fd); 423 return -SK_ERR_NEED_CLOSE; 424 } 425 else if (recvd_len < 0) 426 { 427 if (errno == EAGAIN) 428 { 429 return total; 430 } 431 else 432 { 433 MTLOG_ERROR("recv tcp socket failed, error: %d[%m]", errno); 434 return -2; 435 } 436 } 437 else 438 { 439 item->data_len += recvd_len; 440 cache->len += recvd_len; 441 total += recvd_len; 442 if (recvd_len < (int32_t)remain) 443 { 444 return total; 445 } 446 } 447 } 448 449 return total; 450 } 451 452 int32_t cache_tcp_send(TRWCache* cache, uint32_t fd) 453 { 454 if ((NULL == cache) || (NULL == cache->pool)) 455 { 456 return -1; 457 } 458 459 if (cache->len == 0) 460 { 461 return 0; 462 } 463 464 int32_t ret = 0, total = 0; 465 TSkBuffer* item = NULL; 466 TSkBuffer* tmp = NULL; 467 TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp) 468 { 469 mt_hook_syscall(send); 470 ret = ff_hook_send(fd, item->data, item->data_len, 0); 471 if (ret < 0) 472 { 473 break; 474 } 475 476 total += ret; 477 if (ret < (int32_t)item->data_len) 478 { 479 break; 480 } 481 } 482 483 cache_skip_data(cache, total); 484 if (ret < 0) 485 { 486 if (errno != EAGAIN) 487 { 488 MTLOG_ERROR("tcp socket send failed, error: %d[%m]", errno); 489 return -2; 490 } 491 } 492 493 return total; 494 } 495 496 int32_t cache_tcp_send_buff(TRWCache* cache, uint32_t fd, const void* data, uint32_t len) 497 { 498 if ((NULL == cache) || (NULL == data)) 499 { 500 return -1; 501 } 502 503 int32_t ret = cache_tcp_send(cache, fd); 504 if (ret < 0) 505 { 506 MTLOG_ERROR("tcp socket[%d] send cache data failed, rc: %d", fd, ret); 507 return ret; 508 } 509 510 int32_t send_len = 0; 511 if (cache->len == 0) 512 { 513 mt_hook_syscall(send); 514 ret = ff_hook_send(fd, data, len, 0); 515 if (ret >= 0) 516 { 517 send_len += ret; 518 } 519 else 520 { 521 if (errno != EAGAIN) 522 { 523 MTLOG_ERROR("tcp socket[%d] send failed, error: %d[%m]", fd, errno); 524 return -2; 525 } 526 } 527 } 528 529 int32_t rc = cache_append_data(cache, (char*)data + send_len, len - send_len); 530 if (rc < 0) 531 { 532 MTLOG_ERROR("tcp socket[%d] apend data failed, rc: %d", fd, rc); 533 return -3; 534 } 535 536 return send_len; 537 } 538 539 uint32_t get_data_len(TBuffVecPtr multi) 540 { 541 TRWCache* cache = (TRWCache*)multi; 542 if (NULL == cache) { 543 return 0; 544 } else { 545 return cache->len; 546 } 547 } 548 549 uint32_t get_block_count(TBuffVecPtr multi) 550 { 551 TRWCache* cache = (TRWCache*)multi; 552 if (NULL == cache) { 553 return 0; 554 } else { 555 return cache->count; 556 } 557 } 558 559 TBuffBlockPtr get_first_block(TBuffVecPtr multi) 560 { 561 TRWCache* cache = (TRWCache*)multi; 562 if (NULL == cache) { 563 return NULL; 564 } else { 565 return (TBuffBlockPtr)TAILQ_FIRST(&cache->list); 566 } 567 } 568 569 TBuffBlockPtr get_next_block(TBuffVecPtr multi, TBuffBlockPtr block) 570 { 571 TRWCache* cache = (TRWCache*)multi; 572 TSkBuffer* item = (TSkBuffer*)block; 573 if ((NULL == cache) || (NULL == item)) 574 { 575 return NULL; 576 } 577 578 return (TBuffBlockPtr)TAILQ_NEXT(item, entry); 579 580 } 581 582 void get_block_data(TBuffBlockPtr block, const void** data, int32_t* len) 583 { 584 TSkBuffer* item = (TSkBuffer*)block; 585 if (NULL == block) 586 { 587 return; 588 } 589 590 if (data != NULL) 591 { 592 *(uint8_t**)data = item->data; 593 } 594 595 if (len != NULL) 596 { 597 *len = (int32_t)item->data_len; 598 } 599 } 600 601 uint32_t read_cache_data(TBuffVecPtr multi, void* data, uint32_t len) 602 { 603 TRWCache* cache = (TRWCache*)multi; 604 if (NULL == cache) { 605 return 0; 606 } 607 608 uint32_t left_len = len; 609 uint32_t offset = 0; 610 TSkBuffer* item = NULL; 611 TSkBuffer* tmp = NULL; 612 TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp) 613 { 614 uint32_t copy_len = 0; 615 if (left_len <= item->data_len) 616 { 617 copy_len = left_len; 618 } 619 else 620 { 621 copy_len = item->data_len; 622 } 623 624 if (data != NULL) 625 { 626 memcpy((char*)data + offset, item->data, copy_len); 627 } 628 offset += copy_len; 629 left_len -= copy_len; 630 631 if (left_len <= 0) 632 { 633 break; 634 } 635 } 636 637 return offset; 638 } 639 640 uint32_t read_cache_begin(TBuffVecPtr multi, uint32_t begin, void* data, uint32_t len) 641 { 642 TRWCache* cache = (TRWCache*)multi; 643 if (NULL == cache) { 644 return 0; 645 } 646 647 if (begin >= cache->len) { 648 return 0; 649 } 650 651 uint32_t pos_left = begin; 652 uint32_t copy_left = len; 653 uint32_t offset = 0; 654 TSkBuffer* item = NULL; 655 TAILQ_FOREACH(item, &cache->list, entry) 656 { 657 uint8_t* start_ptr = item->data; 658 uint32_t real_left = item->data_len; 659 if (pos_left > 0) 660 { 661 uint32_t skip_len = pos_left > real_left ? real_left : pos_left; 662 pos_left -= skip_len; 663 real_left -= skip_len; 664 start_ptr += skip_len; 665 } 666 667 if (real_left == 0) 668 { 669 continue; 670 } 671 672 uint32_t copy_len = copy_left > real_left ? real_left : copy_left; 673 if (data != NULL) 674 { 675 memcpy((char*)data + offset, start_ptr, copy_len); 676 } 677 offset += copy_len; 678 copy_left -= copy_len; 679 if (copy_left == 0) 680 { 681 break; 682 } 683 } 684 685 return offset; 686 } 687 688 }; 689