xref: /f-stack/app/micro_thread/mt_cache.cpp (revision 70bb2888)
1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @filename mt_cache.cpp
22  */
23 
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <string.h>
29 #include "mt_incl.h"
30 #include "kqueue_proxy.h"
31 #include "micro_thread.h"
32 #include "mt_sys_hook.h"
33 #include "ff_hook.h"
34 
35 #include "mt_cache.h"
36 
37 namespace NS_MICRO_THREAD {
38 
39 TSkBuffer* new_sk_buffer(uint32_t size)
40 {
41     uint32_t total = sizeof(TSkBuffer) + size;
42     total = (total + SK_DFLT_ALIGN_SIZE - 1) / SK_DFLT_ALIGN_SIZE * SK_DFLT_ALIGN_SIZE;
43     TSkBuffer* block = (TSkBuffer*)malloc(total);
44     if (block == NULL)
45     {
46         MTLOG_ERROR("malloc failed, no more memory[%u]", total);
47         return NULL;
48     }
49 
50     block->last_time = 0;
51     block->size = size;
52     block->head = block->buff;
53     block->end  = block->buff + size;
54 
55     block->data = block->head;
56     block->data_len = 0;
57 
58     return block;
59 }
60 
61 void delete_sk_buffer(TSkBuffer* block)
62 {
63     if (NULL == block) {
64         return;
65     }
66 
67     free(block);
68 }
69 
70 TSkBuffer* reserve_sk_buffer(TSkBuffer* buff, uint32_t size)
71 {
72     if (NULL == buff) {
73         return new_sk_buffer(size);
74     }
75 
76     if (buff->size >= size) {
77         return buff;
78     }
79 
80     TSkBuffer* new_buff = new_sk_buffer(size);
81     if (NULL == new_buff) {
82         return buff;
83     }
84     memcpy(new_buff->data, buff->data, buff->data_len);
85     new_buff->data_len = buff->data_len;
86     delete_sk_buffer(buff);
87 
88     return new_buff;
89 }
90 
91 void sk_buffer_mng_init(TSkBuffMng* mng, uint32_t expired, uint32_t size)
92 {
93     TAILQ_INIT(&mng->free_list);
94     mng->expired  = expired;
95     mng->count = 0;
96     mng->size = size;
97 }
98 
99 void sk_buffer_mng_destroy(TSkBuffMng * mng)
100 {
101     TSkBuffer* item = NULL;
102     TSkBuffer* tmp = NULL;
103     TAILQ_FOREACH_SAFE(item, &mng->free_list, entry, tmp)
104     {
105         TAILQ_REMOVE(&mng->free_list, item, entry);
106         delete_sk_buffer(item);
107     }
108     mng->count = 0;
109 }
110 
111 TSkBuffer* alloc_sk_buffer(TSkBuffMng* mng)
112 {
113     if (NULL == mng) {
114         return NULL;
115     }
116 
117     TSkBuffer* item = TAILQ_FIRST(&mng->free_list);
118     if (item != NULL)
119     {
120         TAILQ_REMOVE(&mng->free_list, item, entry);
121         mng->count--;
122         return item;
123     }
124 
125     item = new_sk_buffer(mng->size);
126     if (NULL == item)
127     {
128         return NULL;
129     }
130 
131     return item;
132 }
133 
134 void free_sk_buffer(TSkBuffMng* mng, TSkBuffer* buff)
135 {
136     if ((NULL == mng) || (NULL == buff)) {
137         return;
138     }
139 
140     TAILQ_INSERT_TAIL(&mng->free_list, buff, entry);
141     mng->count++;
142 
143     buff->last_time = (uint32_t)(mt_time_ms() / 1000);
144     buff->data = buff->head;
145     buff->data_len = 0;
146 }
147 
148 void recycle_sk_buffer(TSkBuffMng* mng, uint32_t now)
149 {
150     TSkBuffer* item = NULL;
151     TSkBuffer* tmp = NULL;
152     TAILQ_FOREACH_SAFE(item, &mng->free_list, entry, tmp)
153     {
154         if ((now - item->last_time) < mng->expired)
155         {
156             break;
157         }
158 
159         TAILQ_REMOVE(&mng->free_list, item, entry);
160         delete_sk_buffer(item);
161         mng->count--;
162     }
163 }
164 
165 void rw_cache_init(TRWCache* cache, TSkBuffMng* pool)
166 {
167     TAILQ_INIT(&cache->list);
168     cache->len = 0;
169     cache->count = 0;
170     cache->pool = pool;
171 }
172 
173 void rw_cache_destroy(TRWCache* cache)
174 {
175     if ((cache == NULL) || (cache->pool == NULL)) {
176         return;
177     }
178 
179     TSkBuffer* item = NULL;
180     TSkBuffer* tmp = NULL;
181     TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp)
182     {
183         TAILQ_REMOVE(&cache->list, item, entry);
184         free_sk_buffer(cache->pool, item);
185     }
186     cache->count = 0;
187     cache->len = 0;
188     cache->pool = NULL;
189 }
190 
191 uint32_t cache_copy_out(TRWCache* cache, void* buff, uint32_t len)
192 {
193     if ((cache == NULL) || (cache->pool == NULL)) {
194         return 0;
195     }
196 
197     char* out_buff = (char*)buff;
198     uint32_t left = len, skip_len = 0;
199     TSkBuffer* item = NULL;
200     TSkBuffer* tmp = NULL;
201     TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp)
202     {
203         skip_len = (item->data_len > left) ? left : item->data_len;
204         if (out_buff != NULL)
205         {
206             memcpy(out_buff, item->data, skip_len);
207             out_buff += skip_len;
208         }
209 
210         left -= skip_len;
211         item->data_len -= skip_len;
212         item->data += skip_len;
213         if (item->data_len > 0)
214         {
215             break;
216         }
217 
218         if (cache->count > 0) {
219             cache->count--;
220         }
221         TAILQ_REMOVE(&cache->list, item, entry);
222         free_sk_buffer(cache->pool, item);
223 
224         if (left == 0)
225         {
226             break;
227         }
228     }
229 
230     skip_len = len - left;
231     if (cache->len > skip_len)
232     {
233         cache->len -= skip_len;
234     }
235     else
236     {
237         cache->len = 0;
238     }
239 
240     return skip_len;
241 }
242 
243 void cache_skip_data(TRWCache* cache, uint32_t len)
244 {
245     cache_copy_out(cache, NULL, len);
246 }
247 
248 void cache_append_buffer(TRWCache* cache, TSkBuffer* buff)
249 {
250     if ((NULL == cache) || (NULL == buff))
251     {
252         return;
253     }
254 
255     TAILQ_INSERT_TAIL(&cache->list, buff, entry);
256     cache->len += buff->data_len;
257     cache->count++;
258 }
259 
260 TSkBuffer* cache_skip_first_buffer(TRWCache* cache)
261 {
262     TSkBuffer* buff = TAILQ_FIRST(&cache->list);
263     if ((NULL == cache) || (NULL == buff))
264     {
265         return NULL;
266     }
267 
268     TAILQ_REMOVE(&cache->list, buff, entry);
269     if (cache->len >= buff->data_len)
270     {
271         cache->len -= buff->data_len;
272     }
273 
274     if (cache->count > 0)
275     {
276         cache->count--;
277     }
278 
279     return buff;
280 }
281 
282 int32_t cache_append_data(TRWCache* cache, const void* data, uint32_t len)
283 {
284     if ((NULL == data) || (NULL == cache) || (NULL == cache->pool))
285     {
286         return -1;
287     }
288 
289     if (len == 0)
290     {
291         return 0;
292     }
293 
294     uint32_t left = len;
295     uint32_t remain = 0;
296 
297     TSkBuffer* tail = TAILQ_LAST(&cache->list, __sk_buff_list);
298     if (tail != NULL)
299     {
300         if (tail->end > (tail->data + tail->data_len))
301         {
302             remain = tail->end - tail->data - tail->data_len;
303         }
304 
305         if (remain >= len)
306         {
307             memcpy(tail->data + tail->data_len, data, len);
308             tail->data_len += len;
309             cache->len += len;
310             return (int32_t)len;
311         }
312     }
313 
314     TRWCache keep_list;
315     rw_cache_init(&keep_list, cache->pool);
316     left -= remain;
317     while (left > 0)
318     {
319         TSkBuffer* item = alloc_sk_buffer(cache->pool);
320         if (item == NULL)
321         {
322             rw_cache_destroy(&keep_list);
323             return -2;
324         }
325         cache_append_buffer(&keep_list, item);
326 
327         if (left <= item->size)
328         {
329             memcpy(item->head, (char*)data + len - left, left);
330             item->data_len = left;
331             break;
332         }
333 
334         memcpy(item->head, (char*)data + len - left, item->size);
335         item->data_len = item->size;
336         left -= item->size;
337     }
338 
339     if ((tail != NULL) && (remain > 0))
340     {
341         memcpy(tail->data + tail->data_len, data, remain);
342         tail->data_len += remain;
343     }
344 
345     cache->len += len;
346     cache->count += keep_list.count;
347     TAILQ_CONCAT(&cache->list, &keep_list.list, entry);
348 
349     return (int32_t)len;
350 }
351 
352 int32_t cache_udp_recv(TRWCache* cache, uint32_t fd, struct sockaddr_in* remote_addr)
353 {
354     if (NULL == cache)
355     {
356         return -1;
357     }
358 
359     int32_t total = 0;
360     for (uint32_t i = 0; i < 100; i++)
361     {
362         TSkBuffer* item = alloc_sk_buffer(cache->pool);
363         if (NULL == item)
364         {
365             return -2;
366         }
367 
368         socklen_t addr_len = sizeof(*remote_addr);
369         mt_hook_syscall(recvfrom);
370         int32_t rc = ff_hook_recvfrom(fd, item->data, item->size, 0, (struct sockaddr*)remote_addr, &addr_len);
371         if (rc <= 0)
372         {
373             free_sk_buffer(cache->pool, item);
374 
375             if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
376             {
377                 break;
378             }
379             else
380             {
381                 MTLOG_ERROR("recvfrom failed, fd[%d] ret %d[%m]", fd, rc);
382                 return -3;
383             }
384         }
385 
386         item->data_len += rc;
387         cache_append_buffer(cache, item);
388         total += rc;
389     }
390 
391     return total;
392 }
393 
394 int32_t cache_tcp_recv(TRWCache* cache, uint32_t fd)
395 {
396     if (NULL == cache)
397     {
398         return -1;
399     }
400 
401     int32_t total = 0;
402     for (uint32_t i = 0; i < 100; i++)
403     {
404         TSkBuffer* item = TAILQ_LAST(&cache->list, __sk_buff_list);
405         if ((NULL == item)
406             || ((item->data_len + item->data) >= item->end))
407         {
408             item = alloc_sk_buffer(cache->pool);
409             if (item == NULL)
410             {
411                return -2;
412             }
413             cache_append_buffer(cache, item);
414         }
415 
416         uint8_t* buff = item->data + item->data_len;
417         uint32_t remain = item->end - item->data - item->data_len;
418         mt_hook_syscall(recv);
419         int32_t recvd_len = ff_hook_recv(fd, buff, remain, 0);
420         if (recvd_len == 0)
421         {
422             MTLOG_DEBUG("remote close, socket: %d", fd);
423             return -SK_ERR_NEED_CLOSE;
424         }
425         else if (recvd_len < 0)
426         {
427             if (errno == EAGAIN)
428             {
429                 return total;
430             }
431             else
432             {
433                 MTLOG_ERROR("recv tcp socket failed, error: %d[%m]", errno);
434                 return -2;
435             }
436         }
437         else
438         {
439             item->data_len += recvd_len;
440             cache->len += recvd_len;
441             total += recvd_len;
442             if (recvd_len < (int32_t)remain)
443             {
444                 return total;
445             }
446         }
447     }
448 
449     return total;
450 }
451 
452 int32_t cache_tcp_send(TRWCache* cache, uint32_t fd)
453 {
454     if ((NULL == cache) || (NULL == cache->pool))
455     {
456         return -1;
457     }
458 
459     if (cache->len == 0)
460     {
461         return 0;
462     }
463 
464     int32_t ret = 0, total = 0;
465     TSkBuffer* item = NULL;
466     TSkBuffer* tmp = NULL;
467     TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp)
468     {
469         mt_hook_syscall(send);
470         ret = ff_hook_send(fd, item->data, item->data_len, 0);
471         if (ret < 0)
472         {
473             break;
474         }
475 
476         total += ret;
477         if (ret < (int32_t)item->data_len)
478         {
479             break;
480         }
481     }
482 
483     cache_skip_data(cache, total);
484     if (ret < 0)
485     {
486         if (errno != EAGAIN)
487         {
488             MTLOG_ERROR("tcp socket send failed, error: %d[%m]", errno);
489             return -2;
490         }
491     }
492 
493     return total;
494 }
495 
496 int32_t cache_tcp_send_buff(TRWCache* cache, uint32_t fd, const void* data, uint32_t len)
497 {
498     if ((NULL == cache) || (NULL == data))
499     {
500         return -1;
501     }
502 
503     int32_t ret = cache_tcp_send(cache, fd);
504     if (ret < 0)
505     {
506         MTLOG_ERROR("tcp socket[%d] send cache data failed, rc: %d", fd, ret);
507         return ret;
508     }
509 
510     int32_t send_len = 0;
511     if (cache->len == 0)
512     {
513         mt_hook_syscall(send);
514         ret = ff_hook_send(fd, data, len, 0);
515         if (ret >= 0)
516         {
517             send_len += ret;
518         }
519         else
520         {
521             if (errno != EAGAIN)
522             {
523                 MTLOG_ERROR("tcp socket[%d] send failed, error: %d[%m]", fd, errno);
524                 return -2;
525             }
526         }
527     }
528 
529     int32_t rc = cache_append_data(cache, (char*)data + send_len, len - send_len);
530     if (rc < 0)
531     {
532         MTLOG_ERROR("tcp socket[%d] apend data failed, rc: %d", fd, rc);
533         return -3;
534     }
535 
536     return send_len;
537 }
538 
539 uint32_t get_data_len(TBuffVecPtr multi)
540 {
541     TRWCache* cache = (TRWCache*)multi;
542     if (NULL == cache) {
543         return 0;
544     } else {
545         return cache->len;
546     }
547 }
548 
549 uint32_t get_block_count(TBuffVecPtr multi)
550 {
551     TRWCache* cache = (TRWCache*)multi;
552     if (NULL == cache) {
553         return 0;
554     } else {
555         return cache->count;
556     }
557 }
558 
559 TBuffBlockPtr get_first_block(TBuffVecPtr multi)
560 {
561     TRWCache* cache = (TRWCache*)multi;
562     if (NULL == cache) {
563         return NULL;
564     } else {
565         return (TBuffBlockPtr)TAILQ_FIRST(&cache->list);
566     }
567 }
568 
569 TBuffBlockPtr get_next_block(TBuffVecPtr multi, TBuffBlockPtr block)
570 {
571     TRWCache* cache = (TRWCache*)multi;
572     TSkBuffer* item = (TSkBuffer*)block;
573     if ((NULL == cache) || (NULL == item))
574     {
575         return NULL;
576     }
577 
578     return (TBuffBlockPtr)TAILQ_NEXT(item, entry);
579 
580 }
581 
582 void get_block_data(TBuffBlockPtr block, const void** data, int32_t* len)
583 {
584     TSkBuffer* item = (TSkBuffer*)block;
585     if (NULL == block)
586     {
587         return;
588     }
589 
590     if (data != NULL)
591     {
592         *(uint8_t**)data = item->data;
593     }
594 
595     if (len != NULL)
596     {
597         *len = (int32_t)item->data_len;
598     }
599 }
600 
601 uint32_t read_cache_data(TBuffVecPtr multi, void* data, uint32_t len)
602 {
603     TRWCache* cache = (TRWCache*)multi;
604     if (NULL == cache) {
605         return 0;
606     }
607 
608     uint32_t left_len = len;
609     uint32_t offset = 0;
610     TSkBuffer* item = NULL;
611     TSkBuffer* tmp = NULL;
612     TAILQ_FOREACH_SAFE(item, &cache->list, entry, tmp)
613     {
614         uint32_t copy_len = 0;
615         if (left_len <= item->data_len)
616         {
617             copy_len = left_len;
618         }
619         else
620         {
621             copy_len = item->data_len;
622         }
623 
624         if (data != NULL)
625         {
626             memcpy((char*)data + offset, item->data, copy_len);
627         }
628         offset += copy_len;
629         left_len -= copy_len;
630 
631         if (left_len <= 0)
632         {
633             break;
634         }
635     }
636 
637     return offset;
638 }
639 
640 uint32_t read_cache_begin(TBuffVecPtr multi, uint32_t begin, void* data, uint32_t len)
641 {
642     TRWCache* cache = (TRWCache*)multi;
643     if (NULL == cache) {
644         return 0;
645     }
646 
647     if (begin >= cache->len) {
648         return 0;
649     }
650 
651     uint32_t pos_left = begin;
652     uint32_t copy_left = len;
653     uint32_t offset = 0;
654     TSkBuffer* item = NULL;
655     TAILQ_FOREACH(item, &cache->list, entry)
656     {
657         uint8_t* start_ptr = item->data;
658         uint32_t real_left = item->data_len;
659         if (pos_left > 0)
660         {
661             uint32_t skip_len = pos_left > real_left ? real_left : pos_left;
662             pos_left -= skip_len;
663             real_left -= skip_len;
664             start_ptr += skip_len;
665         }
666 
667         if (real_left == 0)
668         {
669             continue;
670         }
671 
672         uint32_t copy_len = copy_left > real_left ? real_left : copy_left;
673         if (data != NULL)
674         {
675             memcpy((char*)data + offset, start_ptr, copy_len);
676         }
677         offset += copy_len;
678         copy_left -= copy_len;
679         if (copy_left == 0)
680         {
681             break;
682         }
683     }
684 
685     return offset;
686 }
687 
688 };
689