1a9643ea8Slogwang 
2a9643ea8Slogwang /**
3a9643ea8Slogwang  * Tencent is pleased to support the open source community by making MSEC available.
4a9643ea8Slogwang  *
5a9643ea8Slogwang  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6a9643ea8Slogwang  *
7a9643ea8Slogwang  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8a9643ea8Slogwang  * you may not use this file except in compliance with the License. You may
9a9643ea8Slogwang  * obtain a copy of the License at
10a9643ea8Slogwang  *
11a9643ea8Slogwang  *     https://opensource.org/licenses/GPL-2.0
12a9643ea8Slogwang  *
13a9643ea8Slogwang  * Unless required by applicable law or agreed to in writing, software distributed under the
14a9643ea8Slogwang  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15a9643ea8Slogwang  * either express or implied. See the License for the specific language governing permissions
16a9643ea8Slogwang  * and limitations under the License.
17a9643ea8Slogwang  */
18a9643ea8Slogwang 
19a9643ea8Slogwang 
20a9643ea8Slogwang /**
21a9643ea8Slogwang  *  @file mt_concurrent.c
22a9643ea8Slogwang  *  @time 20130924
23a9643ea8Slogwang  **/
24a9643ea8Slogwang 
25a9643ea8Slogwang #include "micro_thread.h"
26a9643ea8Slogwang #include "mt_msg.h"
27a9643ea8Slogwang #include "mt_notify.h"
28a9643ea8Slogwang #include "mt_connection.h"
29a9643ea8Slogwang #include "mt_concurrent.h"
30a9643ea8Slogwang 
31a9643ea8Slogwang using namespace std;
32a9643ea8Slogwang using namespace NS_MICRO_THREAD;
33a9643ea8Slogwang 
mt_multi_netfd_poll(IMtActList & req_list,int how,int timeout)34a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_netfd_poll(IMtActList& req_list, int how, int timeout)
35a9643ea8Slogwang {
36a9643ea8Slogwang     KqObjList fdlist;
37a9643ea8Slogwang     TAILQ_INIT(&fdlist);
38a9643ea8Slogwang 
39a9643ea8Slogwang     KqueuerObj* obj = NULL;
40a9643ea8Slogwang     IMtAction* action = NULL;
41a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
42a9643ea8Slogwang     {
43a9643ea8Slogwang         action = *it;
44a9643ea8Slogwang         if (action) {
45a9643ea8Slogwang             obj = action->GetNtfyObj();
46a9643ea8Slogwang         }
47a9643ea8Slogwang         if (!action || !obj)
48a9643ea8Slogwang         {
49a9643ea8Slogwang             action->SetErrno(ERR_FRAME_ERROR);
50a9643ea8Slogwang             MTLOG_ERROR("input action %p, or ntify null, error", action);
51a9643ea8Slogwang             return -1;
52a9643ea8Slogwang         }
53a9643ea8Slogwang 
54a9643ea8Slogwang         obj->SetRcvEvents(0);
55a9643ea8Slogwang         if (how & KQ_EVENT_READ)
56a9643ea8Slogwang         {
57a9643ea8Slogwang             obj->EnableInput();
58a9643ea8Slogwang         }
59a9643ea8Slogwang         else
60a9643ea8Slogwang         {
61a9643ea8Slogwang             obj->DisableInput();
62a9643ea8Slogwang         }
63a9643ea8Slogwang 
64a9643ea8Slogwang         if (how & KQ_EVENT_WRITE)
65a9643ea8Slogwang         {
66a9643ea8Slogwang             obj->EnableOutput();
67a9643ea8Slogwang         }
68a9643ea8Slogwang         else
69a9643ea8Slogwang         {
70a9643ea8Slogwang             obj->DisableOutput();
71a9643ea8Slogwang         }
72a9643ea8Slogwang 
73a9643ea8Slogwang         TAILQ_INSERT_TAIL(&fdlist, obj, _entry);
74a9643ea8Slogwang 
75a9643ea8Slogwang     }
76a9643ea8Slogwang 
77a9643ea8Slogwang     MtFrame* mtframe = MtFrame::Instance();
78a9643ea8Slogwang     if (!mtframe || !mtframe->KqueueSchedule(&fdlist, NULL, (int)timeout))
79a9643ea8Slogwang     {
80a9643ea8Slogwang         if (errno != ETIME)
81a9643ea8Slogwang         {
82a9643ea8Slogwang             action->SetErrno(ERR_KQUEUE_FAIL);
83a9643ea8Slogwang             MTLOG_ERROR("Mtframe %p, epoll schedule failed, errno %d", mtframe, errno);
84a9643ea8Slogwang             return -2;
85a9643ea8Slogwang         }
86a9643ea8Slogwang 
87a9643ea8Slogwang         return -3;
88a9643ea8Slogwang     }
89a9643ea8Slogwang 
90a9643ea8Slogwang     return 0;
91a9643ea8Slogwang }
92a9643ea8Slogwang 
mt_multi_newsock(IMtActList & req_list)93a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_newsock(IMtActList& req_list)
94a9643ea8Slogwang {
95a9643ea8Slogwang     int sock = -1, has_ok = 0;
96a9643ea8Slogwang     IMtAction* action = NULL;
97a9643ea8Slogwang     IMtConnection* net_handler = NULL;
98a9643ea8Slogwang 
99a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
100a9643ea8Slogwang     {
101a9643ea8Slogwang         action = *it;
102a9643ea8Slogwang         if (NULL == action)
103a9643ea8Slogwang         {
104a9643ea8Slogwang             action->SetErrno(ERR_PARAM_ERROR);
105a9643ea8Slogwang             MTLOG_ERROR("Invalid param, conn %p null!!", action);
106a9643ea8Slogwang             return -1;
107a9643ea8Slogwang         }
108a9643ea8Slogwang 
109a9643ea8Slogwang         if (action->GetErrno() != ERR_NONE) {
110a9643ea8Slogwang             continue;
111a9643ea8Slogwang         }
112a9643ea8Slogwang 
113a9643ea8Slogwang         net_handler = action->GetIConnection();
114a9643ea8Slogwang         if (NULL == net_handler)
115a9643ea8Slogwang         {
116a9643ea8Slogwang             action->SetErrno(ERR_FRAME_ERROR);
117a9643ea8Slogwang             MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
118a9643ea8Slogwang             return -2;
119a9643ea8Slogwang         }
120a9643ea8Slogwang 
121a9643ea8Slogwang         sock = net_handler->CreateSocket();
122a9643ea8Slogwang         if (sock < 0)
123a9643ea8Slogwang         {
124a9643ea8Slogwang             action->SetErrno(ERR_SOCKET_FAIL);
125a9643ea8Slogwang             MTLOG_ERROR("Get sock data failed, ret %d, errno %d!!", sock, errno);
126a9643ea8Slogwang             return -3;
127a9643ea8Slogwang         }
128a9643ea8Slogwang         has_ok = 1;
129a9643ea8Slogwang 
130a9643ea8Slogwang         if (action->GetProtoType() == MT_UDP)
131a9643ea8Slogwang         {
132a9643ea8Slogwang             action->SetMsgFlag(MULTI_FLAG_OPEN);
133a9643ea8Slogwang         }
134a9643ea8Slogwang         else
135a9643ea8Slogwang         {
136a9643ea8Slogwang             action->SetMsgFlag(MULTI_FLAG_INIT);
137a9643ea8Slogwang         }
138a9643ea8Slogwang     }
139a9643ea8Slogwang 
140a9643ea8Slogwang     if (has_ok)
141a9643ea8Slogwang     {
142a9643ea8Slogwang         return 0;
143a9643ea8Slogwang     }
144a9643ea8Slogwang     else
145a9643ea8Slogwang     {
146a9643ea8Slogwang         return -4;
147a9643ea8Slogwang     }
148a9643ea8Slogwang }
149a9643ea8Slogwang 
mt_multi_open(IMtActList & req_list,int timeout)150a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_open(IMtActList& req_list, int timeout)
151a9643ea8Slogwang {
152a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
153a9643ea8Slogwang     utime64_t end_ms = start_ms + timeout;
154a9643ea8Slogwang     utime64_t curr_ms = 0;
155a9643ea8Slogwang 
156a9643ea8Slogwang     int ret = 0, has_open = 0;
157a9643ea8Slogwang     IMtAction* action = NULL;
158a9643ea8Slogwang     IMtConnection* net_handler = NULL;
159a9643ea8Slogwang     IMtActList::iterator it;
160a9643ea8Slogwang 
161a9643ea8Slogwang     while (1)
162a9643ea8Slogwang     {
163a9643ea8Slogwang         IMtActList wait_list;
164a9643ea8Slogwang         for (it = req_list.begin(); it != req_list.end(); ++it)
165a9643ea8Slogwang         {
166a9643ea8Slogwang             action = *it;
167a9643ea8Slogwang             if (action->GetErrno() != ERR_NONE) {
168a9643ea8Slogwang                 continue;
169a9643ea8Slogwang             }
170a9643ea8Slogwang 
171a9643ea8Slogwang             if (action->GetMsgFlag() == MULTI_FLAG_OPEN) {
172a9643ea8Slogwang                 has_open = 1;
173a9643ea8Slogwang                 continue;
174a9643ea8Slogwang             }
175a9643ea8Slogwang 
176a9643ea8Slogwang             net_handler = action->GetIConnection();
177a9643ea8Slogwang             if (NULL == net_handler)
178a9643ea8Slogwang             {
179a9643ea8Slogwang                 action->SetErrno(ERR_FRAME_ERROR);
180a9643ea8Slogwang                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
181a9643ea8Slogwang                 return -1;
182a9643ea8Slogwang             }
183a9643ea8Slogwang 
184a9643ea8Slogwang             ret = net_handler->OpenCnnect();
185a9643ea8Slogwang             if (ret < 0)
186a9643ea8Slogwang             {
187a9643ea8Slogwang                 wait_list.push_back(action);
188a9643ea8Slogwang             }
189a9643ea8Slogwang             else
190a9643ea8Slogwang             {
191a9643ea8Slogwang                 action->SetMsgFlag(MULTI_FLAG_OPEN);
192a9643ea8Slogwang             }
193a9643ea8Slogwang         }
194a9643ea8Slogwang 
195a9643ea8Slogwang         curr_ms = MtFrame::Instance()->GetLastClock();
196a9643ea8Slogwang         if (curr_ms > end_ms)
197a9643ea8Slogwang         {
198a9643ea8Slogwang             MTLOG_DEBUG("Open connect timeout, errno %d", errno);
199a9643ea8Slogwang             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
200a9643ea8Slogwang             {
201a9643ea8Slogwang                 (*it)->SetErrno(ERR_CONNECT_FAIL);
202a9643ea8Slogwang             }
203a9643ea8Slogwang 
204a9643ea8Slogwang             if (!has_open)
205a9643ea8Slogwang             {
206a9643ea8Slogwang                 return 0;
207a9643ea8Slogwang             }
208a9643ea8Slogwang             else
209a9643ea8Slogwang             {
210a9643ea8Slogwang                 return -2;
211a9643ea8Slogwang             }
212a9643ea8Slogwang         }
213a9643ea8Slogwang 
214a9643ea8Slogwang         if (!wait_list.empty())
215a9643ea8Slogwang         {
216a9643ea8Slogwang             mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
217a9643ea8Slogwang         }
218a9643ea8Slogwang         else
219a9643ea8Slogwang         {
220a9643ea8Slogwang             return 0;
221a9643ea8Slogwang         }
222a9643ea8Slogwang     }
223a9643ea8Slogwang 
224a9643ea8Slogwang }
225a9643ea8Slogwang 
mt_multi_sendto(IMtActList & req_list,int timeout)226a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_sendto(IMtActList& req_list, int timeout)
227a9643ea8Slogwang {
228a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
229a9643ea8Slogwang     utime64_t end_ms = start_ms + timeout;
230a9643ea8Slogwang     utime64_t curr_ms = 0;
231a9643ea8Slogwang 
232a9643ea8Slogwang     int ret = 0, has_send = 0;
233a9643ea8Slogwang     IMtAction* action = NULL;
234a9643ea8Slogwang     IMtConnection* net_handler = NULL;
235a9643ea8Slogwang 
236a9643ea8Slogwang     while (1)
237a9643ea8Slogwang     {
238a9643ea8Slogwang         IMtActList wait_list;
239a9643ea8Slogwang         for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
240a9643ea8Slogwang         {
241a9643ea8Slogwang             action = *it;
242a9643ea8Slogwang             if (action->GetErrno() != ERR_NONE) {
243a9643ea8Slogwang                 continue;
244a9643ea8Slogwang             }
245a9643ea8Slogwang 
246a9643ea8Slogwang             if (action->GetMsgFlag() == MULTI_FLAG_SEND) {
247a9643ea8Slogwang                 has_send = 1;
248a9643ea8Slogwang                 continue;
249a9643ea8Slogwang             }
250a9643ea8Slogwang 
251a9643ea8Slogwang             net_handler = action->GetIConnection();
252a9643ea8Slogwang             if (NULL == net_handler)
253a9643ea8Slogwang             {
254a9643ea8Slogwang                 action->SetErrno(ERR_FRAME_ERROR);
255a9643ea8Slogwang                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
256a9643ea8Slogwang                 return -2;
257a9643ea8Slogwang             }
258a9643ea8Slogwang 
259a9643ea8Slogwang             ret = net_handler->SendData();
260a9643ea8Slogwang             if (ret == -1)
261a9643ea8Slogwang             {
262a9643ea8Slogwang                 action->SetErrno(ERR_SEND_FAIL);
263a9643ea8Slogwang                 MTLOG_ERROR("MultiItem msg send error, %d", errno);
264a9643ea8Slogwang                 continue;
265a9643ea8Slogwang             }
266a9643ea8Slogwang             else if (ret == 0)
267a9643ea8Slogwang             {
268a9643ea8Slogwang                 wait_list.push_back(action);
269a9643ea8Slogwang                 continue;
270a9643ea8Slogwang             }
271a9643ea8Slogwang             else
272a9643ea8Slogwang             {
273a9643ea8Slogwang                 action->SetMsgFlag(MULTI_FLAG_SEND);
274a9643ea8Slogwang             }
275a9643ea8Slogwang         }
276a9643ea8Slogwang 
277a9643ea8Slogwang         curr_ms = MtFrame::Instance()->GetLastClock();
278a9643ea8Slogwang         if (curr_ms > end_ms)
279a9643ea8Slogwang         {
280a9643ea8Slogwang             MTLOG_DEBUG("send data timeout");
281a9643ea8Slogwang             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
282a9643ea8Slogwang             {
283a9643ea8Slogwang                 (*it)->SetErrno(ERR_SEND_FAIL);
284a9643ea8Slogwang             }
285a9643ea8Slogwang 
286a9643ea8Slogwang             if (has_send)
287a9643ea8Slogwang             {
288a9643ea8Slogwang                 return 0;
289a9643ea8Slogwang             }
290a9643ea8Slogwang             else
291a9643ea8Slogwang             {
292a9643ea8Slogwang                 return -5;
293a9643ea8Slogwang             }
294a9643ea8Slogwang         }
295a9643ea8Slogwang 
296a9643ea8Slogwang         if (!wait_list.empty())
297a9643ea8Slogwang         {
298a9643ea8Slogwang             mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
299a9643ea8Slogwang         }
300a9643ea8Slogwang         else
301a9643ea8Slogwang         {
302a9643ea8Slogwang             return 0;
303a9643ea8Slogwang         }
304a9643ea8Slogwang     }
305a9643ea8Slogwang 
306a9643ea8Slogwang     return 0;
307a9643ea8Slogwang }
308a9643ea8Slogwang 
mt_multi_recvfrom(IMtActList & req_list,int timeout)309a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_recvfrom(IMtActList& req_list, int timeout)
310a9643ea8Slogwang {
311a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
312a9643ea8Slogwang     utime64_t end_ms = start_ms + timeout;
313a9643ea8Slogwang     utime64_t curr_ms = 0;
314a9643ea8Slogwang 
315a9643ea8Slogwang     int ret = 0;
316a9643ea8Slogwang     IMtAction* action = NULL;
317a9643ea8Slogwang     IMtConnection* net_handler = NULL;
318a9643ea8Slogwang 
319a9643ea8Slogwang     while (1)
320a9643ea8Slogwang     {
321a9643ea8Slogwang         IMtActList wait_list;
322a9643ea8Slogwang         for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
323a9643ea8Slogwang         {
324a9643ea8Slogwang             action = *it;
325a9643ea8Slogwang             if (action->GetErrno() != ERR_NONE) {
326a9643ea8Slogwang                 continue;
327a9643ea8Slogwang             }
328a9643ea8Slogwang 
329*35a81399Slogwang             if (MULTI_FLAG_FIN == action->GetMsgFlag())
330a9643ea8Slogwang             {
331a9643ea8Slogwang                 continue;
332a9643ea8Slogwang             }
333a9643ea8Slogwang 
334a9643ea8Slogwang             net_handler = action->GetIConnection();
335a9643ea8Slogwang             if (NULL == net_handler)
336a9643ea8Slogwang             {
337a9643ea8Slogwang                 action->SetErrno(ERR_FRAME_ERROR);
338a9643ea8Slogwang                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
339a9643ea8Slogwang                 return -2;
340a9643ea8Slogwang             }
341a9643ea8Slogwang 
342a9643ea8Slogwang             ret = net_handler->RecvData();
343a9643ea8Slogwang             if (ret < 0)
344a9643ea8Slogwang             {
345a9643ea8Slogwang                 action->SetErrno(ERR_RECV_FAIL);
346a9643ea8Slogwang                 MTLOG_ERROR("MultiItem msg recv failed: %p", net_handler);
347a9643ea8Slogwang                 continue;
348a9643ea8Slogwang             }
349a9643ea8Slogwang             else if (ret == 0)
350a9643ea8Slogwang             {
351a9643ea8Slogwang                 wait_list.push_back(action);
352a9643ea8Slogwang                 continue;
353a9643ea8Slogwang             }
354a9643ea8Slogwang             else
355a9643ea8Slogwang             {
356a9643ea8Slogwang                 action->SetMsgFlag(MULTI_FLAG_FIN);
357a9643ea8Slogwang                 action->SetCost(MtFrame::Instance()->GetLastClock() - start_ms);
358a9643ea8Slogwang             }
359a9643ea8Slogwang         }
360a9643ea8Slogwang 
361a9643ea8Slogwang         curr_ms = MtFrame::Instance()->GetLastClock();
362a9643ea8Slogwang         if (curr_ms > end_ms)
363a9643ea8Slogwang         {
364a9643ea8Slogwang             MTLOG_DEBUG("Recv data timeout, curr %llu, start: %llu", curr_ms, start_ms);
365a9643ea8Slogwang             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
366a9643ea8Slogwang             {
367a9643ea8Slogwang                 (*it)->SetErrno(ERR_RECV_TIMEOUT);
368a9643ea8Slogwang             }
369a9643ea8Slogwang             return -5;
370a9643ea8Slogwang         }
371a9643ea8Slogwang 
372a9643ea8Slogwang         if (!wait_list.empty())
373a9643ea8Slogwang         {
374a9643ea8Slogwang             mt_multi_netfd_poll(wait_list, KQ_EVENT_READ, end_ms - curr_ms);
375a9643ea8Slogwang         }
376a9643ea8Slogwang         else
377a9643ea8Slogwang         {
378a9643ea8Slogwang             return 0;
379a9643ea8Slogwang         }
380a9643ea8Slogwang     }
381a9643ea8Slogwang }
382a9643ea8Slogwang 
mt_multi_sendrcv_ex(IMtActList & req_list,int timeout)383a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_sendrcv_ex(IMtActList& req_list, int timeout)
384a9643ea8Slogwang {
385a9643ea8Slogwang     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
386a9643ea8Slogwang     utime64_t curr_ms = 0;
387a9643ea8Slogwang 
388*35a81399Slogwang     int rc = mt_multi_newsock(req_list);
389a9643ea8Slogwang     if (rc < 0)
390a9643ea8Slogwang     {
391*35a81399Slogwang         MT_ATTR_API(320842, 1);
392a9643ea8Slogwang         MTLOG_ERROR("mt_multi_sendrcv new sock failed, ret: %d", rc);
393a9643ea8Slogwang         return -1;
394a9643ea8Slogwang     }
395a9643ea8Slogwang 
396a9643ea8Slogwang     rc = mt_multi_open(req_list, timeout);
397a9643ea8Slogwang     if (rc < 0)
398a9643ea8Slogwang     {
399*35a81399Slogwang         MT_ATTR_API(320843, 1);
400a9643ea8Slogwang         MTLOG_ERROR("mt_multi_sendrcv open failed, ret: %d", rc);
401a9643ea8Slogwang         return -2;
402a9643ea8Slogwang     }
403a9643ea8Slogwang 
404a9643ea8Slogwang     curr_ms = MtFrame::Instance()->GetLastClock();
405a9643ea8Slogwang     rc = mt_multi_sendto(req_list, timeout - (curr_ms - start_ms));
406a9643ea8Slogwang     if (rc < 0)
407a9643ea8Slogwang     {
408*35a81399Slogwang         MT_ATTR_API(320844, 1);
409a9643ea8Slogwang         MTLOG_ERROR("mt_multi_sendrcv send failed, ret: %d", rc);
410a9643ea8Slogwang         return -3;
411a9643ea8Slogwang     }
412a9643ea8Slogwang 
413a9643ea8Slogwang     curr_ms = MtFrame::Instance()->GetLastClock();
414a9643ea8Slogwang     rc = mt_multi_recvfrom(req_list, timeout - (curr_ms - start_ms));
415a9643ea8Slogwang     if (rc < 0)
416a9643ea8Slogwang     {
417*35a81399Slogwang         MT_ATTR_API(320845, 1);
418a9643ea8Slogwang         MTLOG_ERROR("mt_multi_sendrcv recv failed, ret: %d", rc);
419a9643ea8Slogwang         return -4;
420a9643ea8Slogwang     }
421a9643ea8Slogwang 
422a9643ea8Slogwang     return 0;
423a9643ea8Slogwang }
424a9643ea8Slogwang 
mt_msg_sendrcv(IMtActList & req_list,int timeout)425a9643ea8Slogwang int NS_MICRO_THREAD::mt_msg_sendrcv(IMtActList& req_list, int timeout)
426a9643ea8Slogwang {
427a9643ea8Slogwang     int iRet = 0;
428a9643ea8Slogwang 
429a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
430a9643ea8Slogwang     {
431a9643ea8Slogwang         IMtAction* pAction = *it;
432a9643ea8Slogwang         if (!pAction || pAction->InitConnEnv() < 0)
433a9643ea8Slogwang         {
434a9643ea8Slogwang             MTLOG_ERROR("invalid action(%p) or int failed, error", pAction);
435a9643ea8Slogwang             return -1;
436a9643ea8Slogwang         }
437a9643ea8Slogwang 
438a9643ea8Slogwang         iRet = pAction->DoEncode();
439a9643ea8Slogwang         if (iRet < 0)
440a9643ea8Slogwang         {
441a9643ea8Slogwang             pAction->SetErrno(ERR_ENCODE_ERROR);
442a9643ea8Slogwang             MTLOG_ERROR("pack action pkg failed, act %p, ret %d", pAction, iRet);
443a9643ea8Slogwang             continue;
444a9643ea8Slogwang         }
445a9643ea8Slogwang 
446a9643ea8Slogwang     }
447a9643ea8Slogwang 
448a9643ea8Slogwang     mt_multi_sendrcv_ex(req_list, timeout);
449a9643ea8Slogwang 
450a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
451a9643ea8Slogwang     {
452a9643ea8Slogwang         IMtAction* pAction = *it;
453a9643ea8Slogwang 
454a9643ea8Slogwang         if (pAction->GetMsgFlag() != MULTI_FLAG_FIN)
455a9643ea8Slogwang         {
456a9643ea8Slogwang             pAction->DoError();
457a9643ea8Slogwang             MTLOG_DEBUG("send recv failed: %d", pAction->GetErrno());
458a9643ea8Slogwang             continue;
459a9643ea8Slogwang         }
460a9643ea8Slogwang 
461a9643ea8Slogwang         iRet = pAction->DoProcess();
462a9643ea8Slogwang         if (iRet < 0)
463a9643ea8Slogwang         {
464a9643ea8Slogwang             MTLOG_DEBUG("action process failed: %d", iRet);
465a9643ea8Slogwang             continue;
466a9643ea8Slogwang         }
467a9643ea8Slogwang     }
468a9643ea8Slogwang 
469a9643ea8Slogwang     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
470a9643ea8Slogwang     {
471a9643ea8Slogwang         IMtAction* pAction = *it;
472a9643ea8Slogwang         pAction->Reset();
473a9643ea8Slogwang     }
474a9643ea8Slogwang 
475a9643ea8Slogwang     return 0;
476a9643ea8Slogwang }
477