1 
2 /**
3  * Tencent is pleased to support the open source community by making MSEC available.
4  *
5  * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6  *
7  * Licensed under the GNU General Public License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License. You may
9  * obtain a copy of the License at
10  *
11  *     https://opensource.org/licenses/GPL-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the
14  * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15  * either express or implied. See the License for the specific language governing permissions
16  * and limitations under the License.
17  */
18 
19 
20 /**
21  *  @file mt_concurrent.c
22  *  @time 20130924
23  **/
24 
25 #include "micro_thread.h"
26 #include "mt_msg.h"
27 #include "mt_notify.h"
28 #include "mt_connection.h"
29 #include "mt_concurrent.h"
30 
31 using namespace std;
32 using namespace NS_MICRO_THREAD;
33 
mt_multi_netfd_poll(IMtActList & req_list,int how,int timeout)34 int NS_MICRO_THREAD::mt_multi_netfd_poll(IMtActList& req_list, int how, int timeout)
35 {
36     KqObjList fdlist;
37     TAILQ_INIT(&fdlist);
38 
39     KqueuerObj* obj = NULL;
40     IMtAction* action = NULL;
41     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
42     {
43         action = *it;
44         if (action) {
45             obj = action->GetNtfyObj();
46         }
47         if (!action || !obj)
48         {
49             action->SetErrno(ERR_FRAME_ERROR);
50             MTLOG_ERROR("input action %p, or ntify null, error", action);
51             return -1;
52         }
53 
54         obj->SetRcvEvents(0);
55         if (how & KQ_EVENT_READ)
56         {
57             obj->EnableInput();
58         }
59         else
60         {
61             obj->DisableInput();
62         }
63 
64         if (how & KQ_EVENT_WRITE)
65         {
66             obj->EnableOutput();
67         }
68         else
69         {
70             obj->DisableOutput();
71         }
72 
73         TAILQ_INSERT_TAIL(&fdlist, obj, _entry);
74 
75     }
76 
77     MtFrame* mtframe = MtFrame::Instance();
78     if (!mtframe || !mtframe->KqueueSchedule(&fdlist, NULL, (int)timeout))
79     {
80         if (errno != ETIME)
81         {
82             action->SetErrno(ERR_KQUEUE_FAIL);
83             MTLOG_ERROR("Mtframe %p, epoll schedule failed, errno %d", mtframe, errno);
84             return -2;
85         }
86 
87         return -3;
88     }
89 
90     return 0;
91 }
92 
mt_multi_newsock(IMtActList & req_list)93 int NS_MICRO_THREAD::mt_multi_newsock(IMtActList& req_list)
94 {
95     int sock = -1, has_ok = 0;
96     IMtAction* action = NULL;
97     IMtConnection* net_handler = NULL;
98 
99     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
100     {
101         action = *it;
102         if (NULL == action)
103         {
104             action->SetErrno(ERR_PARAM_ERROR);
105             MTLOG_ERROR("Invalid param, conn %p null!!", action);
106             return -1;
107         }
108 
109         if (action->GetErrno() != ERR_NONE) {
110             continue;
111         }
112 
113         net_handler = action->GetIConnection();
114         if (NULL == net_handler)
115         {
116             action->SetErrno(ERR_FRAME_ERROR);
117             MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
118             return -2;
119         }
120 
121         sock = net_handler->CreateSocket();
122         if (sock < 0)
123         {
124             action->SetErrno(ERR_SOCKET_FAIL);
125             MTLOG_ERROR("Get sock data failed, ret %d, errno %d!!", sock, errno);
126             return -3;
127         }
128         has_ok = 1;
129 
130         if (action->GetProtoType() == MT_UDP)
131         {
132             action->SetMsgFlag(MULTI_FLAG_OPEN);
133         }
134         else
135         {
136             action->SetMsgFlag(MULTI_FLAG_INIT);
137         }
138     }
139 
140     if (has_ok)
141     {
142         return 0;
143     }
144     else
145     {
146         return -4;
147     }
148 }
149 
mt_multi_open(IMtActList & req_list,int timeout)150 int NS_MICRO_THREAD::mt_multi_open(IMtActList& req_list, int timeout)
151 {
152     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
153     utime64_t end_ms = start_ms + timeout;
154     utime64_t curr_ms = 0;
155 
156     int ret = 0, has_open = 0;
157     IMtAction* action = NULL;
158     IMtConnection* net_handler = NULL;
159     IMtActList::iterator it;
160 
161     while (1)
162     {
163         IMtActList wait_list;
164         for (it = req_list.begin(); it != req_list.end(); ++it)
165         {
166             action = *it;
167             if (action->GetErrno() != ERR_NONE) {
168                 continue;
169             }
170 
171             if (action->GetMsgFlag() == MULTI_FLAG_OPEN) {
172                 has_open = 1;
173                 continue;
174             }
175 
176             net_handler = action->GetIConnection();
177             if (NULL == net_handler)
178             {
179                 action->SetErrno(ERR_FRAME_ERROR);
180                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
181                 return -1;
182             }
183 
184             ret = net_handler->OpenCnnect();
185             if (ret < 0)
186             {
187                 wait_list.push_back(action);
188             }
189             else
190             {
191                 action->SetMsgFlag(MULTI_FLAG_OPEN);
192             }
193         }
194 
195         curr_ms = MtFrame::Instance()->GetLastClock();
196         if (curr_ms > end_ms)
197         {
198             MTLOG_DEBUG("Open connect timeout, errno %d", errno);
199             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
200             {
201                 (*it)->SetErrno(ERR_CONNECT_FAIL);
202             }
203 
204             if (!has_open)
205             {
206                 return 0;
207             }
208             else
209             {
210                 return -2;
211             }
212         }
213 
214         if (!wait_list.empty())
215         {
216             mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
217         }
218         else
219         {
220             return 0;
221         }
222     }
223 
224 }
225 
mt_multi_sendto(IMtActList & req_list,int timeout)226 int NS_MICRO_THREAD::mt_multi_sendto(IMtActList& req_list, int timeout)
227 {
228     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
229     utime64_t end_ms = start_ms + timeout;
230     utime64_t curr_ms = 0;
231 
232     int ret = 0, has_send = 0;
233     IMtAction* action = NULL;
234     IMtConnection* net_handler = NULL;
235 
236     while (1)
237     {
238         IMtActList wait_list;
239         for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
240         {
241             action = *it;
242             if (action->GetErrno() != ERR_NONE) {
243                 continue;
244             }
245 
246             if (action->GetMsgFlag() == MULTI_FLAG_SEND) {
247                 has_send = 1;
248                 continue;
249             }
250 
251             net_handler = action->GetIConnection();
252             if (NULL == net_handler)
253             {
254                 action->SetErrno(ERR_FRAME_ERROR);
255                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
256                 return -2;
257             }
258 
259             ret = net_handler->SendData();
260             if (ret == -1)
261             {
262                 action->SetErrno(ERR_SEND_FAIL);
263                 MTLOG_ERROR("MultiItem msg send error, %d", errno);
264                 continue;
265             }
266             else if (ret == 0)
267             {
268                 wait_list.push_back(action);
269                 continue;
270             }
271             else
272             {
273                 action->SetMsgFlag(MULTI_FLAG_SEND);
274             }
275         }
276 
277         curr_ms = MtFrame::Instance()->GetLastClock();
278         if (curr_ms > end_ms)
279         {
280             MTLOG_DEBUG("send data timeout");
281             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
282             {
283                 (*it)->SetErrno(ERR_SEND_FAIL);
284             }
285 
286             if (has_send)
287             {
288                 return 0;
289             }
290             else
291             {
292                 return -5;
293             }
294         }
295 
296         if (!wait_list.empty())
297         {
298             mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
299         }
300         else
301         {
302             return 0;
303         }
304     }
305 
306     return 0;
307 }
308 
mt_multi_recvfrom(IMtActList & req_list,int timeout)309 int NS_MICRO_THREAD::mt_multi_recvfrom(IMtActList& req_list, int timeout)
310 {
311     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
312     utime64_t end_ms = start_ms + timeout;
313     utime64_t curr_ms = 0;
314 
315     int ret = 0;
316     IMtAction* action = NULL;
317     IMtConnection* net_handler = NULL;
318 
319     while (1)
320     {
321         IMtActList wait_list;
322         for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
323         {
324             action = *it;
325             if (action->GetErrno() != ERR_NONE) {
326                 continue;
327             }
328 
329             if (MULTI_FLAG_FIN == action->GetMsgFlag())
330             {
331                 continue;
332             }
333 
334             net_handler = action->GetIConnection();
335             if (NULL == net_handler)
336             {
337                 action->SetErrno(ERR_FRAME_ERROR);
338                 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
339                 return -2;
340             }
341 
342             ret = net_handler->RecvData();
343             if (ret < 0)
344             {
345                 action->SetErrno(ERR_RECV_FAIL);
346                 MTLOG_ERROR("MultiItem msg recv failed: %p", net_handler);
347                 continue;
348             }
349             else if (ret == 0)
350             {
351                 wait_list.push_back(action);
352                 continue;
353             }
354             else
355             {
356                 action->SetMsgFlag(MULTI_FLAG_FIN);
357                 action->SetCost(MtFrame::Instance()->GetLastClock() - start_ms);
358             }
359         }
360 
361         curr_ms = MtFrame::Instance()->GetLastClock();
362         if (curr_ms > end_ms)
363         {
364             MTLOG_DEBUG("Recv data timeout, curr %llu, start: %llu", curr_ms, start_ms);
365             for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
366             {
367                 (*it)->SetErrno(ERR_RECV_TIMEOUT);
368             }
369             return -5;
370         }
371 
372         if (!wait_list.empty())
373         {
374             mt_multi_netfd_poll(wait_list, KQ_EVENT_READ, end_ms - curr_ms);
375         }
376         else
377         {
378             return 0;
379         }
380     }
381 }
382 
mt_multi_sendrcv_ex(IMtActList & req_list,int timeout)383 int NS_MICRO_THREAD::mt_multi_sendrcv_ex(IMtActList& req_list, int timeout)
384 {
385     utime64_t start_ms = MtFrame::Instance()->GetLastClock();
386     utime64_t curr_ms = 0;
387 
388     int rc = mt_multi_newsock(req_list);
389     if (rc < 0)
390     {
391         MT_ATTR_API(320842, 1);
392         MTLOG_ERROR("mt_multi_sendrcv new sock failed, ret: %d", rc);
393         return -1;
394     }
395 
396     rc = mt_multi_open(req_list, timeout);
397     if (rc < 0)
398     {
399         MT_ATTR_API(320843, 1);
400         MTLOG_ERROR("mt_multi_sendrcv open failed, ret: %d", rc);
401         return -2;
402     }
403 
404     curr_ms = MtFrame::Instance()->GetLastClock();
405     rc = mt_multi_sendto(req_list, timeout - (curr_ms - start_ms));
406     if (rc < 0)
407     {
408         MT_ATTR_API(320844, 1);
409         MTLOG_ERROR("mt_multi_sendrcv send failed, ret: %d", rc);
410         return -3;
411     }
412 
413     curr_ms = MtFrame::Instance()->GetLastClock();
414     rc = mt_multi_recvfrom(req_list, timeout - (curr_ms - start_ms));
415     if (rc < 0)
416     {
417         MT_ATTR_API(320845, 1);
418         MTLOG_ERROR("mt_multi_sendrcv recv failed, ret: %d", rc);
419         return -4;
420     }
421 
422     return 0;
423 }
424 
mt_msg_sendrcv(IMtActList & req_list,int timeout)425 int NS_MICRO_THREAD::mt_msg_sendrcv(IMtActList& req_list, int timeout)
426 {
427     int iRet = 0;
428 
429     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
430     {
431         IMtAction* pAction = *it;
432         if (!pAction || pAction->InitConnEnv() < 0)
433         {
434             MTLOG_ERROR("invalid action(%p) or int failed, error", pAction);
435             return -1;
436         }
437 
438         iRet = pAction->DoEncode();
439         if (iRet < 0)
440         {
441             pAction->SetErrno(ERR_ENCODE_ERROR);
442             MTLOG_ERROR("pack action pkg failed, act %p, ret %d", pAction, iRet);
443             continue;
444         }
445 
446     }
447 
448     mt_multi_sendrcv_ex(req_list, timeout);
449 
450     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
451     {
452         IMtAction* pAction = *it;
453 
454         if (pAction->GetMsgFlag() != MULTI_FLAG_FIN)
455         {
456             pAction->DoError();
457             MTLOG_DEBUG("send recv failed: %d", pAction->GetErrno());
458             continue;
459         }
460 
461         iRet = pAction->DoProcess();
462         if (iRet < 0)
463         {
464             MTLOG_DEBUG("action process failed: %d", iRet);
465             continue;
466         }
467     }
468 
469     for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
470     {
471         IMtAction* pAction = *it;
472         pAction->Reset();
473     }
474 
475     return 0;
476 }
477