1 2 /** 3 * Tencent is pleased to support the open source community by making MSEC available. 4 * 5 * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6 * 7 * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. You may 9 * obtain a copy of the License at 10 * 11 * https://opensource.org/licenses/GPL-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software distributed under the 14 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15 * either express or implied. See the License for the specific language governing permissions 16 * and limitations under the License. 17 */ 18 19 20 /** 21 * @file mt_concurrent.c 22 * @time 20130924 23 **/ 24 25 #include "micro_thread.h" 26 #include "mt_msg.h" 27 #include "mt_notify.h" 28 #include "mt_connection.h" 29 #include "mt_concurrent.h" 30 31 using namespace std; 32 using namespace NS_MICRO_THREAD; 33 34 int NS_MICRO_THREAD::mt_multi_netfd_poll(IMtActList& req_list, int how, int timeout) 35 { 36 KqObjList fdlist; 37 TAILQ_INIT(&fdlist); 38 39 KqueuerObj* obj = NULL; 40 IMtAction* action = NULL; 41 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 42 { 43 action = *it; 44 if (action) { 45 obj = action->GetNtfyObj(); 46 } 47 if (!action || !obj) 48 { 49 action->SetErrno(ERR_FRAME_ERROR); 50 MTLOG_ERROR("input action %p, or ntify null, error", action); 51 return -1; 52 } 53 54 obj->SetRcvEvents(0); 55 if (how & KQ_EVENT_READ) 56 { 57 obj->EnableInput(); 58 } 59 else 60 { 61 obj->DisableInput(); 62 } 63 64 if (how & KQ_EVENT_WRITE) 65 { 66 obj->EnableOutput(); 67 } 68 else 69 { 70 obj->DisableOutput(); 71 } 72 73 TAILQ_INSERT_TAIL(&fdlist, obj, _entry); 74 75 } 76 77 MtFrame* mtframe = MtFrame::Instance(); 78 if (!mtframe || !mtframe->KqueueSchedule(&fdlist, NULL, (int)timeout)) 79 { 80 if (errno != ETIME) 81 { 82 action->SetErrno(ERR_KQUEUE_FAIL); 83 MTLOG_ERROR("Mtframe %p, epoll schedule failed, errno %d", mtframe, errno); 84 return -2; 85 } 86 87 return -3; 88 } 89 90 return 0; 91 } 92 93 int NS_MICRO_THREAD::mt_multi_newsock(IMtActList& req_list) 94 { 95 int sock = -1, has_ok = 0; 96 IMtAction* action = NULL; 97 IMtConnection* net_handler = NULL; 98 99 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 100 { 101 action = *it; 102 if (NULL == action) 103 { 104 action->SetErrno(ERR_PARAM_ERROR); 105 MTLOG_ERROR("Invalid param, conn %p null!!", action); 106 return -1; 107 } 108 109 if (action->GetErrno() != ERR_NONE) { 110 continue; 111 } 112 113 net_handler = action->GetIConnection(); 114 if (NULL == net_handler) 115 { 116 action->SetErrno(ERR_FRAME_ERROR); 117 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 118 return -2; 119 } 120 121 sock = net_handler->CreateSocket(); 122 if (sock < 0) 123 { 124 action->SetErrno(ERR_SOCKET_FAIL); 125 MTLOG_ERROR("Get sock data failed, ret %d, errno %d!!", sock, errno); 126 return -3; 127 } 128 has_ok = 1; 129 130 if (action->GetProtoType() == MT_UDP) 131 { 132 action->SetMsgFlag(MULTI_FLAG_OPEN); 133 } 134 else 135 { 136 action->SetMsgFlag(MULTI_FLAG_INIT); 137 } 138 } 139 140 if (has_ok) 141 { 142 return 0; 143 } 144 else 145 { 146 return -4; 147 } 148 } 149 150 int NS_MICRO_THREAD::mt_multi_open(IMtActList& req_list, int timeout) 151 { 152 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 153 utime64_t end_ms = start_ms + timeout; 154 utime64_t curr_ms = 0; 155 156 int ret = 0, has_open = 0; 157 IMtAction* action = NULL; 158 IMtConnection* net_handler = NULL; 159 IMtActList::iterator it; 160 161 while (1) 162 { 163 IMtActList wait_list; 164 for (it = req_list.begin(); it != req_list.end(); ++it) 165 { 166 action = *it; 167 if (action->GetErrno() != ERR_NONE) { 168 continue; 169 } 170 171 if (action->GetMsgFlag() == MULTI_FLAG_OPEN) { 172 has_open = 1; 173 continue; 174 } 175 176 net_handler = action->GetIConnection(); 177 if (NULL == net_handler) 178 { 179 action->SetErrno(ERR_FRAME_ERROR); 180 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 181 return -1; 182 } 183 184 ret = net_handler->OpenCnnect(); 185 if (ret < 0) 186 { 187 wait_list.push_back(action); 188 } 189 else 190 { 191 action->SetMsgFlag(MULTI_FLAG_OPEN); 192 } 193 } 194 195 curr_ms = MtFrame::Instance()->GetLastClock(); 196 if (curr_ms > end_ms) 197 { 198 MTLOG_DEBUG("Open connect timeout, errno %d", errno); 199 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 200 { 201 (*it)->SetErrno(ERR_CONNECT_FAIL); 202 } 203 204 if (!has_open) 205 { 206 return 0; 207 } 208 else 209 { 210 return -2; 211 } 212 } 213 214 if (!wait_list.empty()) 215 { 216 mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms); 217 } 218 else 219 { 220 return 0; 221 } 222 } 223 224 } 225 226 int NS_MICRO_THREAD::mt_multi_sendto(IMtActList& req_list, int timeout) 227 { 228 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 229 utime64_t end_ms = start_ms + timeout; 230 utime64_t curr_ms = 0; 231 232 int ret = 0, has_send = 0; 233 IMtAction* action = NULL; 234 IMtConnection* net_handler = NULL; 235 236 while (1) 237 { 238 IMtActList wait_list; 239 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 240 { 241 action = *it; 242 if (action->GetErrno() != ERR_NONE) { 243 continue; 244 } 245 246 if (action->GetMsgFlag() == MULTI_FLAG_SEND) { 247 has_send = 1; 248 continue; 249 } 250 251 net_handler = action->GetIConnection(); 252 if (NULL == net_handler) 253 { 254 action->SetErrno(ERR_FRAME_ERROR); 255 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 256 return -2; 257 } 258 259 ret = net_handler->SendData(); 260 if (ret == -1) 261 { 262 action->SetErrno(ERR_SEND_FAIL); 263 MTLOG_ERROR("MultiItem msg send error, %d", errno); 264 continue; 265 } 266 else if (ret == 0) 267 { 268 wait_list.push_back(action); 269 continue; 270 } 271 else 272 { 273 action->SetMsgFlag(MULTI_FLAG_SEND); 274 } 275 } 276 277 curr_ms = MtFrame::Instance()->GetLastClock(); 278 if (curr_ms > end_ms) 279 { 280 MTLOG_DEBUG("send data timeout"); 281 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 282 { 283 (*it)->SetErrno(ERR_SEND_FAIL); 284 } 285 286 if (has_send) 287 { 288 return 0; 289 } 290 else 291 { 292 return -5; 293 } 294 } 295 296 if (!wait_list.empty()) 297 { 298 mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms); 299 } 300 else 301 { 302 return 0; 303 } 304 } 305 306 return 0; 307 } 308 309 int NS_MICRO_THREAD::mt_multi_recvfrom(IMtActList& req_list, int timeout) 310 { 311 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 312 utime64_t end_ms = start_ms + timeout; 313 utime64_t curr_ms = 0; 314 315 int ret = 0; 316 IMtAction* action = NULL; 317 IMtConnection* net_handler = NULL; 318 319 while (1) 320 { 321 IMtActList wait_list; 322 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 323 { 324 action = *it; 325 if (action->GetErrno() != ERR_NONE) { 326 continue; 327 } 328 329 if (MULTI_FLAG_FIN == action->GetMsgFlag()) 330 { 331 continue; 332 } 333 334 net_handler = action->GetIConnection(); 335 if (NULL == net_handler) 336 { 337 action->SetErrno(ERR_FRAME_ERROR); 338 MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 339 return -2; 340 } 341 342 ret = net_handler->RecvData(); 343 if (ret < 0) 344 { 345 action->SetErrno(ERR_RECV_FAIL); 346 MTLOG_ERROR("MultiItem msg recv failed: %p", net_handler); 347 continue; 348 } 349 else if (ret == 0) 350 { 351 wait_list.push_back(action); 352 continue; 353 } 354 else 355 { 356 action->SetMsgFlag(MULTI_FLAG_FIN); 357 action->SetCost(MtFrame::Instance()->GetLastClock() - start_ms); 358 } 359 } 360 361 curr_ms = MtFrame::Instance()->GetLastClock(); 362 if (curr_ms > end_ms) 363 { 364 MTLOG_DEBUG("Recv data timeout, curr %llu, start: %llu", curr_ms, start_ms); 365 for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 366 { 367 (*it)->SetErrno(ERR_RECV_TIMEOUT); 368 } 369 return -5; 370 } 371 372 if (!wait_list.empty()) 373 { 374 mt_multi_netfd_poll(wait_list, KQ_EVENT_READ, end_ms - curr_ms); 375 } 376 else 377 { 378 return 0; 379 } 380 } 381 } 382 383 int NS_MICRO_THREAD::mt_multi_sendrcv_ex(IMtActList& req_list, int timeout) 384 { 385 utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 386 utime64_t curr_ms = 0; 387 388 int rc = mt_multi_newsock(req_list); 389 if (rc < 0) 390 { 391 MT_ATTR_API(320842, 1); 392 MTLOG_ERROR("mt_multi_sendrcv new sock failed, ret: %d", rc); 393 return -1; 394 } 395 396 rc = mt_multi_open(req_list, timeout); 397 if (rc < 0) 398 { 399 MT_ATTR_API(320843, 1); 400 MTLOG_ERROR("mt_multi_sendrcv open failed, ret: %d", rc); 401 return -2; 402 } 403 404 curr_ms = MtFrame::Instance()->GetLastClock(); 405 rc = mt_multi_sendto(req_list, timeout - (curr_ms - start_ms)); 406 if (rc < 0) 407 { 408 MT_ATTR_API(320844, 1); 409 MTLOG_ERROR("mt_multi_sendrcv send failed, ret: %d", rc); 410 return -3; 411 } 412 413 curr_ms = MtFrame::Instance()->GetLastClock(); 414 rc = mt_multi_recvfrom(req_list, timeout - (curr_ms - start_ms)); 415 if (rc < 0) 416 { 417 MT_ATTR_API(320845, 1); 418 MTLOG_ERROR("mt_multi_sendrcv recv failed, ret: %d", rc); 419 return -4; 420 } 421 422 return 0; 423 } 424 425 int NS_MICRO_THREAD::mt_msg_sendrcv(IMtActList& req_list, int timeout) 426 { 427 int iRet = 0; 428 429 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 430 { 431 IMtAction* pAction = *it; 432 if (!pAction || pAction->InitConnEnv() < 0) 433 { 434 MTLOG_ERROR("invalid action(%p) or int failed, error", pAction); 435 return -1; 436 } 437 438 iRet = pAction->DoEncode(); 439 if (iRet < 0) 440 { 441 pAction->SetErrno(ERR_ENCODE_ERROR); 442 MTLOG_ERROR("pack action pkg failed, act %p, ret %d", pAction, iRet); 443 continue; 444 } 445 446 } 447 448 mt_multi_sendrcv_ex(req_list, timeout); 449 450 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 451 { 452 IMtAction* pAction = *it; 453 454 if (pAction->GetMsgFlag() != MULTI_FLAG_FIN) 455 { 456 pAction->DoError(); 457 MTLOG_DEBUG("send recv failed: %d", pAction->GetErrno()); 458 continue; 459 } 460 461 iRet = pAction->DoProcess(); 462 if (iRet < 0) 463 { 464 MTLOG_DEBUG("action process failed: %d", iRet); 465 continue; 466 } 467 } 468 469 for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 470 { 471 IMtAction* pAction = *it; 472 pAction->Reset(); 473 } 474 475 return 0; 476 } 477