1*a9643ea8Slogwang 
2*a9643ea8Slogwang /**
3*a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4*a9643ea8Slogwang  *
5*a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6*a9643ea8Slogwang  *
7*a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8*a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9*a9643ea8Slogwang  * obtain a copy of the License at
10*a9643ea8Slogwang  *
11*a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12*a9643ea8Slogwang  *
13*a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14*a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15*a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16*a9643ea8Slogwang  * and limitations under the License.
17*a9643ea8Slogwang  */
18*a9643ea8Slogwang 
19*a9643ea8Slogwang 
20*a9643ea8Slogwang /**
21*a9643ea8Slogwang  *  @file mt_concurrent.c
 *  @info Extension module for multi-way concurrent I/O processing
23*a9643ea8Slogwang  *  @time 20130924
24*a9643ea8Slogwang  **/
25*a9643ea8Slogwang 
26*a9643ea8Slogwang #include "micro_thread.h"
27*a9643ea8Slogwang #include "mt_msg.h"
28*a9643ea8Slogwang #include "mt_notify.h"
29*a9643ea8Slogwang #include "mt_connection.h"
30*a9643ea8Slogwang #include "mt_concurrent.h"
31*a9643ea8Slogwang 
32*a9643ea8Slogwang using namespace std;
33*a9643ea8Slogwang using namespace NS_MICRO_THREAD;
34*a9643ea8Slogwang 
35*a9643ea8Slogwang 
36*a9643ea8Slogwang /**
37*a9643ea8Slogwang  * @brief ��·IO�Ĵ����Ż�, �첽���ȵȴ�����
38*a9643ea8Slogwang  * @param req_list - �����б�
39*a9643ea8Slogwang  * @param how - EPOLLIN  EPOLLOUT
40*a9643ea8Slogwang  * @param timeout - ��ʱʱ�� ���뵥λ
41*a9643ea8Slogwang  * @return 0 �ɹ�, <0ʧ�� -3 ����ʱ
42*a9643ea8Slogwang  */
43*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_netfd_poll(IMtActList& req_list, int how, int timeout)
44*a9643ea8Slogwang {
45*a9643ea8Slogwang     KqObjList fdlist;
46*a9643ea8Slogwang     TAILQ_INIT(&fdlist);
47*a9643ea8Slogwang 
48*a9643ea8Slogwang     KqueuerObj* obj = NULL;
49*a9643ea8Slogwang     IMtAction* action = NULL;
50*a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
51*a9643ea8Slogwang     {
52*a9643ea8Slogwang         action = *it;
53*a9643ea8Slogwang         if (action) {
54*a9643ea8Slogwang             obj = action->GetNtfyObj();
55*a9643ea8Slogwang         }
56*a9643ea8Slogwang         if (!action || !obj)
57*a9643ea8Slogwang         {
58*a9643ea8Slogwang             action->SetErrno(ERR_FRAME_ERROR);
59*a9643ea8Slogwang             MTLOG_ERROR("input action %p, or ntify null, error", action);
60*a9643ea8Slogwang             return -1;
61*a9643ea8Slogwang         }
62*a9643ea8Slogwang 
63*a9643ea8Slogwang         obj->SetRcvEvents(0);
64*a9643ea8Slogwang         if (how & KQ_EVENT_READ)
65*a9643ea8Slogwang         {
66*a9643ea8Slogwang             obj->EnableInput();
67*a9643ea8Slogwang         }
68*a9643ea8Slogwang         else
69*a9643ea8Slogwang         {
70*a9643ea8Slogwang             obj->DisableInput();
71*a9643ea8Slogwang         }
72*a9643ea8Slogwang 
73*a9643ea8Slogwang         if (how & KQ_EVENT_WRITE)
74*a9643ea8Slogwang         {
75*a9643ea8Slogwang             obj->EnableOutput();
76*a9643ea8Slogwang         }
77*a9643ea8Slogwang         else
78*a9643ea8Slogwang         {
79*a9643ea8Slogwang             obj->DisableOutput();
80*a9643ea8Slogwang         }
81*a9643ea8Slogwang 
82*a9643ea8Slogwang         TAILQ_INSERT_TAIL(&fdlist, obj, _entry);
83*a9643ea8Slogwang 
84*a9643ea8Slogwang     }
85*a9643ea8Slogwang 
86*a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
87*a9643ea8Slogwang     if (!mtframe || !mtframe->KqueueSchedule(&fdlist, NULL, (int)timeout))
88*a9643ea8Slogwang     {
89*a9643ea8Slogwang         if (errno != ETIME)
90*a9643ea8Slogwang         {
91*a9643ea8Slogwang             action->SetErrno(ERR_KQUEUE_FAIL);
92*a9643ea8Slogwang             MTLOG_ERROR("Mtframe %p, epoll schedule failed, errno %d", mtframe, errno);
93*a9643ea8Slogwang             return -2;
94*a9643ea8Slogwang         }
95*a9643ea8Slogwang 
96*a9643ea8Slogwang         return -3;
97*a9643ea8Slogwang     }
98*a9643ea8Slogwang 
99*a9643ea8Slogwang     return 0;
100*a9643ea8Slogwang }
101*a9643ea8Slogwang 
102*a9643ea8Slogwang /**
103*a9643ea8Slogwang  * @brief Ϊÿ��ITEM���������ĵ�socket
104*a9643ea8Slogwang  * @param req_list - �����б�
105*a9643ea8Slogwang  * @return 0 �ɹ�, <0ʧ��
106*a9643ea8Slogwang  */
107*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_newsock(IMtActList& req_list)
108*a9643ea8Slogwang {
109*a9643ea8Slogwang     int sock = -1, has_ok = 0;
110*a9643ea8Slogwang     IMtAction* action = NULL;
111*a9643ea8Slogwang     IMtConnection* net_handler = NULL;
112*a9643ea8Slogwang 
113*a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
114*a9643ea8Slogwang     {
115*a9643ea8Slogwang         action = *it;
116*a9643ea8Slogwang         if (NULL == action)
117*a9643ea8Slogwang         {
118*a9643ea8Slogwang             action->SetErrno(ERR_PARAM_ERROR);
119*a9643ea8Slogwang             MTLOG_ERROR("Invalid param, conn %p null!!", action);
120*a9643ea8Slogwang             return -1;
121*a9643ea8Slogwang         }
122*a9643ea8Slogwang 
123*a9643ea8Slogwang         if (action->GetErrno() != ERR_NONE) {
124*a9643ea8Slogwang             continue;
125*a9643ea8Slogwang         }
126*a9643ea8Slogwang 
127*a9643ea8Slogwang         net_handler = action->GetIConnection();
128*a9643ea8Slogwang         if (NULL == net_handler)
129*a9643ea8Slogwang         {
130*a9643ea8Slogwang             action->SetErrno(ERR_FRAME_ERROR);
131*a9643ea8Slogwang             MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
132*a9643ea8Slogwang             return -2;
133*a9643ea8Slogwang         }
134*a9643ea8Slogwang 
135*a9643ea8Slogwang         sock = net_handler->CreateSocket();
136*a9643ea8Slogwang         if (sock < 0)
137*a9643ea8Slogwang         {
138*a9643ea8Slogwang             action->SetErrno(ERR_SOCKET_FAIL);
139*a9643ea8Slogwang             MTLOG_ERROR("Get sock data failed, ret %d, errno %d!!", sock, errno);
140*a9643ea8Slogwang             return -3;
141*a9643ea8Slogwang         }
142*a9643ea8Slogwang         has_ok = 1;
143*a9643ea8Slogwang 
144*a9643ea8Slogwang         if (action->GetProtoType() == MT_UDP)
145*a9643ea8Slogwang         {
146*a9643ea8Slogwang             action->SetMsgFlag(MULTI_FLAG_OPEN);
147*a9643ea8Slogwang         }
148*a9643ea8Slogwang         else
149*a9643ea8Slogwang         {
150*a9643ea8Slogwang             action->SetMsgFlag(MULTI_FLAG_INIT);
151*a9643ea8Slogwang         }
152*a9643ea8Slogwang     }
153*a9643ea8Slogwang 
154*a9643ea8Slogwang     if (has_ok)
155*a9643ea8Slogwang     {
156*a9643ea8Slogwang         return 0;
157*a9643ea8Slogwang     }
158*a9643ea8Slogwang     else
159*a9643ea8Slogwang     {
160*a9643ea8Slogwang         return -4;
161*a9643ea8Slogwang     }
162*a9643ea8Slogwang }
163*a9643ea8Slogwang 
164*a9643ea8Slogwang 
165*a9643ea8Slogwang /**
166*a9643ea8Slogwang  * @brief ��·IO�Ĵ���, ������
167*a9643ea8Slogwang  * @param req_list - �����б�
168*a9643ea8Slogwang  * @param timeout - ��ʱʱ�� ���뵥λ
169*a9643ea8Slogwang  * @return 0 �ɹ�, <0ʧ��
170*a9643ea8Slogwang  */
171*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_open(IMtActList& req_list, int timeout)
172*a9643ea8Slogwang {
173*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
174*a9643ea8Slogwang     utime64_t end_ms = start_ms + timeout;
175*a9643ea8Slogwang     utime64_t curr_ms = 0;
176*a9643ea8Slogwang 
177*a9643ea8Slogwang     int ret = 0, has_open = 0;
178*a9643ea8Slogwang     IMtAction* action = NULL;
179*a9643ea8Slogwang     IMtConnection* net_handler = NULL;
180*a9643ea8Slogwang     IMtActList::iterator it;
181*a9643ea8Slogwang 
182*a9643ea8Slogwang     while (1)
183*a9643ea8Slogwang     {
184*a9643ea8Slogwang         IMtActList wait_list;
185*a9643ea8Slogwang         for (it = req_list.begin(); it != req_list.end(); ++it)
186*a9643ea8Slogwang         {
187*a9643ea8Slogwang             action = *it;
188*a9643ea8Slogwang             if (action->GetErrno() != ERR_NONE) {
189*a9643ea8Slogwang                 continue;
190*a9643ea8Slogwang             }
191*a9643ea8Slogwang 
192*a9643ea8Slogwang             if (action->GetMsgFlag() == MULTI_FLAG_OPEN) {
193*a9643ea8Slogwang                 has_open = 1;
194*a9643ea8Slogwang                 continue;
195*a9643ea8Slogwang             }
196*a9643ea8Slogwang 
197*a9643ea8Slogwang             net_handler = action->GetIConnection();
198*a9643ea8Slogwang             if (NULL == net_handler)
199*a9643ea8Slogwang             {
200*a9643ea8Slogwang                 action->SetErrno(ERR_FRAME_ERROR);
201*a9643ea8Slogwang                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
202*a9643ea8Slogwang                 return -1;
203*a9643ea8Slogwang             }
204*a9643ea8Slogwang 
205*a9643ea8Slogwang             ret = net_handler->OpenCnnect();
206*a9643ea8Slogwang             if (ret < 0)
207*a9643ea8Slogwang             {
208*a9643ea8Slogwang                 wait_list.push_back(action);
209*a9643ea8Slogwang             }
210*a9643ea8Slogwang             else
211*a9643ea8Slogwang             {
212*a9643ea8Slogwang                 action->SetMsgFlag(MULTI_FLAG_OPEN);
213*a9643ea8Slogwang             }
214*a9643ea8Slogwang         }
215*a9643ea8Slogwang 
216*a9643ea8Slogwang         curr_ms = MtFrame::Instance()->GetLastClock();
217*a9643ea8Slogwang         if (curr_ms > end_ms)
218*a9643ea8Slogwang         {
219*a9643ea8Slogwang             MTLOG_DEBUG("Open connect timeout, errno %d", errno);
220*a9643ea8Slogwang             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
221*a9643ea8Slogwang             {
222*a9643ea8Slogwang                 (*it)->SetErrno(ERR_CONNECT_FAIL);
223*a9643ea8Slogwang             }
224*a9643ea8Slogwang 
225*a9643ea8Slogwang             if (!has_open)
226*a9643ea8Slogwang             {
227*a9643ea8Slogwang                 return 0;
228*a9643ea8Slogwang             }
229*a9643ea8Slogwang             else
230*a9643ea8Slogwang             {
231*a9643ea8Slogwang                 return -2;
232*a9643ea8Slogwang             }
233*a9643ea8Slogwang         }
234*a9643ea8Slogwang 
235*a9643ea8Slogwang         if (!wait_list.empty())
236*a9643ea8Slogwang         {
237*a9643ea8Slogwang             mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
238*a9643ea8Slogwang         }
239*a9643ea8Slogwang         else
240*a9643ea8Slogwang         {
241*a9643ea8Slogwang             return 0;
242*a9643ea8Slogwang         }
243*a9643ea8Slogwang     }
244*a9643ea8Slogwang 
245*a9643ea8Slogwang }
246*a9643ea8Slogwang 
247*a9643ea8Slogwang 
248*a9643ea8Slogwang /**
249*a9643ea8Slogwang  * @brief ��·IO�Ĵ���, ��������
250*a9643ea8Slogwang  * @param req_list - �����б�
251*a9643ea8Slogwang  * @param timeout - ��ʱʱ�� ���뵥λ
252*a9643ea8Slogwang  * @return 0 �ɹ�, <0ʧ��
253*a9643ea8Slogwang  */
254*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_sendto(IMtActList& req_list, int timeout)
255*a9643ea8Slogwang {
256*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
257*a9643ea8Slogwang     utime64_t end_ms = start_ms + timeout;
258*a9643ea8Slogwang     utime64_t curr_ms = 0;
259*a9643ea8Slogwang 
260*a9643ea8Slogwang     int ret = 0, has_send = 0;
261*a9643ea8Slogwang     IMtAction* action = NULL;
262*a9643ea8Slogwang     IMtConnection* net_handler = NULL;
263*a9643ea8Slogwang 
264*a9643ea8Slogwang     while (1)
265*a9643ea8Slogwang     {
266*a9643ea8Slogwang         IMtActList wait_list;
267*a9643ea8Slogwang         for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
268*a9643ea8Slogwang         {
269*a9643ea8Slogwang             action = *it;
270*a9643ea8Slogwang             if (action->GetErrno() != ERR_NONE) {
271*a9643ea8Slogwang                 continue;
272*a9643ea8Slogwang             }
273*a9643ea8Slogwang 
274*a9643ea8Slogwang             if (action->GetMsgFlag() == MULTI_FLAG_SEND) {
275*a9643ea8Slogwang                 has_send = 1;
276*a9643ea8Slogwang                 continue;
277*a9643ea8Slogwang             }
278*a9643ea8Slogwang 
279*a9643ea8Slogwang             net_handler = action->GetIConnection();
280*a9643ea8Slogwang             if (NULL == net_handler)
281*a9643ea8Slogwang             {
282*a9643ea8Slogwang                 action->SetErrno(ERR_FRAME_ERROR);
283*a9643ea8Slogwang                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
284*a9643ea8Slogwang                 return -2;
285*a9643ea8Slogwang             }
286*a9643ea8Slogwang 
287*a9643ea8Slogwang             // 0 -��Ҫ��������; -1 ֹͣ����; > 0 ����OK
288*a9643ea8Slogwang             ret = net_handler->SendData();
289*a9643ea8Slogwang             if (ret == -1)
290*a9643ea8Slogwang             {
291*a9643ea8Slogwang                 action->SetErrno(ERR_SEND_FAIL);
292*a9643ea8Slogwang                 MTLOG_ERROR("MultiItem msg send error, %d", errno);
293*a9643ea8Slogwang                 continue;
294*a9643ea8Slogwang             }
295*a9643ea8Slogwang             else if (ret == 0)
296*a9643ea8Slogwang             {
297*a9643ea8Slogwang                 wait_list.push_back(action);
298*a9643ea8Slogwang                 continue;
299*a9643ea8Slogwang             }
300*a9643ea8Slogwang             else
301*a9643ea8Slogwang             {
302*a9643ea8Slogwang                 action->SetMsgFlag(MULTI_FLAG_SEND);
303*a9643ea8Slogwang             }
304*a9643ea8Slogwang         }
305*a9643ea8Slogwang 
306*a9643ea8Slogwang         curr_ms = MtFrame::Instance()->GetLastClock();
307*a9643ea8Slogwang         if (curr_ms > end_ms)
308*a9643ea8Slogwang         {
309*a9643ea8Slogwang             MTLOG_DEBUG("send data timeout");
310*a9643ea8Slogwang             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
311*a9643ea8Slogwang             {
312*a9643ea8Slogwang                 (*it)->SetErrno(ERR_SEND_FAIL);
313*a9643ea8Slogwang             }
314*a9643ea8Slogwang 
315*a9643ea8Slogwang             if (has_send)
316*a9643ea8Slogwang             {
317*a9643ea8Slogwang                 return 0;
318*a9643ea8Slogwang             }
319*a9643ea8Slogwang             else
320*a9643ea8Slogwang             {
321*a9643ea8Slogwang                 return -5;
322*a9643ea8Slogwang             }
323*a9643ea8Slogwang         }
324*a9643ea8Slogwang 
325*a9643ea8Slogwang         if (!wait_list.empty())
326*a9643ea8Slogwang         {
327*a9643ea8Slogwang             mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
328*a9643ea8Slogwang         }
329*a9643ea8Slogwang         else
330*a9643ea8Slogwang         {
331*a9643ea8Slogwang             return 0;
332*a9643ea8Slogwang         }
333*a9643ea8Slogwang     }
334*a9643ea8Slogwang 
335*a9643ea8Slogwang     return 0;
336*a9643ea8Slogwang }
337*a9643ea8Slogwang 
338*a9643ea8Slogwang 
339*a9643ea8Slogwang 
340*a9643ea8Slogwang /**
341*a9643ea8Slogwang  * @brief ��·IO�������մ���
342*a9643ea8Slogwang  */
343*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_recvfrom(IMtActList& req_list, int timeout)
344*a9643ea8Slogwang {
345*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
346*a9643ea8Slogwang     utime64_t end_ms = start_ms + timeout;
347*a9643ea8Slogwang     utime64_t curr_ms = 0;
348*a9643ea8Slogwang 
349*a9643ea8Slogwang     int ret = 0;
350*a9643ea8Slogwang     IMtAction* action = NULL;
351*a9643ea8Slogwang     IMtConnection* net_handler = NULL;
352*a9643ea8Slogwang 
353*a9643ea8Slogwang     while (1)
354*a9643ea8Slogwang     {
355*a9643ea8Slogwang         IMtActList wait_list;
356*a9643ea8Slogwang         for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
357*a9643ea8Slogwang         {
358*a9643ea8Slogwang             action = *it;
359*a9643ea8Slogwang             if (action->GetErrno() != ERR_NONE) {
360*a9643ea8Slogwang                 continue;
361*a9643ea8Slogwang             }
362*a9643ea8Slogwang 
363*a9643ea8Slogwang             if (MULTI_FLAG_FIN == action->GetMsgFlag()) ///< �Ѵ������
364*a9643ea8Slogwang             {
365*a9643ea8Slogwang                 continue;
366*a9643ea8Slogwang             }
367*a9643ea8Slogwang 
368*a9643ea8Slogwang             net_handler = action->GetIConnection();
369*a9643ea8Slogwang             if (NULL == net_handler)
370*a9643ea8Slogwang             {
371*a9643ea8Slogwang                 action->SetErrno(ERR_FRAME_ERROR);
372*a9643ea8Slogwang                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
373*a9643ea8Slogwang                 return -2;
374*a9643ea8Slogwang             }
375*a9643ea8Slogwang 
376*a9643ea8Slogwang             // <0 ʧ��, 0 ������, >0 �ɹ�
377*a9643ea8Slogwang             ret = net_handler->RecvData();
378*a9643ea8Slogwang             if (ret < 0)
379*a9643ea8Slogwang             {
380*a9643ea8Slogwang                 action->SetErrno(ERR_RECV_FAIL);
381*a9643ea8Slogwang                 MTLOG_ERROR("MultiItem msg recv failed: %p", net_handler);
382*a9643ea8Slogwang                 continue;
383*a9643ea8Slogwang             }
384*a9643ea8Slogwang             else if (ret == 0)
385*a9643ea8Slogwang             {
386*a9643ea8Slogwang                 wait_list.push_back(action);
387*a9643ea8Slogwang                 continue;
388*a9643ea8Slogwang             }
389*a9643ea8Slogwang             else
390*a9643ea8Slogwang             {
391*a9643ea8Slogwang                 action->SetMsgFlag(MULTI_FLAG_FIN);
392*a9643ea8Slogwang                 action->SetCost(MtFrame::Instance()->GetLastClock() - start_ms);
393*a9643ea8Slogwang             }
394*a9643ea8Slogwang         }
395*a9643ea8Slogwang 
396*a9643ea8Slogwang         curr_ms = MtFrame::Instance()->GetLastClock();
397*a9643ea8Slogwang         if (curr_ms > end_ms)
398*a9643ea8Slogwang         {
399*a9643ea8Slogwang             MTLOG_DEBUG("Recv data timeout, curr %llu, start: %llu", curr_ms, start_ms);
400*a9643ea8Slogwang             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
401*a9643ea8Slogwang             {
402*a9643ea8Slogwang                 (*it)->SetErrno(ERR_RECV_TIMEOUT);
403*a9643ea8Slogwang             }
404*a9643ea8Slogwang             return -5;
405*a9643ea8Slogwang         }
406*a9643ea8Slogwang 
407*a9643ea8Slogwang         if (!wait_list.empty())
408*a9643ea8Slogwang         {
409*a9643ea8Slogwang             mt_multi_netfd_poll(wait_list, KQ_EVENT_READ, end_ms - curr_ms);
410*a9643ea8Slogwang         }
411*a9643ea8Slogwang         else
412*a9643ea8Slogwang         {
413*a9643ea8Slogwang             return 0;
414*a9643ea8Slogwang         }
415*a9643ea8Slogwang     }
416*a9643ea8Slogwang }
417*a9643ea8Slogwang 
418*a9643ea8Slogwang /**
419*a9643ea8Slogwang  * @brief ��·IO�������մ���
420*a9643ea8Slogwang  */
421*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_sendrcv_ex(IMtActList& req_list, int timeout)
422*a9643ea8Slogwang {
423*a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
424*a9643ea8Slogwang     utime64_t curr_ms = 0;
425*a9643ea8Slogwang 
426*a9643ea8Slogwang     int rc = mt_multi_newsock(req_list); // TODO, ����ȡconnect��ʱʱ���
427*a9643ea8Slogwang     if (rc < 0)
428*a9643ea8Slogwang     {
429*a9643ea8Slogwang         MT_ATTR_API(320842, 1); // socketʧ��
430*a9643ea8Slogwang         MTLOG_ERROR("mt_multi_sendrcv new sock failed, ret: %d", rc);
431*a9643ea8Slogwang         return -1;
432*a9643ea8Slogwang     }
433*a9643ea8Slogwang 
434*a9643ea8Slogwang     rc = mt_multi_open(req_list, timeout);
435*a9643ea8Slogwang     if (rc < 0)
436*a9643ea8Slogwang     {
437*a9643ea8Slogwang         MT_ATTR_API(320843, 1); // connectʧ��
438*a9643ea8Slogwang         MTLOG_ERROR("mt_multi_sendrcv open failed, ret: %d", rc);
439*a9643ea8Slogwang         return -2;
440*a9643ea8Slogwang     }
441*a9643ea8Slogwang 
442*a9643ea8Slogwang     curr_ms = MtFrame::Instance()->GetLastClock();
443*a9643ea8Slogwang     rc = mt_multi_sendto(req_list, timeout - (curr_ms - start_ms));
444*a9643ea8Slogwang     if (rc < 0)
445*a9643ea8Slogwang     {
446*a9643ea8Slogwang         MT_ATTR_API(320844, 1); // ����ʧ��
447*a9643ea8Slogwang         MTLOG_ERROR("mt_multi_sendrcv send failed, ret: %d", rc);
448*a9643ea8Slogwang         return -3;
449*a9643ea8Slogwang     }
450*a9643ea8Slogwang 
451*a9643ea8Slogwang     curr_ms = MtFrame::Instance()->GetLastClock();
452*a9643ea8Slogwang     rc = mt_multi_recvfrom(req_list, timeout - (curr_ms - start_ms));
453*a9643ea8Slogwang     if (rc < 0)
454*a9643ea8Slogwang     {
455*a9643ea8Slogwang         MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ�
456*a9643ea8Slogwang         MTLOG_ERROR("mt_multi_sendrcv recv failed, ret: %d", rc);
457*a9643ea8Slogwang         return -4;
458*a9643ea8Slogwang     }
459*a9643ea8Slogwang 
460*a9643ea8Slogwang     return 0;
461*a9643ea8Slogwang }
462*a9643ea8Slogwang 
463*a9643ea8Slogwang 
464*a9643ea8Slogwang /**
465*a9643ea8Slogwang  * @brief ��·IO�������մ���ӿ�, ��װACTON�ӿ�ģ��, �ڲ�����msg
466*a9643ea8Slogwang  * @param req_list -action list ʵ�ַ�װ�����ӿ�
467*a9643ea8Slogwang  * @param timeout -��ʱʱ��, ��λms
468*a9643ea8Slogwang  * @return  0 �ɹ�, -1 ��ʼ������ʧ��, �����ɹ����ֳɹ�
469*a9643ea8Slogwang  */
470*a9643ea8Slogwang int NS_MICRO_THREAD::mt_msg_sendrcv(IMtActList& req_list, int timeout)
471*a9643ea8Slogwang {
472*a9643ea8Slogwang     int iRet = 0;
473*a9643ea8Slogwang 
474*a9643ea8Slogwang     // ��һ��, ��ʼ��action����, ��װ������
475*a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
476*a9643ea8Slogwang     {
477*a9643ea8Slogwang         IMtAction* pAction = *it;
478*a9643ea8Slogwang         if (!pAction || pAction->InitConnEnv() < 0)
479*a9643ea8Slogwang         {
480*a9643ea8Slogwang             MTLOG_ERROR("invalid action(%p) or int failed, error", pAction);
481*a9643ea8Slogwang             return -1;
482*a9643ea8Slogwang         }
483*a9643ea8Slogwang 
484*a9643ea8Slogwang         iRet = pAction->DoEncode();
485*a9643ea8Slogwang         if (iRet < 0)
486*a9643ea8Slogwang         {
487*a9643ea8Slogwang             pAction->SetErrno(ERR_ENCODE_ERROR);
488*a9643ea8Slogwang             MTLOG_ERROR("pack action pkg failed, act %p, ret %d", pAction, iRet);
489*a9643ea8Slogwang             continue;
490*a9643ea8Slogwang         }
491*a9643ea8Slogwang 
492*a9643ea8Slogwang     }
493*a9643ea8Slogwang 
494*a9643ea8Slogwang     // �ڶ���, ͬ���շ���Ϣ, ʧ��Ҳ��Ҫ֪ͨ����
495*a9643ea8Slogwang     mt_multi_sendrcv_ex(req_list, timeout);
496*a9643ea8Slogwang 
497*a9643ea8Slogwang     // ������, ͬ��֪ͨ�������
498*a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
499*a9643ea8Slogwang     {
500*a9643ea8Slogwang         IMtAction* pAction = *it;
501*a9643ea8Slogwang 
502*a9643ea8Slogwang         if (pAction->GetMsgFlag() != MULTI_FLAG_FIN)
503*a9643ea8Slogwang         {
504*a9643ea8Slogwang             pAction->DoError();
505*a9643ea8Slogwang             MTLOG_DEBUG("send recv failed: %d", pAction->GetErrno());
506*a9643ea8Slogwang             continue;
507*a9643ea8Slogwang         }
508*a9643ea8Slogwang 
509*a9643ea8Slogwang         iRet = pAction->DoProcess();
510*a9643ea8Slogwang         if (iRet < 0)
511*a9643ea8Slogwang         {
512*a9643ea8Slogwang             MTLOG_DEBUG("action process failed: %d", iRet);
513*a9643ea8Slogwang             continue;
514*a9643ea8Slogwang         }
515*a9643ea8Slogwang     }
516*a9643ea8Slogwang 
517*a9643ea8Slogwang     // ���IJ�, �������ڲ���Դ, ���ݸ����÷�
518*a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
519*a9643ea8Slogwang     {
520*a9643ea8Slogwang         IMtAction* pAction = *it;
521*a9643ea8Slogwang         pAction->Reset();
522*a9643ea8Slogwang     }
523*a9643ea8Slogwang 
524*a9643ea8Slogwang     return 0;
525*a9643ea8Slogwang }
526*a9643ea8Slogwang 
527*a9643ea8Slogwang 
528*a9643ea8Slogwang 
529