1
2 /**
3 * Tencent is pleased to support the open source community by making MSEC available.
4 *
5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved.
6 *
7 * Licensed under the GNU General Public License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License. You may
9 * obtain a copy of the License at
10 *
11 * https://opensource.org/licenses/GPL-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software distributed under the
14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
15 * either express or implied. See the License for the specific language governing permissions
16 * and limitations under the License.
17 */
18
19
20 /**
21 * @file mt_concurrent.c
22 * @time 20130924
23 **/
24
25 #include "micro_thread.h"
26 #include "mt_msg.h"
27 #include "mt_notify.h"
28 #include "mt_connection.h"
29 #include "mt_concurrent.h"
30
31 using namespace std;
32 using namespace NS_MICRO_THREAD;
33
mt_multi_netfd_poll(IMtActList & req_list,int how,int timeout)34 int NS_MICRO_THREAD::mt_multi_netfd_poll(IMtActList& req_list, int how, int timeout)
35 {
36 KqObjList fdlist;
37 TAILQ_INIT(&fdlist);
38
39 KqueuerObj* obj = NULL;
40 IMtAction* action = NULL;
41 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
42 {
43 action = *it;
44 if (action) {
45 obj = action->GetNtfyObj();
46 }
47 if (!action || !obj)
48 {
49 action->SetErrno(ERR_FRAME_ERROR);
50 MTLOG_ERROR("input action %p, or ntify null, error", action);
51 return -1;
52 }
53
54 obj->SetRcvEvents(0);
55 if (how & KQ_EVENT_READ)
56 {
57 obj->EnableInput();
58 }
59 else
60 {
61 obj->DisableInput();
62 }
63
64 if (how & KQ_EVENT_WRITE)
65 {
66 obj->EnableOutput();
67 }
68 else
69 {
70 obj->DisableOutput();
71 }
72
73 TAILQ_INSERT_TAIL(&fdlist, obj, _entry);
74
75 }
76
77 MtFrame* mtframe = MtFrame::Instance();
78 if (!mtframe || !mtframe->KqueueSchedule(&fdlist, NULL, (int)timeout))
79 {
80 if (errno != ETIME)
81 {
82 action->SetErrno(ERR_KQUEUE_FAIL);
83 MTLOG_ERROR("Mtframe %p, epoll schedule failed, errno %d", mtframe, errno);
84 return -2;
85 }
86
87 return -3;
88 }
89
90 return 0;
91 }
92
mt_multi_newsock(IMtActList & req_list)93 int NS_MICRO_THREAD::mt_multi_newsock(IMtActList& req_list)
94 {
95 int sock = -1, has_ok = 0;
96 IMtAction* action = NULL;
97 IMtConnection* net_handler = NULL;
98
99 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
100 {
101 action = *it;
102 if (NULL == action)
103 {
104 action->SetErrno(ERR_PARAM_ERROR);
105 MTLOG_ERROR("Invalid param, conn %p null!!", action);
106 return -1;
107 }
108
109 if (action->GetErrno() != ERR_NONE) {
110 continue;
111 }
112
113 net_handler = action->GetIConnection();
114 if (NULL == net_handler)
115 {
116 action->SetErrno(ERR_FRAME_ERROR);
117 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
118 return -2;
119 }
120
121 sock = net_handler->CreateSocket();
122 if (sock < 0)
123 {
124 action->SetErrno(ERR_SOCKET_FAIL);
125 MTLOG_ERROR("Get sock data failed, ret %d, errno %d!!", sock, errno);
126 return -3;
127 }
128 has_ok = 1;
129
130 if (action->GetProtoType() == MT_UDP)
131 {
132 action->SetMsgFlag(MULTI_FLAG_OPEN);
133 }
134 else
135 {
136 action->SetMsgFlag(MULTI_FLAG_INIT);
137 }
138 }
139
140 if (has_ok)
141 {
142 return 0;
143 }
144 else
145 {
146 return -4;
147 }
148 }
149
mt_multi_open(IMtActList & req_list,int timeout)150 int NS_MICRO_THREAD::mt_multi_open(IMtActList& req_list, int timeout)
151 {
152 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
153 utime64_t end_ms = start_ms + timeout;
154 utime64_t curr_ms = 0;
155
156 int ret = 0, has_open = 0;
157 IMtAction* action = NULL;
158 IMtConnection* net_handler = NULL;
159 IMtActList::iterator it;
160
161 while (1)
162 {
163 IMtActList wait_list;
164 for (it = req_list.begin(); it != req_list.end(); ++it)
165 {
166 action = *it;
167 if (action->GetErrno() != ERR_NONE) {
168 continue;
169 }
170
171 if (action->GetMsgFlag() == MULTI_FLAG_OPEN) {
172 has_open = 1;
173 continue;
174 }
175
176 net_handler = action->GetIConnection();
177 if (NULL == net_handler)
178 {
179 action->SetErrno(ERR_FRAME_ERROR);
180 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
181 return -1;
182 }
183
184 ret = net_handler->OpenCnnect();
185 if (ret < 0)
186 {
187 wait_list.push_back(action);
188 }
189 else
190 {
191 action->SetMsgFlag(MULTI_FLAG_OPEN);
192 }
193 }
194
195 curr_ms = MtFrame::Instance()->GetLastClock();
196 if (curr_ms > end_ms)
197 {
198 MTLOG_DEBUG("Open connect timeout, errno %d", errno);
199 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
200 {
201 (*it)->SetErrno(ERR_CONNECT_FAIL);
202 }
203
204 if (!has_open)
205 {
206 return 0;
207 }
208 else
209 {
210 return -2;
211 }
212 }
213
214 if (!wait_list.empty())
215 {
216 mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
217 }
218 else
219 {
220 return 0;
221 }
222 }
223
224 }
225
mt_multi_sendto(IMtActList & req_list,int timeout)226 int NS_MICRO_THREAD::mt_multi_sendto(IMtActList& req_list, int timeout)
227 {
228 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
229 utime64_t end_ms = start_ms + timeout;
230 utime64_t curr_ms = 0;
231
232 int ret = 0, has_send = 0;
233 IMtAction* action = NULL;
234 IMtConnection* net_handler = NULL;
235
236 while (1)
237 {
238 IMtActList wait_list;
239 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
240 {
241 action = *it;
242 if (action->GetErrno() != ERR_NONE) {
243 continue;
244 }
245
246 if (action->GetMsgFlag() == MULTI_FLAG_SEND) {
247 has_send = 1;
248 continue;
249 }
250
251 net_handler = action->GetIConnection();
252 if (NULL == net_handler)
253 {
254 action->SetErrno(ERR_FRAME_ERROR);
255 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
256 return -2;
257 }
258
259 ret = net_handler->SendData();
260 if (ret == -1)
261 {
262 action->SetErrno(ERR_SEND_FAIL);
263 MTLOG_ERROR("MultiItem msg send error, %d", errno);
264 continue;
265 }
266 else if (ret == 0)
267 {
268 wait_list.push_back(action);
269 continue;
270 }
271 else
272 {
273 action->SetMsgFlag(MULTI_FLAG_SEND);
274 }
275 }
276
277 curr_ms = MtFrame::Instance()->GetLastClock();
278 if (curr_ms > end_ms)
279 {
280 MTLOG_DEBUG("send data timeout");
281 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
282 {
283 (*it)->SetErrno(ERR_SEND_FAIL);
284 }
285
286 if (has_send)
287 {
288 return 0;
289 }
290 else
291 {
292 return -5;
293 }
294 }
295
296 if (!wait_list.empty())
297 {
298 mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms);
299 }
300 else
301 {
302 return 0;
303 }
304 }
305
306 return 0;
307 }
308
mt_multi_recvfrom(IMtActList & req_list,int timeout)309 int NS_MICRO_THREAD::mt_multi_recvfrom(IMtActList& req_list, int timeout)
310 {
311 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
312 utime64_t end_ms = start_ms + timeout;
313 utime64_t curr_ms = 0;
314
315 int ret = 0;
316 IMtAction* action = NULL;
317 IMtConnection* net_handler = NULL;
318
319 while (1)
320 {
321 IMtActList wait_list;
322 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
323 {
324 action = *it;
325 if (action->GetErrno() != ERR_NONE) {
326 continue;
327 }
328
329 if (MULTI_FLAG_FIN == action->GetMsgFlag())
330 {
331 continue;
332 }
333
334 net_handler = action->GetIConnection();
335 if (NULL == net_handler)
336 {
337 action->SetErrno(ERR_FRAME_ERROR);
338 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler);
339 return -2;
340 }
341
342 ret = net_handler->RecvData();
343 if (ret < 0)
344 {
345 action->SetErrno(ERR_RECV_FAIL);
346 MTLOG_ERROR("MultiItem msg recv failed: %p", net_handler);
347 continue;
348 }
349 else if (ret == 0)
350 {
351 wait_list.push_back(action);
352 continue;
353 }
354 else
355 {
356 action->SetMsgFlag(MULTI_FLAG_FIN);
357 action->SetCost(MtFrame::Instance()->GetLastClock() - start_ms);
358 }
359 }
360
361 curr_ms = MtFrame::Instance()->GetLastClock();
362 if (curr_ms > end_ms)
363 {
364 MTLOG_DEBUG("Recv data timeout, curr %llu, start: %llu", curr_ms, start_ms);
365 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it)
366 {
367 (*it)->SetErrno(ERR_RECV_TIMEOUT);
368 }
369 return -5;
370 }
371
372 if (!wait_list.empty())
373 {
374 mt_multi_netfd_poll(wait_list, KQ_EVENT_READ, end_ms - curr_ms);
375 }
376 else
377 {
378 return 0;
379 }
380 }
381 }
382
mt_multi_sendrcv_ex(IMtActList & req_list,int timeout)383 int NS_MICRO_THREAD::mt_multi_sendrcv_ex(IMtActList& req_list, int timeout)
384 {
385 utime64_t start_ms = MtFrame::Instance()->GetLastClock();
386 utime64_t curr_ms = 0;
387
388 int rc = mt_multi_newsock(req_list);
389 if (rc < 0)
390 {
391 MT_ATTR_API(320842, 1);
392 MTLOG_ERROR("mt_multi_sendrcv new sock failed, ret: %d", rc);
393 return -1;
394 }
395
396 rc = mt_multi_open(req_list, timeout);
397 if (rc < 0)
398 {
399 MT_ATTR_API(320843, 1);
400 MTLOG_ERROR("mt_multi_sendrcv open failed, ret: %d", rc);
401 return -2;
402 }
403
404 curr_ms = MtFrame::Instance()->GetLastClock();
405 rc = mt_multi_sendto(req_list, timeout - (curr_ms - start_ms));
406 if (rc < 0)
407 {
408 MT_ATTR_API(320844, 1);
409 MTLOG_ERROR("mt_multi_sendrcv send failed, ret: %d", rc);
410 return -3;
411 }
412
413 curr_ms = MtFrame::Instance()->GetLastClock();
414 rc = mt_multi_recvfrom(req_list, timeout - (curr_ms - start_ms));
415 if (rc < 0)
416 {
417 MT_ATTR_API(320845, 1);
418 MTLOG_ERROR("mt_multi_sendrcv recv failed, ret: %d", rc);
419 return -4;
420 }
421
422 return 0;
423 }
424
mt_msg_sendrcv(IMtActList & req_list,int timeout)425 int NS_MICRO_THREAD::mt_msg_sendrcv(IMtActList& req_list, int timeout)
426 {
427 int iRet = 0;
428
429 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
430 {
431 IMtAction* pAction = *it;
432 if (!pAction || pAction->InitConnEnv() < 0)
433 {
434 MTLOG_ERROR("invalid action(%p) or int failed, error", pAction);
435 return -1;
436 }
437
438 iRet = pAction->DoEncode();
439 if (iRet < 0)
440 {
441 pAction->SetErrno(ERR_ENCODE_ERROR);
442 MTLOG_ERROR("pack action pkg failed, act %p, ret %d", pAction, iRet);
443 continue;
444 }
445
446 }
447
448 mt_multi_sendrcv_ex(req_list, timeout);
449
450 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
451 {
452 IMtAction* pAction = *it;
453
454 if (pAction->GetMsgFlag() != MULTI_FLAG_FIN)
455 {
456 pAction->DoError();
457 MTLOG_DEBUG("send recv failed: %d", pAction->GetErrno());
458 continue;
459 }
460
461 iRet = pAction->DoProcess();
462 if (iRet < 0)
463 {
464 MTLOG_DEBUG("action process failed: %d", iRet);
465 continue;
466 }
467 }
468
469 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it)
470 {
471 IMtAction* pAction = *it;
472 pAction->Reset();
473 }
474
475 return 0;
476 }
477