1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_concurrent.c 22 * @info ��·������ģ����չ 23 * @time 20130924 24 **/ 25 26 #include "micro_thread.h" 27 #include "mt_msg.h" 28 #include "mt_notify.h" 29 #include "mt_connection.h" 30 #include "mt_concurrent.h" 31 32 using namespace std; 33 using namespace NS_MICRO_THREAD; 34 35 36 /** 37 * @brief ��·IO�Ĵ����Ż�, �첽���ȵȴ����� 38 * @param req_list - �����б� 39 * @param how - EPOLLIN EPOLLOUT 40 * @param timeout - ��ʱʱ�� ���뵥λ 41 * @return 0 �ɹ�, <0ʧ�� -3 ����ʱ 42 */ 43 int NS_MICRO_THREAD::mt_multi_netfd_poll(IMtActList& req_list, int how, int timeout) 44 { 45 KqObjList fdlist; 46 TAILQ_INIT(&fdlist); 47 48 KqueuerObj* obj = NULL; 49 IMtAction* action = NULL; 50 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 51 { 52 action = *it; 53 if (action) { 54 obj = action->GetNtfyObj(); 55 } 56 if (!action || !obj) 57 { 58 action->SetErrno(ERR_FRAME_ERROR); 59 MTLOG_ERROR("input action %p, or ntify null, error", action); 60 return -1; 61 } 62 63 obj->SetRcvEvents(0); 64 if (how & KQ_EVENT_READ) 65 { 66 obj->EnableInput(); 67 } 68 else 69 { 70 obj->DisableInput(); 71 } 72 73 if (how & KQ_EVENT_WRITE) 74 { 75 obj->EnableOutput(); 76 } 77 else 78 { 79 obj->DisableOutput(); 80 } 81 82 TAILQ_INSERT_TAIL(&fdlist, obj, _entry); 83 84 } 85 86 MtFrame* mtframe = MtFrame::Instance(); 87 if (!mtframe || !mtframe->KqueueSchedule(&fdlist, NULL, (int)timeout)) 88 { 89 if (errno != ETIME) 90 { 91 action->SetErrno(ERR_KQUEUE_FAIL); 92 MTLOG_ERROR("Mtframe %p, epoll schedule failed, errno %d", mtframe, errno); 93 return -2; 94 } 95 96 return -3; 97 } 98 99 return 0; 100 } 101 102 /** 103 * @brief Ϊÿ��ITEM���������ĵ�socket 104 * @param req_list - �����б� 105 * @return 0 �ɹ�, <0ʧ�� 106 */ 107 int NS_MICRO_THREAD::mt_multi_newsock(IMtActList& req_list) 108 { 109 int sock = -1, has_ok = 0; 110 IMtAction* action = NULL; 111 IMtConnection* net_handler = NULL; 112 113 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 114 { 115 action = *it; 116 if (NULL == action) 117 { 118 action->SetErrno(ERR_PARAM_ERROR); 119 MTLOG_ERROR("Invalid param, conn %p null!!", action); 120 return -1; 121 } 122 123 if (action->GetErrno() != ERR_NONE) { 124 continue; 125 } 126 127 net_handler = action->GetIConnection(); 128 if (NULL == net_handler) 129 { 130 action->SetErrno(ERR_FRAME_ERROR); 131 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 132 return -2; 133 } 134 135 sock = net_handler->CreateSocket(); 136 if (sock < 0) 137 { 138 action->SetErrno(ERR_SOCKET_FAIL); 139 MTLOG_ERROR("Get sock data failed, ret %d, errno %d!!", sock, errno); 140 return -3; 141 } 142 has_ok = 1; 143 144 if (action->GetProtoType() == MT_UDP) 145 { 146 action->SetMsgFlag(MULTI_FLAG_OPEN); 147 } 148 else 149 { 150 action->SetMsgFlag(MULTI_FLAG_INIT); 151 } 152 } 153 154 if (has_ok) 155 { 156 return 0; 157 } 158 else 159 { 160 return -4; 161 } 162 } 163 164 165 /** 166 * @brief ��·IO�Ĵ���, ������ 167 * @param req_list - �����б� 168 * @param timeout - ��ʱʱ�� ���뵥λ 169 * @return 0 �ɹ�, <0ʧ�� 170 */ 171 int NS_MICRO_THREAD::mt_multi_open(IMtActList& req_list, int timeout) 172 { 173 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 174 utime64_t end_ms = start_ms + timeout; 175 utime64_t curr_ms = 0; 176 177 int ret = 0, has_open = 0; 178 IMtAction* action = NULL; 179 IMtConnection* net_handler = NULL; 180 IMtActList::iterator it; 181 182 while (1) 183 { 184 IMtActList wait_list; 185 for (it = req_list.begin(); it != req_list.end(); ++it) 186 { 187 action = *it; 188 if (action->GetErrno() != ERR_NONE) { 189 continue; 190 } 191 192 if (action->GetMsgFlag() == MULTI_FLAG_OPEN) { 193 has_open = 1; 194 continue; 195 } 196 197 net_handler = action->GetIConnection(); 198 if (NULL == net_handler) 199 { 200 action->SetErrno(ERR_FRAME_ERROR); 201 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 202 return -1; 203 } 204 205 ret = net_handler->OpenCnnect(); 206 if (ret < 0) 207 { 208 wait_list.push_back(action); 209 } 210 else 211 { 212 action->SetMsgFlag(MULTI_FLAG_OPEN); 213 } 214 } 215 216 curr_ms = MtFrame::Instance()->GetLastClock(); 217 if (curr_ms > end_ms) 218 { 219 MTLOG_DEBUG("Open connect timeout, errno %d", errno); 220 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 221 { 222 (*it)->SetErrno(ERR_CONNECT_FAIL); 223 } 224 225 if (!has_open) 226 { 227 return 0; 228 } 229 else 230 { 231 return -2; 232 } 233 } 234 235 if (!wait_list.empty()) 236 { 237 mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms); 238 } 239 else 240 { 241 return 0; 242 } 243 } 244 245 } 246 247 248 /** 249 * @brief ��·IO�Ĵ���, �������� 250 * @param req_list - �����б� 251 * @param timeout - ��ʱʱ�� ���뵥λ 252 * @return 0 �ɹ�, <0ʧ�� 253 */ 254 int NS_MICRO_THREAD::mt_multi_sendto(IMtActList& req_list, int timeout) 255 { 256 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 257 utime64_t end_ms = start_ms + timeout; 258 utime64_t curr_ms = 0; 259 260 int ret = 0, has_send = 0; 261 IMtAction* action = NULL; 262 IMtConnection* net_handler = NULL; 263 264 while (1) 265 { 266 IMtActList wait_list; 267 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 268 { 269 action = *it; 270 if (action->GetErrno() != ERR_NONE) { 271 continue; 272 } 273 274 if (action->GetMsgFlag() == MULTI_FLAG_SEND) { 275 has_send = 1; 276 continue; 277 } 278 279 net_handler = action->GetIConnection(); 280 if (NULL == net_handler) 281 { 282 action->SetErrno(ERR_FRAME_ERROR); 283 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 284 return -2; 285 } 286 287 // 0 -��Ҫ��������; -1 ֹͣ����; > 0 ����OK 288 ret = net_handler->SendData(); 289 if (ret == -1) 290 { 291 action->SetErrno(ERR_SEND_FAIL); 292 MTLOG_ERROR("MultiItem msg send error, %d", errno); 293 continue; 294 } 295 else if (ret == 0) 296 { 297 wait_list.push_back(action); 298 continue; 299 } 300 else 301 { 302 action->SetMsgFlag(MULTI_FLAG_SEND); 303 } 304 } 305 306 curr_ms = MtFrame::Instance()->GetLastClock(); 307 if (curr_ms > end_ms) 308 { 309 MTLOG_DEBUG("send data timeout"); 310 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 311 { 312 (*it)->SetErrno(ERR_SEND_FAIL); 313 } 314 315 if (has_send) 316 { 317 return 0; 318 } 319 else 320 { 321 return -5; 322 } 323 } 324 325 if (!wait_list.empty()) 326 { 327 mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms); 328 } 329 else 330 { 331 return 0; 332 } 333 } 334 335 return 0; 336 } 337 338 339 340 /** 341 * @brief ��·IO�������մ��� 342 */ 343 int NS_MICRO_THREAD::mt_multi_recvfrom(IMtActList& req_list, int timeout) 344 { 345 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 346 utime64_t end_ms = start_ms + timeout; 347 utime64_t curr_ms = 0; 348 349 int ret = 0; 350 IMtAction* action = NULL; 351 IMtConnection* net_handler = NULL; 352 353 while (1) 354 { 355 IMtActList wait_list; 356 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 357 { 358 action = *it; 359 if (action->GetErrno() != ERR_NONE) { 360 continue; 361 } 362 363 if (MULTI_FLAG_FIN == action->GetMsgFlag()) ///< �Ѵ������ 364 { 365 continue; 366 } 367 368 net_handler = action->GetIConnection(); 369 if (NULL == net_handler) 370 { 371 action->SetErrno(ERR_FRAME_ERROR); 372 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 373 return -2; 374 } 375 376 // <0 ʧ��, 0 ������, >0 �ɹ� 377 ret = net_handler->RecvData(); 378 if (ret < 0) 379 { 380 action->SetErrno(ERR_RECV_FAIL); 381 MTLOG_ERROR("MultiItem msg recv failed: %p", net_handler); 382 continue; 383 } 384 else if (ret == 0) 385 { 386 wait_list.push_back(action); 387 continue; 388 } 389 else 390 { 391 action->SetMsgFlag(MULTI_FLAG_FIN); 392 action->SetCost(MtFrame::Instance()->GetLastClock() - start_ms); 393 } 394 } 395 396 curr_ms = MtFrame::Instance()->GetLastClock(); 397 if (curr_ms > end_ms) 398 { 399 MTLOG_DEBUG("Recv data timeout, curr %llu, start: %llu", curr_ms, start_ms); 400 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 401 { 402 (*it)->SetErrno(ERR_RECV_TIMEOUT); 403 } 404 return -5; 405 } 406 407 if (!wait_list.empty()) 408 { 409 mt_multi_netfd_poll(wait_list, KQ_EVENT_READ, end_ms - curr_ms); 410 } 411 else 412 { 413 return 0; 414 } 415 } 416 } 417 418 /** 419 * @brief ��·IO�������մ��� 420 */ 421 int NS_MICRO_THREAD::mt_multi_sendrcv_ex(IMtActList& req_list, int timeout) 422 { 423 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 424 utime64_t curr_ms = 0; 425 426 int rc = mt_multi_newsock(req_list); // TODO, ����ȡconnect��ʱʱ��� 427 if (rc < 0) 428 { 429 MT_ATTR_API(320842, 1); // socketʧ�� 430 MTLOG_ERROR("mt_multi_sendrcv new sock failed, ret: %d", rc); 431 return -1; 432 } 433 434 rc = mt_multi_open(req_list, timeout); 435 if (rc < 0) 436 { 437 MT_ATTR_API(320843, 1); // connectʧ�� 438 MTLOG_ERROR("mt_multi_sendrcv open failed, ret: %d", rc); 439 return -2; 440 } 441 442 curr_ms = MtFrame::Instance()->GetLastClock(); 443 rc = mt_multi_sendto(req_list, timeout - (curr_ms - start_ms)); 444 if (rc < 0) 445 { 446 MT_ATTR_API(320844, 1); // ����ʧ�� 447 MTLOG_ERROR("mt_multi_sendrcv send failed, ret: %d", rc); 448 return -3; 449 } 450 451 curr_ms = MtFrame::Instance()->GetLastClock(); 452 rc = mt_multi_recvfrom(req_list, timeout - (curr_ms - start_ms)); 453 if (rc < 0) 454 { 455 MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ� 456 MTLOG_ERROR("mt_multi_sendrcv recv failed, ret: %d", rc); 457 return -4; 458 } 459 460 return 0; 461 } 462 463 464 /** 465 * @brief ��·IO�������մ���ӿ�, ��װACTON�ӿ�ģ��, �ڲ�����msg 466 * @param req_list -action list ʵ�ַ�װ�����ӿ� 467 * @param timeout -��ʱʱ��, ��λms 468 * @return 0 �ɹ�, -1 ��ʼ������ʧ��, �����ɹ��ֳɹ� 469 */ 470 int NS_MICRO_THREAD::mt_msg_sendrcv(IMtActList& req_list, int timeout) 471 { 472 int iRet = 0; 473 474 // ��һ��, ��ʼ��action����, ��װ������ 475 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 476 { 477 IMtAction* pAction = *it; 478 if (!pAction || pAction->InitConnEnv() < 0) 479 { 480 MTLOG_ERROR("invalid action(%p) or int failed, error", pAction); 481 return -1; 482 } 483 484 iRet = pAction->DoEncode(); 485 if (iRet < 0) 486 { 487 pAction->SetErrno(ERR_ENCODE_ERROR); 488 MTLOG_ERROR("pack action pkg failed, act %p, ret %d", pAction, iRet); 489 continue; 490 } 491 492 } 493 494 // �ڶ���, ͬ���շ���Ϣ, ʧ��Ҳ��Ҫ֪ͨ���� 495 mt_multi_sendrcv_ex(req_list, timeout); 496 497 // ������, ͬ��֪ͨ������� 498 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 499 { 500 IMtAction* pAction = *it; 501 502 if (pAction->GetMsgFlag() != MULTI_FLAG_FIN) 503 { 504 pAction->DoError(); 505 MTLOG_DEBUG("send recv failed: %d", pAction->GetErrno()); 506 continue; 507 } 508 509 iRet = pAction->DoProcess(); 510 if (iRet < 0) 511 { 512 MTLOG_DEBUG("action process failed: %d", iRet); 513 continue; 514 } 515 } 516 517 // ���IJ�, �������ڲ���Դ, ���ݸ����÷� 518 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 519 { 520 IMtAction* pAction = *it; 521 pAction->Reset(); 522 } 523 524 return 0; 525 } 526 527 528 529