/**
 * Tencent is pleased to support the open source community by making MSEC available.
 *
 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
 *
 * Licensed under the GNU General Public License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License. You may
 * obtain a copy of the License at
 *
 *     https://opensource.org/licenses/GPL-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */


/**
 * @filename mt_cache.cpp
 */

#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include "mt_incl.h"
#include "kqueue_proxy.h"
#include "micro_thread.h"
#include "mt_sys_hook.h"
#include "ff_hook.h"

#include "mt_cache.h"

namespace NS_MICRO_THREAD {

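/*
 * Layout of a TSkBuffer block as used throughout this file (the struct is
 * declared in mt_cache.h; 'buff' is the payload area that follows the
 * header fields):
 *
 *   +--------------+------------------------------------------+
 *   | TSkBuffer    |           payload ('size' bytes)         |
 *   +--------------+------------------------------------------+
 *                  ^head       ^data                          ^end
 *                              |<--- data_len valid bytes --->|
 *
 * head points at the start of the payload, end one past its last byte, and
 * [data, data + data_len) is the region currently holding unconsumed data.
 */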
/* Allocate a buffer block whose payload can hold at least 'size' bytes.
 * The total allocation is rounded up to a multiple of SK_DFLT_ALIGN_SIZE. */
TSkBuffer* new_sk_buffer(uint32_t size)
{
    uint32_t total = sizeof(TSkBuffer) + size;
    total = (total + SK_DFLT_ALIGN_SIZE - 1) / SK_DFLT_ALIGN_SIZE * SK_DFLT_ALIGN_SIZE;
    TSkBuffer* block = (TSkBuffer*)malloc(total);
    if (block == NULL)
    {
        MTLOG_ERROR("malloc failed, no more memory[%u]", total);
        return NULL;
    }

    block->last_time = 0;
    block->size = size;
    block->head = block->buff;
    block->end = block->buff + size;

    block->data = block->head;
    block->data_len = 0;

    return block;
}

void delete_sk_buffer(TSkBuffer* block)
{
    if (NULL == block) {
        return;
    }

    free(block);
}

/* Ensure 'buff' can hold at least 'size' bytes. If it is too small, a larger
 * block is allocated, the existing data is copied over, and the old block is
 * freed. On allocation failure the original (smaller) block is returned. */
TSkBuffer* reserve_sk_buffer(TSkBuffer* buff, uint32_t size)
{
    if (NULL == buff) {
        return new_sk_buffer(size);
    }

    if (buff->size >= size) {
        return buff;
    }

    TSkBuffer* new_buff = new_sk_buffer(size);
    if (NULL == new_buff) {
        return buff;
    }
    memcpy(new_buff->data, buff->data, buff->data_len);
    new_buff->data_len = buff->data_len;
    delete_sk_buffer(buff);

    return new_buff;
}
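/*
 * A minimal usage sketch for the grow-on-demand pattern above (hypothetical
 * caller, not part of this file):
 *
 *   TSkBuffer* buf = new_sk_buffer(1024);
 *   ...
 *   buf = reserve_sk_buffer(buf, 4096);  // grows, preserving data/data_len
 *   if (buf->size < 4096) { ... }        // allocation failed, old block kept
 *   delete_sk_buffer(buf);
 */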

/* Initialize a buffer pool: an empty free list, an idle-expiry time in
 * seconds, and the default payload size for blocks allocated from it. */
void sk_buffer_mng_init(TSkBuffMng* mng, uint32_t expired, uint32_t size)
{
    TAILQ_INIT(&mng->free_list);
    mng->expired = expired;
    mng->count = 0;
    mng->size = size;
}

void sk_buffer_mng_destroy(TSkBuffMng* mng)
{
    TSkBuffer* item = NULL;
    TSkBuffer* tmp = NULL;
    TAILQ_FOREACH_SAFE(item, &mng->free_list, entry, tmp)
    {
        TAILQ_REMOVE(&mng->free_list, item, entry);
        delete_sk_buffer(item);
    }
    mng->count = 0;
}

/* Take a block from the pool's free list, or allocate a fresh one of the
 * pool's default size if the list is empty. */
TSkBuffer* alloc_sk_buffer(TSkBuffMng* mng)
{
    if (NULL == mng) {
        return NULL;
    }

    TSkBuffer* item = TAILQ_FIRST(&mng->free_list);
    if (item != NULL)
    {
        TAILQ_REMOVE(&mng->free_list, item, entry);
        mng->count--;
        return item;
    }

    item = new_sk_buffer(mng->size);
    if (NULL == item)
    {
        return NULL;
    }

    return item;
}

/* Return a block to the pool's free list, resetting it to the empty state
 * and stamping the time it became idle. */
void free_sk_buffer(TSkBuffMng* mng, TSkBuffer* buff)
{
    if ((NULL == mng) || (NULL == buff)) {
        return;
    }

    TAILQ_INSERT_TAIL(&mng->free_list, buff, entry);
    mng->count++;

    buff->last_time = (uint32_t)(mt_time_ms() / 1000);
    buff->data = buff->head;
    buff->data_len = 0;
}

/* Free every pooled block that has been idle longer than the expiry time.
 * The free list is ordered by release time, so scanning stops at the first
 * block that is still fresh. */
void recycle_sk_buffer(TSkBuffMng* mng, uint32_t now)
{
    TSkBuffer* item = NULL;
    TSkBuffer* tmp = NULL;
    TAILQ_FOREACH_SAFE(item, &mng->free_list, entry, tmp)
    {
        if ((now - item->last_time) < mng->expired)
        {
            break;
        }

        TAILQ_REMOVE(&mng->free_list, item, entry);
        delete_sk_buffer(item);
        mng->count--;
    }
}
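/*
 * A minimal lifecycle sketch for the pool (hypothetical caller; the periodic
 * recycle pass would normally be driven by the framework's timer):
 *
 *   TSkBuffMng pool;
 *   sk_buffer_mng_init(&pool, 60, 64 * 1024);  // 60s idle expiry, 64KB blocks
 *   TSkBuffer* b = alloc_sk_buffer(&pool);
 *   ...
 *   free_sk_buffer(&pool, b);                  // back to the free list
 *   recycle_sk_buffer(&pool, (uint32_t)(mt_time_ms() / 1000));
 *   sk_buffer_mng_destroy(&pool);
 */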

/* Bind a read/write cache to a buffer pool; the cache chains pooled blocks
 * into a FIFO byte stream. */
void rw_cache_init(TRWCache* cache, TSkBuffMng* pool)
{
    TAILQ_INIT(&cache->list);
    cache->len = 0;
    cache->count = 0;
    cache->pool = pool;
}

void rw_cache_destroy(TRWCache* cache)
{
    if ((cache == NULL) || (cache->pool == NULL)) {
        return;
    }

    TSkBuffer* item = NULL;
    TSkBuffer* tmp = NULL;
    TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp)
    {
        TAILQ_REMOVE(&cache->list, item, entry);
        free_sk_buffer(cache->pool, item);
    }
    cache->count = 0;
    cache->len = 0;
    cache->pool = NULL;
}

/* Consume up to 'len' bytes from the front of the cache. If 'buff' is not
 * NULL the bytes are copied out; either way they are removed from the cache,
 * and fully drained blocks are returned to the pool. Returns the number of
 * bytes actually consumed. */
uint32_t cache_copy_out(TRWCache* cache, void* buff, uint32_t len)
{
    if ((cache == NULL) || (cache->pool == NULL)) {
        return 0;
    }

    char* out_buff = (char*)buff;
    uint32_t left = len, skip_len = 0;
    TSkBuffer* item = NULL;
    TSkBuffer* tmp = NULL;
    TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp)
    {
        skip_len = (item->data_len > left) ? left : item->data_len;
        if (out_buff != NULL)
        {
            memcpy(out_buff, item->data, skip_len);
            out_buff += skip_len;
        }

        left -= skip_len;
        item->data_len -= skip_len;
        item->data += skip_len;
        if (item->data_len > 0)   // block not fully drained: request satisfied
        {
            break;
        }

        if (cache->count > 0) {
            cache->count--;
        }
        TAILQ_REMOVE(&cache->list, item, entry);
        free_sk_buffer(cache->pool, item);

        if (left == 0)
        {
            break;
        }
    }

    skip_len = len - left;
    if (cache->len > skip_len)
    {
        cache->len -= skip_len;
    }
    else
    {
        cache->len = 0;
    }

    return skip_len;
}
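/*
 * A minimal sketch of the consume semantics (hypothetical caller):
 *
 *   char hdr[8];
 *   uint32_t got = cache_copy_out(&cache, hdr, sizeof(hdr)); // copy + remove
 *   cache_skip_data(&cache, 4);                              // remove only
 *
 * 'got' may be smaller than requested when the cache holds fewer bytes.
 */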

/* Discard 'len' bytes from the front of the cache without copying them. */
void cache_skip_data(TRWCache* cache, uint32_t len)
{
    cache_copy_out(cache, NULL, len);
}

/* Append an already-filled block to the tail of the cache. */
void cache_append_buffer(TRWCache* cache, TSkBuffer* buff)
{
    if ((NULL == cache) || (NULL == buff))
    {
        return;
    }

    TAILQ_INSERT_TAIL(&cache->list, buff, entry);
    cache->len += buff->data_len;
    cache->count++;
}

/* Detach and return the first block of the cache, or NULL if the cache is
 * empty. Ownership of the block passes to the caller. */
TSkBuffer* cache_skip_first_buffer(TRWCache* cache)
{
    if (NULL == cache)   // check before touching the list
    {
        return NULL;
    }

    TSkBuffer* buff = TAILQ_FIRST(&cache->list);
    if (NULL == buff)
    {
        return NULL;
    }

    TAILQ_REMOVE(&cache->list, buff, entry);
    if (cache->len >= buff->data_len)
    {
        cache->len -= buff->data_len;
    }

    if (cache->count > 0)
    {
        cache->count--;
    }

    return buff;
}

/* Append 'len' bytes to the tail of the cache. Spare room in the current
 * tail block is used first; the rest is written into newly allocated blocks.
 * Returns 'len' on success, negative on error. */
int32_t cache_append_data(TRWCache* cache, const void* data, uint32_t len)
{
    if ((NULL == data) || (NULL == cache) || (NULL == cache->pool))
    {
        return -1;
    }

    if (len == 0)
    {
        return 0;
    }

    uint32_t left = len;
    uint32_t remain = 0;

    TSkBuffer* tail = TAILQ_LAST(&cache->list, __sk_buff_list);
    if (tail != NULL)
    {
        if (tail->end > (tail->data + tail->data_len))
        {
            remain = tail->end - tail->data - tail->data_len;
        }

        if (remain >= len)   // everything fits into the tail block
        {
            memcpy(tail->data + tail->data_len, data, len);
            tail->data_len += len;
            cache->len += len;
            return (int32_t)len;
        }
    }

    /* Stage the overflow in a temporary chain first, so the cache is left
     * untouched if any allocation fails. */
    TRWCache keep_list;
    rw_cache_init(&keep_list, cache->pool);
    left -= remain;
    while (left > 0)
    {
        TSkBuffer* item = alloc_sk_buffer(cache->pool);
        if (item == NULL)
        {
            rw_cache_destroy(&keep_list);
            return -2;
        }
        cache_append_buffer(&keep_list, item);

        if (left <= item->size)
        {
            memcpy(item->head, (char*)data + len - left, left);
            item->data_len = left;
            break;
        }

        memcpy(item->head, (char*)data + len - left, item->size);
        item->data_len = item->size;
        left -= item->size;
    }

    /* Only now fill the tail block's spare room and splice in the new chain. */
    if ((tail != NULL) && (remain > 0))
    {
        memcpy(tail->data + tail->data_len, data, remain);
        tail->data_len += remain;
    }

    cache->len += len;
    cache->count += keep_list.count;
    TAILQ_CONCAT(&cache->list, &keep_list.list, entry);

    return (int32_t)len;
}
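/*
 * A minimal sketch of the append path (hypothetical caller):
 *
 *   TRWCache cache;
 *   rw_cache_init(&cache, &pool);
 *   cache_append_data(&cache, "hello", 5);    // fills tail, then new blocks
 *   cache_append_data(&cache, big_buf, big_len);
 *   // cache.len is now 5 + big_len; blocks were taken from 'pool'.
 */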

/* Drain a non-blocking UDP socket into the cache, one datagram per block,
 * for at most 100 datagrams per call. Returns the total bytes received, or
 * negative on error. */
int32_t cache_udp_recv(TRWCache* cache, uint32_t fd, struct sockaddr_in* remote_addr)
{
    if (NULL == cache)
    {
        return -1;
    }

    int32_t total = 0;
    for (uint32_t i = 0; i < 100; i++)
    {
        TSkBuffer* item = alloc_sk_buffer(cache->pool);
        if (NULL == item)
        {
            return -2;
        }

        socklen_t addr_len = sizeof(*remote_addr);
        mt_hook_syscall(recvfrom);
        int32_t rc = ff_hook_recvfrom(fd, item->data, item->size, 0, (struct sockaddr*)remote_addr, &addr_len);
        if (rc <= 0)
        {
            free_sk_buffer(cache->pool, item);

            if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
            {
                break;   // socket drained
            }
            else
            {
                MTLOG_ERROR("recvfrom failed, fd[%d] ret %d[%m]", fd, rc);
                return -3;
            }
        }

        item->data_len += rc;
        cache_append_buffer(cache, item);
        total += rc;
    }

    return total;
}

/* Drain a non-blocking TCP socket into the cache, appending to the tail
 * block while it has room and allocating new blocks as needed (at most 100
 * reads per call). Returns the total bytes received, -SK_ERR_NEED_CLOSE if
 * the peer closed the connection, or a negative error code. */
int32_t cache_tcp_recv(TRWCache* cache, uint32_t fd)
{
    if (NULL == cache)
    {
        return -1;
    }

    int32_t total = 0;
    for (uint32_t i = 0; i < 100; i++)
    {
        TSkBuffer* item = TAILQ_LAST(&cache->list, __sk_buff_list);
        if ((NULL == item)
            || ((item->data_len + item->data) >= item->end))
        {
            item = alloc_sk_buffer(cache->pool);
            if (item == NULL)
            {
                return -2;
            }
            cache_append_buffer(cache, item);
        }

        uint8_t* buff = item->data + item->data_len;
        uint32_t remain = item->end - item->data - item->data_len;
        mt_hook_syscall(recv);
        int32_t recvd_len = ff_hook_recv(fd, buff, remain, 0);
        if (recvd_len == 0)
        {
            MTLOG_DEBUG("remote close, socket: %d", fd);
            return -SK_ERR_NEED_CLOSE;
        }
        else if (recvd_len < 0)
        {
            if (errno == EAGAIN)
            {
                return total;
            }
            else
            {
                MTLOG_ERROR("recv tcp socket failed, error: %d[%m]", errno);
                return -2;
            }
        }
        else
        {
            item->data_len += recvd_len;
            cache->len += recvd_len;
            total += recvd_len;
            if (recvd_len < (int32_t)remain)   // short read: socket drained
            {
                return total;
            }
        }
    }

    return total;
}

/* Flush as much cached data as the socket will accept. Sent bytes are
 * removed from the cache. Returns the number of bytes sent, or -2 on a hard
 * send error (EAGAIN is not an error; it just stops the flush). */
int32_t cache_tcp_send(TRWCache* cache, uint32_t fd)
{
    if ((NULL == cache) || (NULL == cache->pool))
    {
        return -1;
    }

    if (cache->len == 0)
    {
        return 0;
    }

    int32_t ret = 0, total = 0;
    TSkBuffer* item = NULL;
    TSkBuffer* tmp = NULL;
    TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp)
    {
        mt_hook_syscall(send);
        ret = ff_hook_send(fd, item->data, item->data_len, 0);
        if (ret < 0)
        {
            break;
        }

        total += ret;
        if (ret < (int32_t)item->data_len)   // partial send: kernel buffer full
        {
            break;
        }
    }

    cache_skip_data(cache, total);
    if (ret < 0)
    {
        if (errno != EAGAIN)
        {
            MTLOG_ERROR("tcp socket send failed, error: %d[%m]", errno);
            return -2;
        }
    }

    return total;
}

/* Send 'data' on a TCP socket, flushing any previously cached bytes first to
 * preserve ordering. Whatever the socket does not accept is appended to the
 * cache for a later flush. Returns the bytes sent from 'data', or negative
 * on error. */
int32_t cache_tcp_send_buff(TRWCache* cache, uint32_t fd, const void* data, uint32_t len)
{
    if ((NULL == cache) || (NULL == data))
    {
        return -1;
    }

    int32_t ret = cache_tcp_send(cache, fd);
    if (ret < 0)
    {
        MTLOG_ERROR("tcp socket[%d] send cache data failed, rc: %d", fd, ret);
        return ret;
    }

    int32_t send_len = 0;
    if (cache->len == 0)   // cache fully flushed: try to send 'data' directly
    {
        mt_hook_syscall(send);
        ret = ff_hook_send(fd, data, len, 0);
        if (ret >= 0)
        {
            send_len += ret;
        }
        else
        {
            if (errno != EAGAIN)
            {
                MTLOG_ERROR("tcp socket[%d] send failed, error: %d[%m]", fd, errno);
                return -2;
            }
        }
    }

    int32_t rc = cache_append_data(cache, (char*)data + send_len, len - send_len);
    if (rc < 0)
    {
        MTLOG_ERROR("tcp socket[%d] append data failed, rc: %d", fd, rc);
        return -3;
    }

    return send_len;
}
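/*
 * A minimal usage sketch for the cached-send pattern above (hypothetical
 * caller; 'fd' is a connected non-blocking TCP socket):
 *
 *   if (cache_tcp_send_buff(&cache, fd, req, req_len) < 0) {
 *       // hard socket error: close the connection
 *   }
 *   // later, when the socket becomes writable again:
 *   cache_tcp_send(&cache, fd);
 */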

/* Total number of cached bytes behind the opaque handle. */
uint32_t get_data_len(TBuffVecPtr multi)
{
    TRWCache* cache = (TRWCache*)multi;
    if (NULL == cache) {
        return 0;
    } else {
        return cache->len;
    }
}

/* Number of buffer blocks chained in the cache. */
uint32_t get_block_count(TBuffVecPtr multi)
{
    TRWCache* cache = (TRWCache*)multi;
    if (NULL == cache) {
        return 0;
    } else {
        return cache->count;
    }
}

TBuffBlockPtr get_first_block(TBuffVecPtr multi)
{
    TRWCache* cache = (TRWCache*)multi;
    if (NULL == cache) {
        return NULL;
    } else {
        return (TBuffBlockPtr)TAILQ_FIRST(&cache->list);
    }
}

TBuffBlockPtr get_next_block(TBuffVecPtr multi, TBuffBlockPtr block)
{
    TRWCache* cache = (TRWCache*)multi;
    TSkBuffer* item = (TSkBuffer*)block;
    if ((NULL == cache) || (NULL == item))
    {
        return NULL;
    }

    return (TBuffBlockPtr)TAILQ_NEXT(item, entry);
}

/* Expose a block's data pointer and length without copying. */
void get_block_data(TBuffBlockPtr block, const void** data, int32_t* len)
{
    TSkBuffer* item = (TSkBuffer*)block;
    if (NULL == block)
    {
        return;
    }

    if (data != NULL)
    {
        *(uint8_t**)data = item->data;
    }

    if (len != NULL)
    {
        *len = (int32_t)item->data_len;
    }
}
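/*
 * A minimal zero-copy iteration sketch over a cache via the opaque block API
 * (hypothetical caller):
 *
 *   const void* ptr = NULL;
 *   int32_t n = 0;
 *   for (TBuffBlockPtr b = get_first_block(multi); b != NULL;
 *        b = get_next_block(multi, b))
 *   {
 *       get_block_data(b, &ptr, &n);
 *       // parse n bytes at ptr without copying them out
 *   }
 */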

/* Non-destructively copy up to 'len' bytes from the front of the cache into
 * 'data' (or just measure, if 'data' is NULL). Unlike cache_copy_out, the
 * cached bytes are left in place. Returns the number of bytes read. */
uint32_t read_cache_data(TBuffVecPtr multi, void* data, uint32_t len)
{
    TRWCache* cache = (TRWCache*)multi;
    if (NULL == cache) {
        return 0;
    }

    uint32_t left_len = len;
    uint32_t offset = 0;
    TSkBuffer* item = NULL;
    TSkBuffer* tmp = NULL;
    TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp)
    {
        uint32_t copy_len = 0;
        if (left_len <= item->data_len)
        {
            copy_len = left_len;
        }
        else
        {
            copy_len = item->data_len;
        }

        if (data != NULL)
        {
            memcpy((char*)data + offset, item->data, copy_len);
        }
        offset += copy_len;
        left_len -= copy_len;

        if (left_len == 0)   // left_len is unsigned; compare against zero
        {
            break;
        }
    }

    return offset;
}

/* Non-destructively copy up to 'len' bytes starting 'begin' bytes into the
 * cached stream, skipping whole or partial blocks to reach the offset.
 * Returns the number of bytes read. */
uint32_t read_cache_begin(TBuffVecPtr multi, uint32_t begin, void* data, uint32_t len)
{
    TRWCache* cache = (TRWCache*)multi;
    if (NULL == cache) {
        return 0;
    }

    if (begin >= cache->len) {
        return 0;
    }

    uint32_t pos_left = begin;
    uint32_t copy_left = len;
    uint32_t offset = 0;
    TSkBuffer* item = NULL;
    TAILQ_FOREACH(item, &cache->list, entry)
    {
        uint8_t* start_ptr = item->data;
        uint32_t real_left = item->data_len;
        if (pos_left > 0)   // still skipping toward 'begin'
        {
            uint32_t skip_len = pos_left > real_left ? real_left : pos_left;
            pos_left -= skip_len;
            real_left -= skip_len;
            start_ptr += skip_len;
        }

        if (real_left == 0)
        {
            continue;
        }

        uint32_t copy_len = copy_left > real_left ? real_left : copy_left;
        if (data != NULL)
        {
            memcpy((char*)data + offset, start_ptr, copy_len);
        }
        offset += copy_len;
        copy_left -= copy_len;
        if (copy_left == 0)
        {
            break;
        }
    }

    return offset;
}

} // namespace NS_MICRO_THREAD