1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
/**
 *  @file mt_concurrent.c
 *  @info Extension of the multi-way concurrent I/O processing module
 *  @time 20130924
 **/
25 
26 #include "micro_thread.h"
27 #include "mt_msg.h"
28 #include "mt_notify.h"
29 #include "mt_connection.h"
30 #include "mt_concurrent.h"
31 
32 using namespace std;
33 using namespace NS_MICRO_THREAD;
34 
35 
36 /**
37  * @brief ��·IO�Ĵ����Ż�, �첽���ȵȴ�����
38  * @param req_list - �����б�
39  * @param how - EPOLLIN  EPOLLOUT
40  * @param timeout - ��ʱʱ�� ���뵥λ
41  * @return 0 �ɹ�, <0ʧ�� -3 ����ʱ
42  */
43 int NS_MICRO_THREAD::mt_multi_netfd_poll(IMtActList& req_list, int how, int timeout)
44 {
45     KqObjList fdlist;
46     TAILQ_INIT(&fdlist);
47 
48     KqueuerObj* obj = NULL;
49     IMtAction* action = NULL;
50     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
51     {
52         action = *it;
53         if (action) {
54             obj = action->GetNtfyObj();
55         }
56         if (!action || !obj)
57         {
58             action->SetErrno(ERR_FRAME_ERROR);
59             MTLOG_ERROR("input action %p, or ntify null, error", action);
60             return -1;
61         }
62 
63         obj->SetRcvEvents(0);
64         if (how & KQ_EVENT_READ)
65         {
66             obj->EnableInput();
67         }
68         else
69         {
70             obj->DisableInput();
71         }
72 
73         if (how & KQ_EVENT_WRITE)
74         {
75             obj->EnableOutput();
76         }
77         else
78         {
79             obj->DisableOutput();
80         }
81 
82         TAILQ_INSERT_TAIL(&fdlist, obj, _entry);
83 
84     }
85 
86     MtFrame* mtframe = MtFrame::Instance();
87     if (!mtframe || !mtframe->KqueueSchedule(&fdlist, NULL, (int)timeout))
88     {
89         if (errno != ETIME)
90         {
91             action->SetErrno(ERR_KQUEUE_FAIL);
92             MTLOG_ERROR("Mtframe %p, epoll schedule failed, errno %d", mtframe, errno);
93             return -2;
94         }
95 
96         return -3;
97     }
98 
99     return 0;
100 }
101 
102 /**
103  * @brief Ϊÿ��ITEM���������ĵ�socket
104  * @param req_list - �����б�
105  * @return 0 �ɹ�, <0ʧ��
106  */
107 int NS_MICRO_THREAD::mt_multi_newsock(IMtActList& req_list)
108 {
109     int sock = -1, has_ok = 0;
110     IMtAction* action = NULL;
111     IMtConnection* net_handler = NULL;
112 
113     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
114     {
115         action = *it;
116         if (NULL == action)
117         {
118             action->SetErrno(ERR_PARAM_ERROR);
119             MTLOG_ERROR("Invalid param, conn %p null!!", action);
120             return -1;
121         }
122 
123         if (action->GetErrno() != ERR_NONE) {
124             continue;
125         }
126 
127         net_handler = action->GetIConnection();
128         if (NULL == net_handler)
129         {
130             action->SetErrno(ERR_FRAME_ERROR);
131             MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
132             return -2;
133         }
134 
135         sock = net_handler->CreateSocket();
136         if (sock < 0)
137         {
138             action->SetErrno(ERR_SOCKET_FAIL);
139             MTLOG_ERROR("Get sock data failed, ret %d, errno %d!!", sock, errno);
140             return -3;
141         }
142         has_ok = 1;
143 
144         if (action->GetProtoType() == MT_UDP)
145         {
146             action->SetMsgFlag(MULTI_FLAG_OPEN);
147         }
148         else
149         {
150             action->SetMsgFlag(MULTI_FLAG_INIT);
151         }
152     }
153 
154     if (has_ok)
155     {
156         return 0;
157     }
158     else
159     {
160         return -4;
161     }
162 }
163 
164 
165 /**
166  * @brief ��·IO�Ĵ���, ������
167  * @param req_list - �����б�
168  * @param timeout - ��ʱʱ�� ���뵥λ
169  * @return 0 �ɹ�, <0ʧ��
170  */
171 int NS_MICRO_THREAD::mt_multi_open(IMtActList& req_list, int timeout)
172 {
173     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
174     utime64_t end_ms = start_ms + timeout;
175     utime64_t curr_ms = 0;
176 
177     int ret = 0, has_open = 0;
178     IMtAction* action = NULL;
179     IMtConnection* net_handler = NULL;
180     IMtActList::iterator it;
181 
182     while (1)
183     {
184         IMtActList wait_list;
185         for (it = req_list.begin(); it != req_list.end(); ++it)
186         {
187             action = *it;
188             if (action->GetErrno() != ERR_NONE) {
189                 continue;
190             }
191 
192             if (action->GetMsgFlag() == MULTI_FLAG_OPEN) {
193                 has_open = 1;
194                 continue;
195             }
196 
197             net_handler = action->GetIConnection();
198             if (NULL == net_handler)
199             {
200                 action->SetErrno(ERR_FRAME_ERROR);
201                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
202                 return -1;
203             }
204 
205             ret = net_handler->OpenCnnect();
206             if (ret < 0)
207             {
208                 wait_list.push_back(action);
209             }
210             else
211             {
212                 action->SetMsgFlag(MULTI_FLAG_OPEN);
213             }
214         }
215 
216         curr_ms = MtFrame::Instance()->GetLastClock();
217         if (curr_ms > end_ms)
218         {
219             MTLOG_DEBUG("Open connect timeout, errno %d", errno);
220             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
221             {
222                 (*it)->SetErrno(ERR_CONNECT_FAIL);
223             }
224 
225             if (!has_open)
226             {
227                 return 0;
228             }
229             else
230             {
231                 return -2;
232             }
233         }
234 
235         if (!wait_list.empty())
236         {
237             mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
238         }
239         else
240         {
241             return 0;
242         }
243     }
244 
245 }
246 
247 
248 /**
249  * @brief ��·IO�Ĵ���, ��������
250  * @param req_list - �����б�
251  * @param timeout - ��ʱʱ�� ���뵥λ
252  * @return 0 �ɹ�, <0ʧ��
253  */
254 int NS_MICRO_THREAD::mt_multi_sendto(IMtActList& req_list, int timeout)
255 {
256     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
257     utime64_t end_ms = start_ms + timeout;
258     utime64_t curr_ms = 0;
259 
260     int ret = 0, has_send = 0;
261     IMtAction* action = NULL;
262     IMtConnection* net_handler = NULL;
263 
264     while (1)
265     {
266         IMtActList wait_list;
267         for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
268         {
269             action = *it;
270             if (action->GetErrno() != ERR_NONE) {
271                 continue;
272             }
273 
274             if (action->GetMsgFlag() == MULTI_FLAG_SEND) {
275                 has_send = 1;
276                 continue;
277             }
278 
279             net_handler = action->GetIConnection();
280             if (NULL == net_handler)
281             {
282                 action->SetErrno(ERR_FRAME_ERROR);
283                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
284                 return -2;
285             }
286 
287             // 0 -��Ҫ��������; -1 ֹͣ����; > 0 ����OK
288             ret = net_handler->SendData();
289             if (ret == -1)
290             {
291                 action->SetErrno(ERR_SEND_FAIL);
292                 MTLOG_ERROR("MultiItem msg send error, %d", errno);
293                 continue;
294             }
295             else if (ret == 0)
296             {
297                 wait_list.push_back(action);
298                 continue;
299             }
300             else
301             {
302                 action->SetMsgFlag(MULTI_FLAG_SEND);
303             }
304         }
305 
306         curr_ms = MtFrame::Instance()->GetLastClock();
307         if (curr_ms > end_ms)
308         {
309             MTLOG_DEBUG("send data timeout");
310             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
311             {
312                 (*it)->SetErrno(ERR_SEND_FAIL);
313             }
314 
315             if (has_send)
316             {
317                 return 0;
318             }
319             else
320             {
321                 return -5;
322             }
323         }
324 
325         if (!wait_list.empty())
326         {
327             mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
328         }
329         else
330         {
331             return 0;
332         }
333     }
334 
335     return 0;
336 }
337 
338 
339 
340 /**
341  * @brief ��·IO�������մ���
342  */
343 int NS_MICRO_THREAD::mt_multi_recvfrom(IMtActList& req_list, int timeout)
344 {
345     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
346     utime64_t end_ms = start_ms + timeout;
347     utime64_t curr_ms = 0;
348 
349     int ret = 0;
350     IMtAction* action = NULL;
351     IMtConnection* net_handler = NULL;
352 
353     while (1)
354     {
355         IMtActList wait_list;
356         for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
357         {
358             action = *it;
359             if (action->GetErrno() != ERR_NONE) {
360                 continue;
361             }
362 
363             if (MULTI_FLAG_FIN == action->GetMsgFlag()) ///< �Ѵ������
364             {
365                 continue;
366             }
367 
368             net_handler = action->GetIConnection();
369             if (NULL == net_handler)
370             {
371                 action->SetErrno(ERR_FRAME_ERROR);
372                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
373                 return -2;
374             }
375 
376             // <0 ʧ��, 0 ������, >0 �ɹ�
377             ret = net_handler->RecvData();
378             if (ret < 0)
379             {
380                 action->SetErrno(ERR_RECV_FAIL);
381                 MTLOG_ERROR("MultiItem msg recv failed: %p", net_handler);
382                 continue;
383             }
384             else if (ret == 0)
385             {
386                 wait_list.push_back(action);
387                 continue;
388             }
389             else
390             {
391                 action->SetMsgFlag(MULTI_FLAG_FIN);
392                 action->SetCost(MtFrame::Instance()->GetLastClock() - start_ms);
393             }
394         }
395 
396         curr_ms = MtFrame::Instance()->GetLastClock();
397         if (curr_ms > end_ms)
398         {
399             MTLOG_DEBUG("Recv data timeout, curr %llu, start: %llu", curr_ms, start_ms);
400             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
401             {
402                 (*it)->SetErrno(ERR_RECV_TIMEOUT);
403             }
404             return -5;
405         }
406 
407         if (!wait_list.empty())
408         {
409             mt_multi_netfd_poll(wait_list, KQ_EVENT_READ, end_ms - curr_ms);
410         }
411         else
412         {
413             return 0;
414         }
415     }
416 }
417 
418 /**
419  * @brief ��·IO�������մ���
420  */
421 int NS_MICRO_THREAD::mt_multi_sendrcv_ex(IMtActList& req_list, int timeout)
422 {
423     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
424     utime64_t curr_ms = 0;
425 
426     int rc = mt_multi_newsock(req_list); // TODO, ����ȡconnect��ʱʱ���
427     if (rc < 0)
428     {
429         MT_ATTR_API(320842, 1); // socketʧ��
430         MTLOG_ERROR("mt_multi_sendrcv new sock failed, ret: %d", rc);
431         return -1;
432     }
433 
434     rc = mt_multi_open(req_list, timeout);
435     if (rc < 0)
436     {
437         MT_ATTR_API(320843, 1); // connectʧ��
438         MTLOG_ERROR("mt_multi_sendrcv open failed, ret: %d", rc);
439         return -2;
440     }
441 
442     curr_ms = MtFrame::Instance()->GetLastClock();
443     rc = mt_multi_sendto(req_list, timeout - (curr_ms - start_ms));
444     if (rc < 0)
445     {
446         MT_ATTR_API(320844, 1); // ����ʧ��
447         MTLOG_ERROR("mt_multi_sendrcv send failed, ret: %d", rc);
448         return -3;
449     }
450 
451     curr_ms = MtFrame::Instance()->GetLastClock();
452     rc = mt_multi_recvfrom(req_list, timeout - (curr_ms - start_ms));
453     if (rc < 0)
454     {
455         MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ�
456         MTLOG_ERROR("mt_multi_sendrcv recv failed, ret: %d", rc);
457         return -4;
458     }
459 
460     return 0;
461 }
462 
463 
464 /**
465  * @brief ��·IO�������մ���ӿ�, ��װACTON�ӿ�ģ��, �ڲ�����msg
466  * @param req_list -action list ʵ�ַ�װ�����ӿ�
467  * @param timeout -��ʱʱ��, ��λms
468  * @return  0 �ɹ�, -1 ��ʼ������ʧ��, �����ɹ����ֳɹ�
469  */
470 int NS_MICRO_THREAD::mt_msg_sendrcv(IMtActList& req_list, int timeout)
471 {
472     int iRet = 0;
473 
474     // ��һ��, ��ʼ��action����, ��װ������
475     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
476     {
477         IMtAction* pAction = *it;
478         if (!pAction || pAction->InitConnEnv() < 0)
479         {
480             MTLOG_ERROR("invalid action(%p) or int failed, error", pAction);
481             return -1;
482         }
483 
484         iRet = pAction->DoEncode();
485         if (iRet < 0)
486         {
487             pAction->SetErrno(ERR_ENCODE_ERROR);
488             MTLOG_ERROR("pack action pkg failed, act %p, ret %d", pAction, iRet);
489             continue;
490         }
491 
492     }
493 
494     // �ڶ���, ͬ���շ���Ϣ, ʧ��Ҳ��Ҫ֪ͨ����
495     mt_multi_sendrcv_ex(req_list, timeout);
496 
497     // ������, ͬ��֪ͨ�������
498     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
499     {
500         IMtAction* pAction = *it;
501 
502         if (pAction->GetMsgFlag() != MULTI_FLAG_FIN)
503         {
504             pAction->DoError();
505             MTLOG_DEBUG("send recv failed: %d", pAction->GetErrno());
506             continue;
507         }
508 
509         iRet = pAction->DoProcess();
510         if (iRet < 0)
511         {
512             MTLOG_DEBUG("action process failed: %d", iRet);
513             continue;
514         }
515     }
516 
517     // ���IJ�, �������ڲ���Դ, ���ݸ����÷�
518     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
519     {
520         IMtAction* pAction = *it;
521         pAction->Reset();
522     }
523 
524     return 0;
525 }
526 
527 
528 
529