1*a9643ea8Slogwang 2*a9643ea8Slogwang /** 3*a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4*a9643ea8Slogwang * 5*a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6*a9643ea8Slogwang * 7*a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8*a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9*a9643ea8Slogwang * obtain a copy of the License at 10*a9643ea8Slogwang * 11*a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12*a9643ea8Slogwang * 13*a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14*a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15*a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16*a9643ea8Slogwang * and limitations under the License. 17*a9643ea8Slogwang */ 18*a9643ea8Slogwang 19*a9643ea8Slogwang 20*a9643ea8Slogwang /** 21*a9643ea8Slogwang * @file mt_concurrent.c 22*a9643ea8Slogwang * @info ��·������ģ����չ 23*a9643ea8Slogwang * @time 20130924 24*a9643ea8Slogwang **/ 25*a9643ea8Slogwang 26*a9643ea8Slogwang #include "micro_thread.h" 27*a9643ea8Slogwang #include "mt_msg.h" 28*a9643ea8Slogwang #include "mt_notify.h" 29*a9643ea8Slogwang #include "mt_connection.h" 30*a9643ea8Slogwang #include "mt_concurrent.h" 31*a9643ea8Slogwang 32*a9643ea8Slogwang using namespace std; 33*a9643ea8Slogwang using namespace NS_MICRO_THREAD; 34*a9643ea8Slogwang 35*a9643ea8Slogwang 36*a9643ea8Slogwang /** 37*a9643ea8Slogwang * @brief ��·IO�Ĵ����Ż�, �첽���ȵȴ����� 38*a9643ea8Slogwang * @param req_list - �����б� 39*a9643ea8Slogwang * @param how - EPOLLIN EPOLLOUT 40*a9643ea8Slogwang * @param timeout - ��ʱʱ�� ���뵥λ 41*a9643ea8Slogwang * @return 0 �ɹ�, <0ʧ�� -3 ����ʱ 42*a9643ea8Slogwang */ 43*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_netfd_poll(IMtActList& req_list, int how, int timeout) 44*a9643ea8Slogwang { 45*a9643ea8Slogwang KqObjList fdlist; 46*a9643ea8Slogwang TAILQ_INIT(&fdlist); 47*a9643ea8Slogwang 48*a9643ea8Slogwang KqueuerObj* obj = NULL; 49*a9643ea8Slogwang IMtAction* action = NULL; 50*a9643ea8Slogwang for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 51*a9643ea8Slogwang { 52*a9643ea8Slogwang action = *it; 53*a9643ea8Slogwang if (action) { 54*a9643ea8Slogwang obj = action->GetNtfyObj(); 55*a9643ea8Slogwang } 56*a9643ea8Slogwang if (!action || !obj) 57*a9643ea8Slogwang { 58*a9643ea8Slogwang action->SetErrno(ERR_FRAME_ERROR); 59*a9643ea8Slogwang MTLOG_ERROR("input action %p, or ntify null, error", action); 60*a9643ea8Slogwang return -1; 61*a9643ea8Slogwang } 62*a9643ea8Slogwang 63*a9643ea8Slogwang obj->SetRcvEvents(0); 64*a9643ea8Slogwang if (how & KQ_EVENT_READ) 65*a9643ea8Slogwang { 66*a9643ea8Slogwang obj->EnableInput(); 67*a9643ea8Slogwang } 68*a9643ea8Slogwang else 69*a9643ea8Slogwang { 70*a9643ea8Slogwang obj->DisableInput(); 71*a9643ea8Slogwang } 72*a9643ea8Slogwang 73*a9643ea8Slogwang if (how & KQ_EVENT_WRITE) 74*a9643ea8Slogwang { 75*a9643ea8Slogwang obj->EnableOutput(); 76*a9643ea8Slogwang } 77*a9643ea8Slogwang else 78*a9643ea8Slogwang { 79*a9643ea8Slogwang obj->DisableOutput(); 80*a9643ea8Slogwang } 81*a9643ea8Slogwang 82*a9643ea8Slogwang TAILQ_INSERT_TAIL(&fdlist, obj, _entry); 83*a9643ea8Slogwang 84*a9643ea8Slogwang } 85*a9643ea8Slogwang 86*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 87*a9643ea8Slogwang if (!mtframe || !mtframe->KqueueSchedule(&fdlist, NULL, (int)timeout)) 88*a9643ea8Slogwang { 89*a9643ea8Slogwang if (errno != ETIME) 90*a9643ea8Slogwang { 91*a9643ea8Slogwang action->SetErrno(ERR_KQUEUE_FAIL); 92*a9643ea8Slogwang MTLOG_ERROR("Mtframe %p, epoll schedule failed, errno %d", mtframe, errno); 93*a9643ea8Slogwang return -2; 94*a9643ea8Slogwang } 95*a9643ea8Slogwang 96*a9643ea8Slogwang return -3; 97*a9643ea8Slogwang } 98*a9643ea8Slogwang 99*a9643ea8Slogwang return 0; 100*a9643ea8Slogwang } 101*a9643ea8Slogwang 102*a9643ea8Slogwang /** 103*a9643ea8Slogwang * @brief Ϊÿ��ITEM���������ĵ�socket 104*a9643ea8Slogwang * @param req_list - �����б� 105*a9643ea8Slogwang * @return 0 �ɹ�, <0ʧ�� 106*a9643ea8Slogwang */ 107*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_newsock(IMtActList& req_list) 108*a9643ea8Slogwang { 109*a9643ea8Slogwang int sock = -1, has_ok = 0; 110*a9643ea8Slogwang IMtAction* action = NULL; 111*a9643ea8Slogwang IMtConnection* net_handler = NULL; 112*a9643ea8Slogwang 113*a9643ea8Slogwang for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 114*a9643ea8Slogwang { 115*a9643ea8Slogwang action = *it; 116*a9643ea8Slogwang if (NULL == action) 117*a9643ea8Slogwang { 118*a9643ea8Slogwang action->SetErrno(ERR_PARAM_ERROR); 119*a9643ea8Slogwang MTLOG_ERROR("Invalid param, conn %p null!!", action); 120*a9643ea8Slogwang return -1; 121*a9643ea8Slogwang } 122*a9643ea8Slogwang 123*a9643ea8Slogwang if (action->GetErrno() != ERR_NONE) { 124*a9643ea8Slogwang continue; 125*a9643ea8Slogwang } 126*a9643ea8Slogwang 127*a9643ea8Slogwang net_handler = action->GetIConnection(); 128*a9643ea8Slogwang if (NULL == net_handler) 129*a9643ea8Slogwang { 130*a9643ea8Slogwang action->SetErrno(ERR_FRAME_ERROR); 131*a9643ea8Slogwang MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 132*a9643ea8Slogwang return -2; 133*a9643ea8Slogwang } 134*a9643ea8Slogwang 135*a9643ea8Slogwang sock = net_handler->CreateSocket(); 136*a9643ea8Slogwang if (sock < 0) 137*a9643ea8Slogwang { 138*a9643ea8Slogwang action->SetErrno(ERR_SOCKET_FAIL); 139*a9643ea8Slogwang MTLOG_ERROR("Get sock data failed, ret %d, errno %d!!", sock, errno); 140*a9643ea8Slogwang return -3; 141*a9643ea8Slogwang } 142*a9643ea8Slogwang has_ok = 1; 143*a9643ea8Slogwang 144*a9643ea8Slogwang if (action->GetProtoType() == MT_UDP) 145*a9643ea8Slogwang { 146*a9643ea8Slogwang action->SetMsgFlag(MULTI_FLAG_OPEN); 147*a9643ea8Slogwang } 148*a9643ea8Slogwang else 149*a9643ea8Slogwang { 150*a9643ea8Slogwang action->SetMsgFlag(MULTI_FLAG_INIT); 151*a9643ea8Slogwang } 152*a9643ea8Slogwang } 153*a9643ea8Slogwang 154*a9643ea8Slogwang if (has_ok) 155*a9643ea8Slogwang { 156*a9643ea8Slogwang return 0; 157*a9643ea8Slogwang } 158*a9643ea8Slogwang else 159*a9643ea8Slogwang { 160*a9643ea8Slogwang return -4; 161*a9643ea8Slogwang } 162*a9643ea8Slogwang } 163*a9643ea8Slogwang 164*a9643ea8Slogwang 165*a9643ea8Slogwang /** 166*a9643ea8Slogwang * @brief ��·IO�Ĵ���, ������ 167*a9643ea8Slogwang * @param req_list - �����б� 168*a9643ea8Slogwang * @param timeout - ��ʱʱ�� ���뵥λ 169*a9643ea8Slogwang * @return 0 �ɹ�, <0ʧ�� 170*a9643ea8Slogwang */ 171*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_open(IMtActList& req_list, int timeout) 172*a9643ea8Slogwang { 173*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 174*a9643ea8Slogwang utime64_t end_ms = start_ms + timeout; 175*a9643ea8Slogwang utime64_t curr_ms = 0; 176*a9643ea8Slogwang 177*a9643ea8Slogwang int ret = 0, has_open = 0; 178*a9643ea8Slogwang IMtAction* action = NULL; 179*a9643ea8Slogwang IMtConnection* net_handler = NULL; 180*a9643ea8Slogwang IMtActList::iterator it; 181*a9643ea8Slogwang 182*a9643ea8Slogwang while (1) 183*a9643ea8Slogwang { 184*a9643ea8Slogwang IMtActList wait_list; 185*a9643ea8Slogwang for (it = req_list.begin(); it != req_list.end(); ++it) 186*a9643ea8Slogwang { 187*a9643ea8Slogwang action = *it; 188*a9643ea8Slogwang if (action->GetErrno() != ERR_NONE) { 189*a9643ea8Slogwang continue; 190*a9643ea8Slogwang } 191*a9643ea8Slogwang 192*a9643ea8Slogwang if (action->GetMsgFlag() == MULTI_FLAG_OPEN) { 193*a9643ea8Slogwang has_open = 1; 194*a9643ea8Slogwang continue; 195*a9643ea8Slogwang } 196*a9643ea8Slogwang 197*a9643ea8Slogwang net_handler = action->GetIConnection(); 198*a9643ea8Slogwang if (NULL == net_handler) 199*a9643ea8Slogwang { 200*a9643ea8Slogwang action->SetErrno(ERR_FRAME_ERROR); 201*a9643ea8Slogwang MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 202*a9643ea8Slogwang return -1; 203*a9643ea8Slogwang } 204*a9643ea8Slogwang 205*a9643ea8Slogwang ret = net_handler->OpenCnnect(); 206*a9643ea8Slogwang if (ret < 0) 207*a9643ea8Slogwang { 208*a9643ea8Slogwang wait_list.push_back(action); 209*a9643ea8Slogwang } 210*a9643ea8Slogwang else 211*a9643ea8Slogwang { 212*a9643ea8Slogwang action->SetMsgFlag(MULTI_FLAG_OPEN); 213*a9643ea8Slogwang } 214*a9643ea8Slogwang } 215*a9643ea8Slogwang 216*a9643ea8Slogwang curr_ms = MtFrame::Instance()->GetLastClock(); 217*a9643ea8Slogwang if (curr_ms > end_ms) 218*a9643ea8Slogwang { 219*a9643ea8Slogwang MTLOG_DEBUG("Open connect timeout, errno %d", errno); 220*a9643ea8Slogwang for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 221*a9643ea8Slogwang { 222*a9643ea8Slogwang (*it)->SetErrno(ERR_CONNECT_FAIL); 223*a9643ea8Slogwang } 224*a9643ea8Slogwang 225*a9643ea8Slogwang if (!has_open) 226*a9643ea8Slogwang { 227*a9643ea8Slogwang return 0; 228*a9643ea8Slogwang } 229*a9643ea8Slogwang else 230*a9643ea8Slogwang { 231*a9643ea8Slogwang return -2; 232*a9643ea8Slogwang } 233*a9643ea8Slogwang } 234*a9643ea8Slogwang 235*a9643ea8Slogwang if (!wait_list.empty()) 236*a9643ea8Slogwang { 237*a9643ea8Slogwang mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms); 238*a9643ea8Slogwang } 239*a9643ea8Slogwang else 240*a9643ea8Slogwang { 241*a9643ea8Slogwang return 0; 242*a9643ea8Slogwang } 243*a9643ea8Slogwang } 244*a9643ea8Slogwang 245*a9643ea8Slogwang } 246*a9643ea8Slogwang 247*a9643ea8Slogwang 248*a9643ea8Slogwang /** 249*a9643ea8Slogwang * @brief ��·IO�Ĵ���, �������� 250*a9643ea8Slogwang * @param req_list - �����б� 251*a9643ea8Slogwang * @param timeout - ��ʱʱ�� ���뵥λ 252*a9643ea8Slogwang * @return 0 �ɹ�, <0ʧ�� 253*a9643ea8Slogwang */ 254*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_sendto(IMtActList& req_list, int timeout) 255*a9643ea8Slogwang { 256*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 257*a9643ea8Slogwang utime64_t end_ms = start_ms + timeout; 258*a9643ea8Slogwang utime64_t curr_ms = 0; 259*a9643ea8Slogwang 260*a9643ea8Slogwang int ret = 0, has_send = 0; 261*a9643ea8Slogwang IMtAction* action = NULL; 262*a9643ea8Slogwang IMtConnection* net_handler = NULL; 263*a9643ea8Slogwang 264*a9643ea8Slogwang while (1) 265*a9643ea8Slogwang { 266*a9643ea8Slogwang IMtActList wait_list; 267*a9643ea8Slogwang for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 268*a9643ea8Slogwang { 269*a9643ea8Slogwang action = *it; 270*a9643ea8Slogwang if (action->GetErrno() != ERR_NONE) { 271*a9643ea8Slogwang continue; 272*a9643ea8Slogwang } 273*a9643ea8Slogwang 274*a9643ea8Slogwang if (action->GetMsgFlag() == MULTI_FLAG_SEND) { 275*a9643ea8Slogwang has_send = 1; 276*a9643ea8Slogwang continue; 277*a9643ea8Slogwang } 278*a9643ea8Slogwang 279*a9643ea8Slogwang net_handler = action->GetIConnection(); 280*a9643ea8Slogwang if (NULL == net_handler) 281*a9643ea8Slogwang { 282*a9643ea8Slogwang action->SetErrno(ERR_FRAME_ERROR); 283*a9643ea8Slogwang MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 284*a9643ea8Slogwang return -2; 285*a9643ea8Slogwang } 286*a9643ea8Slogwang 287*a9643ea8Slogwang // 0 -��Ҫ��������; -1 ֹͣ����; > 0 ����OK 288*a9643ea8Slogwang ret = net_handler->SendData(); 289*a9643ea8Slogwang if (ret == -1) 290*a9643ea8Slogwang { 291*a9643ea8Slogwang action->SetErrno(ERR_SEND_FAIL); 292*a9643ea8Slogwang MTLOG_ERROR("MultiItem msg send error, %d", errno); 293*a9643ea8Slogwang continue; 294*a9643ea8Slogwang } 295*a9643ea8Slogwang else if (ret == 0) 296*a9643ea8Slogwang { 297*a9643ea8Slogwang wait_list.push_back(action); 298*a9643ea8Slogwang continue; 299*a9643ea8Slogwang } 300*a9643ea8Slogwang else 301*a9643ea8Slogwang { 302*a9643ea8Slogwang action->SetMsgFlag(MULTI_FLAG_SEND); 303*a9643ea8Slogwang } 304*a9643ea8Slogwang } 305*a9643ea8Slogwang 306*a9643ea8Slogwang curr_ms = MtFrame::Instance()->GetLastClock(); 307*a9643ea8Slogwang if (curr_ms > end_ms) 308*a9643ea8Slogwang { 309*a9643ea8Slogwang MTLOG_DEBUG("send data timeout"); 310*a9643ea8Slogwang for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 311*a9643ea8Slogwang { 312*a9643ea8Slogwang (*it)->SetErrno(ERR_SEND_FAIL); 313*a9643ea8Slogwang } 314*a9643ea8Slogwang 315*a9643ea8Slogwang if (has_send) 316*a9643ea8Slogwang { 317*a9643ea8Slogwang return 0; 318*a9643ea8Slogwang } 319*a9643ea8Slogwang else 320*a9643ea8Slogwang { 321*a9643ea8Slogwang return -5; 322*a9643ea8Slogwang } 323*a9643ea8Slogwang } 324*a9643ea8Slogwang 325*a9643ea8Slogwang if (!wait_list.empty()) 326*a9643ea8Slogwang { 327*a9643ea8Slogwang mt_multi_netfd_poll(wait_list, KQ_EVENT_WRITE, end_ms - curr_ms); 328*a9643ea8Slogwang } 329*a9643ea8Slogwang else 330*a9643ea8Slogwang { 331*a9643ea8Slogwang return 0; 332*a9643ea8Slogwang } 333*a9643ea8Slogwang } 334*a9643ea8Slogwang 335*a9643ea8Slogwang return 0; 336*a9643ea8Slogwang } 337*a9643ea8Slogwang 338*a9643ea8Slogwang 339*a9643ea8Slogwang 340*a9643ea8Slogwang /** 341*a9643ea8Slogwang * @brief ��·IO�������մ��� 342*a9643ea8Slogwang */ 343*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_recvfrom(IMtActList& req_list, int timeout) 344*a9643ea8Slogwang { 345*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 346*a9643ea8Slogwang utime64_t end_ms = start_ms + timeout; 347*a9643ea8Slogwang utime64_t curr_ms = 0; 348*a9643ea8Slogwang 349*a9643ea8Slogwang int ret = 0; 350*a9643ea8Slogwang IMtAction* action = NULL; 351*a9643ea8Slogwang IMtConnection* net_handler = NULL; 352*a9643ea8Slogwang 353*a9643ea8Slogwang while (1) 354*a9643ea8Slogwang { 355*a9643ea8Slogwang IMtActList wait_list; 356*a9643ea8Slogwang for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 357*a9643ea8Slogwang { 358*a9643ea8Slogwang action = *it; 359*a9643ea8Slogwang if (action->GetErrno() != ERR_NONE) { 360*a9643ea8Slogwang continue; 361*a9643ea8Slogwang } 362*a9643ea8Slogwang 363*a9643ea8Slogwang if (MULTI_FLAG_FIN == action->GetMsgFlag()) ///< �Ѵ������ 364*a9643ea8Slogwang { 365*a9643ea8Slogwang continue; 366*a9643ea8Slogwang } 367*a9643ea8Slogwang 368*a9643ea8Slogwang net_handler = action->GetIConnection(); 369*a9643ea8Slogwang if (NULL == net_handler) 370*a9643ea8Slogwang { 371*a9643ea8Slogwang action->SetErrno(ERR_FRAME_ERROR); 372*a9643ea8Slogwang MTLOG_ERROR("Invalid param, conn %p null!!", net_handler); 373*a9643ea8Slogwang return -2; 374*a9643ea8Slogwang } 375*a9643ea8Slogwang 376*a9643ea8Slogwang // <0 ʧ��, 0 ������, >0 �ɹ� 377*a9643ea8Slogwang ret = net_handler->RecvData(); 378*a9643ea8Slogwang if (ret < 0) 379*a9643ea8Slogwang { 380*a9643ea8Slogwang action->SetErrno(ERR_RECV_FAIL); 381*a9643ea8Slogwang MTLOG_ERROR("MultiItem msg recv failed: %p", net_handler); 382*a9643ea8Slogwang continue; 383*a9643ea8Slogwang } 384*a9643ea8Slogwang else if (ret == 0) 385*a9643ea8Slogwang { 386*a9643ea8Slogwang wait_list.push_back(action); 387*a9643ea8Slogwang continue; 388*a9643ea8Slogwang } 389*a9643ea8Slogwang else 390*a9643ea8Slogwang { 391*a9643ea8Slogwang action->SetMsgFlag(MULTI_FLAG_FIN); 392*a9643ea8Slogwang action->SetCost(MtFrame::Instance()->GetLastClock() - start_ms); 393*a9643ea8Slogwang } 394*a9643ea8Slogwang } 395*a9643ea8Slogwang 396*a9643ea8Slogwang curr_ms = MtFrame::Instance()->GetLastClock(); 397*a9643ea8Slogwang if (curr_ms > end_ms) 398*a9643ea8Slogwang { 399*a9643ea8Slogwang MTLOG_DEBUG("Recv data timeout, curr %llu, start: %llu", curr_ms, start_ms); 400*a9643ea8Slogwang for (IMtActList::iterator it = wait_list.begin(); it != wait_list.end(); ++it) 401*a9643ea8Slogwang { 402*a9643ea8Slogwang (*it)->SetErrno(ERR_RECV_TIMEOUT); 403*a9643ea8Slogwang } 404*a9643ea8Slogwang return -5; 405*a9643ea8Slogwang } 406*a9643ea8Slogwang 407*a9643ea8Slogwang if (!wait_list.empty()) 408*a9643ea8Slogwang { 409*a9643ea8Slogwang mt_multi_netfd_poll(wait_list, KQ_EVENT_READ, end_ms - curr_ms); 410*a9643ea8Slogwang } 411*a9643ea8Slogwang else 412*a9643ea8Slogwang { 413*a9643ea8Slogwang return 0; 414*a9643ea8Slogwang } 415*a9643ea8Slogwang } 416*a9643ea8Slogwang } 417*a9643ea8Slogwang 418*a9643ea8Slogwang /** 419*a9643ea8Slogwang * @brief ��·IO�������մ��� 420*a9643ea8Slogwang */ 421*a9643ea8Slogwang int NS_MICRO_THREAD::mt_multi_sendrcv_ex(IMtActList& req_list, int timeout) 422*a9643ea8Slogwang { 423*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 424*a9643ea8Slogwang utime64_t curr_ms = 0; 425*a9643ea8Slogwang 426*a9643ea8Slogwang int rc = mt_multi_newsock(req_list); // TODO, ����ȡconnect��ʱʱ��� 427*a9643ea8Slogwang if (rc < 0) 428*a9643ea8Slogwang { 429*a9643ea8Slogwang MT_ATTR_API(320842, 1); // socketʧ�� 430*a9643ea8Slogwang MTLOG_ERROR("mt_multi_sendrcv new sock failed, ret: %d", rc); 431*a9643ea8Slogwang return -1; 432*a9643ea8Slogwang } 433*a9643ea8Slogwang 434*a9643ea8Slogwang rc = mt_multi_open(req_list, timeout); 435*a9643ea8Slogwang if (rc < 0) 436*a9643ea8Slogwang { 437*a9643ea8Slogwang MT_ATTR_API(320843, 1); // connectʧ�� 438*a9643ea8Slogwang MTLOG_ERROR("mt_multi_sendrcv open failed, ret: %d", rc); 439*a9643ea8Slogwang return -2; 440*a9643ea8Slogwang } 441*a9643ea8Slogwang 442*a9643ea8Slogwang curr_ms = MtFrame::Instance()->GetLastClock(); 443*a9643ea8Slogwang rc = mt_multi_sendto(req_list, timeout - (curr_ms - start_ms)); 444*a9643ea8Slogwang if (rc < 0) 445*a9643ea8Slogwang { 446*a9643ea8Slogwang MT_ATTR_API(320844, 1); // ����ʧ�� 447*a9643ea8Slogwang MTLOG_ERROR("mt_multi_sendrcv send failed, ret: %d", rc); 448*a9643ea8Slogwang return -3; 449*a9643ea8Slogwang } 450*a9643ea8Slogwang 451*a9643ea8Slogwang curr_ms = MtFrame::Instance()->GetLastClock(); 452*a9643ea8Slogwang rc = mt_multi_recvfrom(req_list, timeout - (curr_ms - start_ms)); 453*a9643ea8Slogwang if (rc < 0) 454*a9643ea8Slogwang { 455*a9643ea8Slogwang MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ� 456*a9643ea8Slogwang MTLOG_ERROR("mt_multi_sendrcv recv failed, ret: %d", rc); 457*a9643ea8Slogwang return -4; 458*a9643ea8Slogwang } 459*a9643ea8Slogwang 460*a9643ea8Slogwang return 0; 461*a9643ea8Slogwang } 462*a9643ea8Slogwang 463*a9643ea8Slogwang 464*a9643ea8Slogwang /** 465*a9643ea8Slogwang * @brief ��·IO�������մ���ӿ�, ��װACTON�ӿ�ģ��, �ڲ�����msg 466*a9643ea8Slogwang * @param req_list -action list ʵ�ַ�װ�����ӿ� 467*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 468*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��ʼ������ʧ��, �����ɹ��ֳɹ� 469*a9643ea8Slogwang */ 470*a9643ea8Slogwang int NS_MICRO_THREAD::mt_msg_sendrcv(IMtActList& req_list, int timeout) 471*a9643ea8Slogwang { 472*a9643ea8Slogwang int iRet = 0; 473*a9643ea8Slogwang 474*a9643ea8Slogwang // ��һ��, ��ʼ��action����, ��װ������ 475*a9643ea8Slogwang for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 476*a9643ea8Slogwang { 477*a9643ea8Slogwang IMtAction* pAction = *it; 478*a9643ea8Slogwang if (!pAction || pAction->InitConnEnv() < 0) 479*a9643ea8Slogwang { 480*a9643ea8Slogwang MTLOG_ERROR("invalid action(%p) or int failed, error", pAction); 481*a9643ea8Slogwang return -1; 482*a9643ea8Slogwang } 483*a9643ea8Slogwang 484*a9643ea8Slogwang iRet = pAction->DoEncode(); 485*a9643ea8Slogwang if (iRet < 0) 486*a9643ea8Slogwang { 487*a9643ea8Slogwang pAction->SetErrno(ERR_ENCODE_ERROR); 488*a9643ea8Slogwang MTLOG_ERROR("pack action pkg failed, act %p, ret %d", pAction, iRet); 489*a9643ea8Slogwang continue; 490*a9643ea8Slogwang } 491*a9643ea8Slogwang 492*a9643ea8Slogwang } 493*a9643ea8Slogwang 494*a9643ea8Slogwang // �ڶ���, ͬ���շ���Ϣ, ʧ��Ҳ��Ҫ֪ͨ���� 495*a9643ea8Slogwang mt_multi_sendrcv_ex(req_list, timeout); 496*a9643ea8Slogwang 497*a9643ea8Slogwang // ������, ͬ��֪ͨ������� 498*a9643ea8Slogwang for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 499*a9643ea8Slogwang { 500*a9643ea8Slogwang IMtAction* pAction = *it; 501*a9643ea8Slogwang 502*a9643ea8Slogwang if (pAction->GetMsgFlag() != MULTI_FLAG_FIN) 503*a9643ea8Slogwang { 504*a9643ea8Slogwang pAction->DoError(); 505*a9643ea8Slogwang MTLOG_DEBUG("send recv failed: %d", pAction->GetErrno()); 506*a9643ea8Slogwang continue; 507*a9643ea8Slogwang } 508*a9643ea8Slogwang 509*a9643ea8Slogwang iRet = pAction->DoProcess(); 510*a9643ea8Slogwang if (iRet < 0) 511*a9643ea8Slogwang { 512*a9643ea8Slogwang MTLOG_DEBUG("action process failed: %d", iRet); 513*a9643ea8Slogwang continue; 514*a9643ea8Slogwang } 515*a9643ea8Slogwang } 516*a9643ea8Slogwang 517*a9643ea8Slogwang // ���IJ�, �������ڲ���Դ, ���ݸ����÷� 518*a9643ea8Slogwang for (IMtActList::iterator it = req_list.begin(); it != req_list.end(); ++it) 519*a9643ea8Slogwang { 520*a9643ea8Slogwang IMtAction* pAction = *it; 521*a9643ea8Slogwang pAction->Reset(); 522*a9643ea8Slogwang } 523*a9643ea8Slogwang 524*a9643ea8Slogwang return 0; 525*a9643ea8Slogwang } 526*a9643ea8Slogwang 527*a9643ea8Slogwang 528*a9643ea8Slogwang 529