1*a9643ea8Slogwang 2*a9643ea8Slogwang /** 3*a9643ea8Slogwang * Tencent is pleased to support the open source community by making MSEC available. 4*a9643ea8Slogwang * 5*a9643ea8Slogwang * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. 6*a9643ea8Slogwang * 7*a9643ea8Slogwang * Licensed under the GNU General Public License, Version 2.0 (the "License"); 8*a9643ea8Slogwang * you may not use this file except in compliance with the License. You may 9*a9643ea8Slogwang * obtain a copy of the License at 10*a9643ea8Slogwang * 11*a9643ea8Slogwang * https://opensource.org/licenses/GPL-2.0 12*a9643ea8Slogwang * 13*a9643ea8Slogwang * Unless required by applicable law or agreed to in writing, software distributed under the 14*a9643ea8Slogwang * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 15*a9643ea8Slogwang * either express or implied. See the License for the specific language governing permissions 16*a9643ea8Slogwang * and limitations under the License. 17*a9643ea8Slogwang */ 18*a9643ea8Slogwang 19*a9643ea8Slogwang 20*a9643ea8Slogwang /** 21*a9643ea8Slogwang * @filename mt_sys_call.cpp 22*a9643ea8Slogwang * @info �̷߳�װϵͳapi, ͬ�������߳�API��ʵ���첽���� 23*a9643ea8Slogwang */ 24*a9643ea8Slogwang 25*a9643ea8Slogwang #include "kqueue_proxy.h" 26*a9643ea8Slogwang #include "micro_thread.h" 27*a9643ea8Slogwang #include "mt_connection.h" 28*a9643ea8Slogwang #include "mt_api.h" 29*a9643ea8Slogwang #include "ff_api.h" 30*a9643ea8Slogwang #include "mt_sys_hook.h" 31*a9643ea8Slogwang 32*a9643ea8Slogwang namespace NS_MICRO_THREAD { 33*a9643ea8Slogwang 34*a9643ea8Slogwang /** 35*a9643ea8Slogwang * @brief ��������˿ڵ�socket�շ��ӿ�, ��socket������������, ҵ������֤������ 36*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 37*a9643ea8Slogwang * @param pkg -�������װ�İ��� 38*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 39*a9643ea8Slogwang * @param rcv_buf -����Ӧ�����buff 40*a9643ea8Slogwang * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 41*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 42*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, �ɴ�ӡerrno, -10 ������Ч 43*a9643ea8Slogwang */ 44*a9643ea8Slogwang int mt_udpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout) 45*a9643ea8Slogwang { 46*a9643ea8Slogwang int ret = 0; 47*a9643ea8Slogwang int rc = 0; 48*a9643ea8Slogwang int flags = 1; 49*a9643ea8Slogwang struct sockaddr_in from_addr = {0}; 50*a9643ea8Slogwang int addr_len = sizeof(from_addr); 51*a9643ea8Slogwang 52*a9643ea8Slogwang if(len<1 || buf_size<1 ||!dst || !pkg || !rcv_buf) 53*a9643ea8Slogwang { 54*a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv input params invalid, dst[%p], pkg[%p], rcv_buf[%p], len[%d], buf_size[%d]", 55*a9643ea8Slogwang dst, pkg, rcv_buf, len, buf_size); 56*a9643ea8Slogwang return -10; 57*a9643ea8Slogwang } 58*a9643ea8Slogwang 59*a9643ea8Slogwang int sock = socket(PF_INET, SOCK_DGRAM, 0); 60*a9643ea8Slogwang if ((sock < 0) || (ioctl(sock, FIONBIO, &flags) < 0)) 61*a9643ea8Slogwang { 62*a9643ea8Slogwang MT_ATTR_API(320842, 1); // socketʧ�� 63*a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv new sock failed, sock: %d, errno: %d (%m)", sock, errno); 64*a9643ea8Slogwang ret = -1; 65*a9643ea8Slogwang goto EXIT_LABEL; 66*a9643ea8Slogwang } 67*a9643ea8Slogwang 68*a9643ea8Slogwang rc = MtFrame::sendto(sock, pkg, len, 0, (struct sockaddr*)dst, (int)sizeof(*dst), timeout); 69*a9643ea8Slogwang if (rc < 0) 70*a9643ea8Slogwang { 71*a9643ea8Slogwang MT_ATTR_API(320844, 1); // ����ʧ�� 72*a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv send failed, rc: %d, errno: %d (%m)", rc, errno); 73*a9643ea8Slogwang ret = -2; 74*a9643ea8Slogwang goto EXIT_LABEL; 75*a9643ea8Slogwang } 76*a9643ea8Slogwang 77*a9643ea8Slogwang rc = MtFrame::recvfrom(sock, rcv_buf, buf_size, 0, (struct sockaddr*)&from_addr, (socklen_t*)&addr_len, timeout); 78*a9643ea8Slogwang if (rc < 0) 79*a9643ea8Slogwang { 80*a9643ea8Slogwang MT_ATTR_API(320845, 1); // ����δ��ȫ�ɹ� 81*a9643ea8Slogwang MTLOG_ERROR("mt_udpsendrcv recv failed, rc: %d, errno: %d (%m)", rc, errno); 82*a9643ea8Slogwang ret = -3; 83*a9643ea8Slogwang goto EXIT_LABEL; 84*a9643ea8Slogwang } 85*a9643ea8Slogwang buf_size = rc; 86*a9643ea8Slogwang 87*a9643ea8Slogwang EXIT_LABEL: 88*a9643ea8Slogwang 89*a9643ea8Slogwang if (sock > 0) 90*a9643ea8Slogwang { 91*a9643ea8Slogwang close(sock); 92*a9643ea8Slogwang sock = -1; 93*a9643ea8Slogwang } 94*a9643ea8Slogwang 95*a9643ea8Slogwang return ret; 96*a9643ea8Slogwang } 97*a9643ea8Slogwang 98*a9643ea8Slogwang /** 99*a9643ea8Slogwang * @brief ����TCP���֣�������Ϊ������ 100*a9643ea8Slogwang * @return >=0 �ɹ�, <0 ʧ�� 101*a9643ea8Slogwang */ 102*a9643ea8Slogwang int mt_tcp_create_sock(void) 103*a9643ea8Slogwang { 104*a9643ea8Slogwang int fd; 105*a9643ea8Slogwang int flag; 106*a9643ea8Slogwang 107*a9643ea8Slogwang // ����socket 108*a9643ea8Slogwang fd = ::socket(AF_INET, SOCK_STREAM, 0); 109*a9643ea8Slogwang if (fd < 0) 110*a9643ea8Slogwang { 111*a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, error: %m"); 112*a9643ea8Slogwang return -1; 113*a9643ea8Slogwang } 114*a9643ea8Slogwang 115*a9643ea8Slogwang // ����socket������ 116*a9643ea8Slogwang flag = fcntl(fd, F_GETFL, 0); 117*a9643ea8Slogwang if (flag == -1) 118*a9643ea8Slogwang { 119*a9643ea8Slogwang ::close(fd); 120*a9643ea8Slogwang MTLOG_ERROR("get fd flags failed, error: %m"); 121*a9643ea8Slogwang return -2; 122*a9643ea8Slogwang } 123*a9643ea8Slogwang 124*a9643ea8Slogwang if (flag & O_NONBLOCK) 125*a9643ea8Slogwang return fd; 126*a9643ea8Slogwang 127*a9643ea8Slogwang if (fcntl(fd, F_SETFL, flag | O_NONBLOCK | O_NDELAY) == -1) 128*a9643ea8Slogwang { 129*a9643ea8Slogwang ::close(fd); 130*a9643ea8Slogwang MTLOG_ERROR("set fd flags failed, error: %m"); 131*a9643ea8Slogwang return -3; 132*a9643ea8Slogwang } 133*a9643ea8Slogwang 134*a9643ea8Slogwang return fd; 135*a9643ea8Slogwang } 136*a9643ea8Slogwang 137*a9643ea8Slogwang /** 138*a9643ea8Slogwang * @brief TCP��ȡ������֪ͨ������socket 139*a9643ea8Slogwang */ 140*a9643ea8Slogwang static TcpKeepConn* mt_tcp_get_keep_conn(struct sockaddr_in* dst, int& sock) 141*a9643ea8Slogwang { 142*a9643ea8Slogwang // 1. ��ȡ�߳�֪ͨע����� 143*a9643ea8Slogwang KqueuerObj* ntfy_obj = NtfyObjMgr::Instance()->GetNtfyObj(NTFY_OBJ_THREAD, 0); 144*a9643ea8Slogwang if (NULL == ntfy_obj) 145*a9643ea8Slogwang { 146*a9643ea8Slogwang MTLOG_ERROR("get notify failed, logit"); 147*a9643ea8Slogwang return NULL; 148*a9643ea8Slogwang } 149*a9643ea8Slogwang 150*a9643ea8Slogwang // 2. ��ȡ���Ӷ���, ����֪ͨ��Ϣ 151*a9643ea8Slogwang TcpKeepConn* conn = dynamic_cast<TcpKeepConn*>(ConnectionMgr::Instance()->GetConnection(OBJ_TCP_KEEP, dst)); 152*a9643ea8Slogwang if (NULL == conn) 153*a9643ea8Slogwang { 154*a9643ea8Slogwang MTLOG_ERROR("get connection failed, dst[%p]", dst); 155*a9643ea8Slogwang NtfyObjMgr::Instance()->FreeNtfyObj(ntfy_obj); 156*a9643ea8Slogwang return NULL; 157*a9643ea8Slogwang } 158*a9643ea8Slogwang conn->SetNtfyObj(ntfy_obj); 159*a9643ea8Slogwang 160*a9643ea8Slogwang // 3. ��������socket��� 161*a9643ea8Slogwang int osfd = conn->CreateSocket(); 162*a9643ea8Slogwang if (osfd < 0) 163*a9643ea8Slogwang { 164*a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, true); 165*a9643ea8Slogwang MTLOG_ERROR("create socket failed, ret[%d]", osfd); 166*a9643ea8Slogwang return NULL; 167*a9643ea8Slogwang } 168*a9643ea8Slogwang 169*a9643ea8Slogwang // 4. �ɹ��������� 170*a9643ea8Slogwang sock = osfd; 171*a9643ea8Slogwang return conn; 172*a9643ea8Slogwang } 173*a9643ea8Slogwang 174*a9643ea8Slogwang /** 175*a9643ea8Slogwang * @brief TCPѭ������, ֱ������OK��ʱ 176*a9643ea8Slogwang * [ע��] �����߲�Ҫ�����ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ] 177*a9643ea8Slogwang */ 178*a9643ea8Slogwang static int mt_tcp_check_recv(int sock, char* rcv_buf, int &len, int flags, int timeout, MtFuncTcpMsgLen func) 179*a9643ea8Slogwang { 180*a9643ea8Slogwang int recv_len = 0; 181*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 182*a9643ea8Slogwang do 183*a9643ea8Slogwang { 184*a9643ea8Slogwang utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 185*a9643ea8Slogwang if (cost_time > (utime64_t)timeout) 186*a9643ea8Slogwang { 187*a9643ea8Slogwang errno = ETIME; 188*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 189*a9643ea8Slogwang return -3; 190*a9643ea8Slogwang } 191*a9643ea8Slogwang 192*a9643ea8Slogwang int rc = MtFrame::recv(sock, (rcv_buf + recv_len), (len - recv_len), 0, (timeout - (int)cost_time)); 193*a9643ea8Slogwang if (rc < 0) 194*a9643ea8Slogwang { 195*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 196*a9643ea8Slogwang return -3; 197*a9643ea8Slogwang } 198*a9643ea8Slogwang else if (rc == 0) 199*a9643ea8Slogwang { 200*a9643ea8Slogwang len = recv_len; 201*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 202*a9643ea8Slogwang return -7; 203*a9643ea8Slogwang } 204*a9643ea8Slogwang recv_len += rc; 205*a9643ea8Slogwang 206*a9643ea8Slogwang /* ��鱨�������� */ 207*a9643ea8Slogwang rc = func(rcv_buf, recv_len); 208*a9643ea8Slogwang if (rc < 0) 209*a9643ea8Slogwang { 210*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 211*a9643ea8Slogwang return -5; 212*a9643ea8Slogwang } 213*a9643ea8Slogwang else if (rc == 0) // ����δ������ 214*a9643ea8Slogwang { 215*a9643ea8Slogwang if (len == recv_len) // û�ռ��ٽ�����, ���� 216*a9643ea8Slogwang { 217*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg not ok, but no more buff", sock); 218*a9643ea8Slogwang return -6; 219*a9643ea8Slogwang } 220*a9643ea8Slogwang continue; 221*a9643ea8Slogwang } 222*a9643ea8Slogwang else // �ɹ����㱨�ij��� 223*a9643ea8Slogwang { 224*a9643ea8Slogwang if (rc > recv_len) // ���Ļ�δ��ȫ 225*a9643ea8Slogwang { 226*a9643ea8Slogwang continue; 227*a9643ea8Slogwang } 228*a9643ea8Slogwang else 229*a9643ea8Slogwang { 230*a9643ea8Slogwang len = rc; 231*a9643ea8Slogwang break; 232*a9643ea8Slogwang } 233*a9643ea8Slogwang } 234*a9643ea8Slogwang } while (true); 235*a9643ea8Slogwang 236*a9643ea8Slogwang return 0; 237*a9643ea8Slogwang } 238*a9643ea8Slogwang 239*a9643ea8Slogwang /** 240*a9643ea8Slogwang * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 241*a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 242*a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 243*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 244*a9643ea8Slogwang * @param pkg -�������װ�İ��� 245*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 246*a9643ea8Slogwang * @param rcv_buf -����Ӧ�����buff 247*a9643ea8Slogwang * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 248*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 249*a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 250*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 251*a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч 252*a9643ea8Slogwang */ 253*a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 254*a9643ea8Slogwang { 255*a9643ea8Slogwang if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 256*a9643ea8Slogwang { 257*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 258*a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size); 259*a9643ea8Slogwang return -10; 260*a9643ea8Slogwang } 261*a9643ea8Slogwang 262*a9643ea8Slogwang int ret = 0, rc = 0; 263*a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 264*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 265*a9643ea8Slogwang utime64_t cost_time = 0; 266*a9643ea8Slogwang int time_left = timeout; 267*a9643ea8Slogwang 268*a9643ea8Slogwang // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 269*a9643ea8Slogwang int sock = -1; 270*a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 271*a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 272*a9643ea8Slogwang { 273*a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 274*a9643ea8Slogwang ret = -1; 275*a9643ea8Slogwang goto EXIT_LABEL; 276*a9643ea8Slogwang } 277*a9643ea8Slogwang 278*a9643ea8Slogwang // 2. ���Լ����½����� 279*a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 280*a9643ea8Slogwang if (rc < 0) 281*a9643ea8Slogwang { 282*a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 283*a9643ea8Slogwang ret = -4; 284*a9643ea8Slogwang goto EXIT_LABEL; 285*a9643ea8Slogwang } 286*a9643ea8Slogwang 287*a9643ea8Slogwang // 3. �������ݴ��� 288*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 289*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 290*a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 291*a9643ea8Slogwang if (rc < 0) 292*a9643ea8Slogwang { 293*a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 294*a9643ea8Slogwang ret = -2; 295*a9643ea8Slogwang goto EXIT_LABEL; 296*a9643ea8Slogwang } 297*a9643ea8Slogwang 298*a9643ea8Slogwang // 4. �������ݴ��� 299*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 300*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 301*a9643ea8Slogwang rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 302*a9643ea8Slogwang if (rc < 0) 303*a9643ea8Slogwang { 304*a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 305*a9643ea8Slogwang ret = rc; 306*a9643ea8Slogwang goto EXIT_LABEL; 307*a9643ea8Slogwang } 308*a9643ea8Slogwang 309*a9643ea8Slogwang ret = 0; 310*a9643ea8Slogwang 311*a9643ea8Slogwang EXIT_LABEL: 312*a9643ea8Slogwang 313*a9643ea8Slogwang // ʧ����ǿ���ͷ�����, ����ʱ���� 314*a9643ea8Slogwang if (conn != NULL) 315*a9643ea8Slogwang { 316*a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 317*a9643ea8Slogwang } 318*a9643ea8Slogwang 319*a9643ea8Slogwang return ret; 320*a9643ea8Slogwang } 321*a9643ea8Slogwang 322*a9643ea8Slogwang /** 323*a9643ea8Slogwang * @brief TCP�������շ����� 324*a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 325*a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 326*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 327*a9643ea8Slogwang * @param pkg -�������װ�İ��� 328*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 329*a9643ea8Slogwang * @param rcv_buf -����Ӧ�����buff 330*a9643ea8Slogwang * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������� 331*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 332*a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 333*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 334*a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر����ӣ�-10 ������Ч 335*a9643ea8Slogwang */ 336*a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int& buf_size, int timeout, MtFuncTcpMsgLen func) 337*a9643ea8Slogwang { 338*a9643ea8Slogwang int ret = 0, rc = 0; 339*a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 340*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 341*a9643ea8Slogwang utime64_t cost_time = 0; 342*a9643ea8Slogwang int time_left = timeout; 343*a9643ea8Slogwang 344*a9643ea8Slogwang // 1. ������� 345*a9643ea8Slogwang if (!dst || !pkg || !rcv_buf || !func || len<1 || buf_size<1) 346*a9643ea8Slogwang { 347*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%d]", 348*a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size); 349*a9643ea8Slogwang return -10; 350*a9643ea8Slogwang } 351*a9643ea8Slogwang 352*a9643ea8Slogwang // 2. ����TCP socket 353*a9643ea8Slogwang int sock; 354*a9643ea8Slogwang sock = mt_tcp_create_sock(); 355*a9643ea8Slogwang if (sock < 0) 356*a9643ea8Slogwang { 357*a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 358*a9643ea8Slogwang return -1; 359*a9643ea8Slogwang } 360*a9643ea8Slogwang 361*a9643ea8Slogwang // 3. ���Լ����½����� 362*a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 363*a9643ea8Slogwang if (rc < 0) 364*a9643ea8Slogwang { 365*a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 366*a9643ea8Slogwang ret = -4; 367*a9643ea8Slogwang goto EXIT_LABEL; 368*a9643ea8Slogwang } 369*a9643ea8Slogwang 370*a9643ea8Slogwang // 4. �������ݴ��� 371*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 372*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 373*a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 374*a9643ea8Slogwang if (rc < 0) 375*a9643ea8Slogwang { 376*a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 377*a9643ea8Slogwang ret = -2; 378*a9643ea8Slogwang goto EXIT_LABEL; 379*a9643ea8Slogwang } 380*a9643ea8Slogwang 381*a9643ea8Slogwang // 5. �������ݴ��� 382*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 383*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 384*a9643ea8Slogwang rc = mt_tcp_check_recv(sock, (char*)rcv_buf, buf_size, 0, time_left, func); 385*a9643ea8Slogwang if (rc < 0) 386*a9643ea8Slogwang { 387*a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 388*a9643ea8Slogwang ret = rc; 389*a9643ea8Slogwang goto EXIT_LABEL; 390*a9643ea8Slogwang } 391*a9643ea8Slogwang 392*a9643ea8Slogwang ret = 0; 393*a9643ea8Slogwang 394*a9643ea8Slogwang EXIT_LABEL: 395*a9643ea8Slogwang if (sock >= 0) 396*a9643ea8Slogwang ::close(sock); 397*a9643ea8Slogwang 398*a9643ea8Slogwang return ret; 399*a9643ea8Slogwang } 400*a9643ea8Slogwang 401*a9643ea8Slogwang 402*a9643ea8Slogwang /** 403*a9643ea8Slogwang * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 404*a9643ea8Slogwang * [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ] 405*a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 406*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 407*a9643ea8Slogwang * @param pkg -�������װ�İ��� 408*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 409*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 410*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч 411*a9643ea8Slogwang */ 412*a9643ea8Slogwang int mt_tcpsend(struct sockaddr_in* dst, void* pkg, int len, int timeout) 413*a9643ea8Slogwang { 414*a9643ea8Slogwang if (!dst || !pkg || len<1) 415*a9643ea8Slogwang { 416*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 417*a9643ea8Slogwang return -10; 418*a9643ea8Slogwang } 419*a9643ea8Slogwang 420*a9643ea8Slogwang int ret = 0, rc = 0; 421*a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 422*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 423*a9643ea8Slogwang utime64_t cost_time = 0; 424*a9643ea8Slogwang int time_left = timeout; 425*a9643ea8Slogwang 426*a9643ea8Slogwang // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 427*a9643ea8Slogwang int sock = -1; 428*a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 429*a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 430*a9643ea8Slogwang { 431*a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 432*a9643ea8Slogwang ret = -1; 433*a9643ea8Slogwang goto EXIT_LABEL; 434*a9643ea8Slogwang } 435*a9643ea8Slogwang 436*a9643ea8Slogwang // 2. ���Լ����½����� 437*a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 438*a9643ea8Slogwang if (rc < 0) 439*a9643ea8Slogwang { 440*a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 441*a9643ea8Slogwang ret = -4; 442*a9643ea8Slogwang goto EXIT_LABEL; 443*a9643ea8Slogwang } 444*a9643ea8Slogwang 445*a9643ea8Slogwang // 3. �������ݴ��� 446*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 447*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 448*a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 449*a9643ea8Slogwang if (rc < 0) 450*a9643ea8Slogwang { 451*a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 452*a9643ea8Slogwang ret = -2; 453*a9643ea8Slogwang goto EXIT_LABEL; 454*a9643ea8Slogwang } 455*a9643ea8Slogwang 456*a9643ea8Slogwang ret = 0; 457*a9643ea8Slogwang 458*a9643ea8Slogwang EXIT_LABEL: 459*a9643ea8Slogwang 460*a9643ea8Slogwang // ʧ����ǿ���ͷ�����, ����ʱ���� 461*a9643ea8Slogwang if (conn != NULL) 462*a9643ea8Slogwang { 463*a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 464*a9643ea8Slogwang } 465*a9643ea8Slogwang 466*a9643ea8Slogwang return ret; 467*a9643ea8Slogwang } 468*a9643ea8Slogwang 469*a9643ea8Slogwang /** 470*a9643ea8Slogwang * @brief TCP������ֻ�����սӿ� 471*a9643ea8Slogwang * [ע��] tcp����buff, ��������static����, ����������Ĵ��� [��Ҫ] 472*a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 473*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 474*a9643ea8Slogwang * @param pkg -�������װ�İ��� 475*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 476*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 477*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -4 ����ʧ��, -10 ������Ч 478*a9643ea8Slogwang */ 479*a9643ea8Slogwang int mt_tcpsend_short(struct sockaddr_in* dst, void* pkg, int len, int timeout) 480*a9643ea8Slogwang { 481*a9643ea8Slogwang // 1. ��μ�� 482*a9643ea8Slogwang if (!dst || !pkg || len<1) 483*a9643ea8Slogwang { 484*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d]", dst, pkg, len); 485*a9643ea8Slogwang return -10; 486*a9643ea8Slogwang } 487*a9643ea8Slogwang 488*a9643ea8Slogwang int ret = 0, rc = 0; 489*a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 490*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 491*a9643ea8Slogwang utime64_t cost_time = 0; 492*a9643ea8Slogwang int time_left = timeout; 493*a9643ea8Slogwang 494*a9643ea8Slogwang // 2. ����TCP socket 495*a9643ea8Slogwang int sock = -1; 496*a9643ea8Slogwang sock = mt_tcp_create_sock(); 497*a9643ea8Slogwang if (sock < 0) 498*a9643ea8Slogwang { 499*a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 500*a9643ea8Slogwang ret = -1; 501*a9643ea8Slogwang goto EXIT_LABEL; 502*a9643ea8Slogwang } 503*a9643ea8Slogwang 504*a9643ea8Slogwang // 2. ���Լ����½����� 505*a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 506*a9643ea8Slogwang if (rc < 0) 507*a9643ea8Slogwang { 508*a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 509*a9643ea8Slogwang ret = -4; 510*a9643ea8Slogwang goto EXIT_LABEL; 511*a9643ea8Slogwang } 512*a9643ea8Slogwang 513*a9643ea8Slogwang // 3. �������ݴ��� 514*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 515*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 516*a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 517*a9643ea8Slogwang if (rc < 0) 518*a9643ea8Slogwang { 519*a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 520*a9643ea8Slogwang ret = -2; 521*a9643ea8Slogwang goto EXIT_LABEL; 522*a9643ea8Slogwang } 523*a9643ea8Slogwang 524*a9643ea8Slogwang ret = 0; 525*a9643ea8Slogwang 526*a9643ea8Slogwang EXIT_LABEL: 527*a9643ea8Slogwang 528*a9643ea8Slogwang if (sock >= 0) 529*a9643ea8Slogwang ::close(sock); 530*a9643ea8Slogwang 531*a9643ea8Slogwang return ret; 532*a9643ea8Slogwang } 533*a9643ea8Slogwang 534*a9643ea8Slogwang 535*a9643ea8Slogwang /** 536*a9643ea8Slogwang * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶����� 537*a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 538*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 539*a9643ea8Slogwang * @param pkg -�������װ�İ��� 540*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 541*a9643ea8Slogwang * @param rcv_buf -����Ӧ�����buff��ֻ�����տ�������ΪNULL 542*a9643ea8Slogwang * @param buf_size -modify-����Ӧ�����buff��С, �ɹ�����ʱ, ��ΪӦ������ȣ�ֻ�����գ�����ΪNULL 543*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 544*a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 545*a9643ea8Slogwang * @param type - �������� 546*a9643ea8Slogwang * MT_TCP_SHORT: һ��һ�������ӣ� 547*a9643ea8Slogwang * MT_TCP_LONG : һ��һ�ճ����ӣ� 548*a9643ea8Slogwang * MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ� 549*a9643ea8Slogwang * MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ� 550*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 551*a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 552*a9643ea8Slogwang */ 553*a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void* rcv_buf, int* buf_size, int timeout, MtFuncTcpMsgLen func, MT_TCP_CONN_TYPE type) 554*a9643ea8Slogwang { 555*a9643ea8Slogwang if(!dst || !pkg || len<1) 556*a9643ea8Slogwang { 557*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 558*a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size,type); 559*a9643ea8Slogwang return -10; 560*a9643ea8Slogwang } 561*a9643ea8Slogwang 562*a9643ea8Slogwang switch (type) 563*a9643ea8Slogwang { 564*a9643ea8Slogwang // TCP�����ӵ������� 565*a9643ea8Slogwang case MT_TCP_LONG: 566*a9643ea8Slogwang { 567*a9643ea8Slogwang return mt_tcpsendrcv(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 568*a9643ea8Slogwang } 569*a9643ea8Slogwang 570*a9643ea8Slogwang // TCP������ֻ������ 571*a9643ea8Slogwang case MT_TCP_LONG_SNDONLY: 572*a9643ea8Slogwang { 573*a9643ea8Slogwang return mt_tcpsend(dst, pkg, len, timeout); 574*a9643ea8Slogwang } 575*a9643ea8Slogwang 576*a9643ea8Slogwang // TCP�����ӵ������� 577*a9643ea8Slogwang case MT_TCP_SHORT: 578*a9643ea8Slogwang { 579*a9643ea8Slogwang return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, *buf_size, timeout, func); 580*a9643ea8Slogwang } 581*a9643ea8Slogwang 582*a9643ea8Slogwang // TCP������ֻ������ 583*a9643ea8Slogwang case MT_TCP_SHORT_SNDONLY: 584*a9643ea8Slogwang { 585*a9643ea8Slogwang return mt_tcpsend_short(dst, pkg, len, timeout); 586*a9643ea8Slogwang } 587*a9643ea8Slogwang 588*a9643ea8Slogwang default: 589*a9643ea8Slogwang { 590*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], rcv_buf[%p], fun[%p], len[%d], buf_size[%p]type[%d]", 591*a9643ea8Slogwang dst, pkg, rcv_buf, func, len, buf_size,type); 592*a9643ea8Slogwang return -10; 593*a9643ea8Slogwang } 594*a9643ea8Slogwang } 595*a9643ea8Slogwang 596*a9643ea8Slogwang return 0; 597*a9643ea8Slogwang } 598*a9643ea8Slogwang 599*a9643ea8Slogwang 600*a9643ea8Slogwang 601*a9643ea8Slogwang /** 602*a9643ea8Slogwang * @brief ������ص��������� 603*a9643ea8Slogwang */ 604*a9643ea8Slogwang static void mt_task_process(void* arg) 605*a9643ea8Slogwang { 606*a9643ea8Slogwang int rc = 0; 607*a9643ea8Slogwang IMtTask* task = (IMtTask*)arg; 608*a9643ea8Slogwang if (!task) 609*a9643ea8Slogwang { 610*a9643ea8Slogwang MTLOG_ERROR("Invalid arg, error"); 611*a9643ea8Slogwang return; 612*a9643ea8Slogwang } 613*a9643ea8Slogwang 614*a9643ea8Slogwang rc = task->Process(); 615*a9643ea8Slogwang if (rc != 0) 616*a9643ea8Slogwang { 617*a9643ea8Slogwang MTLOG_DEBUG("task process failed(%d), log", rc); 618*a9643ea8Slogwang } 619*a9643ea8Slogwang 620*a9643ea8Slogwang task->SetResult(rc); 621*a9643ea8Slogwang 622*a9643ea8Slogwang return; 623*a9643ea8Slogwang }; 624*a9643ea8Slogwang 625*a9643ea8Slogwang /** 626*a9643ea8Slogwang * @brief ��·IO�Ĵ���, ��������̹߳��� 627*a9643ea8Slogwang * @param req_list - �����б� 628*a9643ea8Slogwang * @return 0 �ɹ�, <0ʧ�� 629*a9643ea8Slogwang */ 630*a9643ea8Slogwang int mt_exec_all_task(IMtTaskList& req_list) 631*a9643ea8Slogwang { 632*a9643ea8Slogwang MtFrame* mtframe = MtFrame::Instance(); 633*a9643ea8Slogwang MicroThread* thread = mtframe->GetActiveThread(); 634*a9643ea8Slogwang IMtTask* task = NULL; 635*a9643ea8Slogwang MicroThread* sub = NULL; 636*a9643ea8Slogwang MicroThread* tmp = NULL; 637*a9643ea8Slogwang int rc = -1; 638*a9643ea8Slogwang 639*a9643ea8Slogwang MicroThread::SubThreadList list; 640*a9643ea8Slogwang TAILQ_INIT(&list); 641*a9643ea8Slogwang 642*a9643ea8Slogwang // ��ֹû��task�������߳�һֱ����ס 643*a9643ea8Slogwang if (0 == req_list.size()) 644*a9643ea8Slogwang { 645*a9643ea8Slogwang MTLOG_DEBUG("no task for execult"); 646*a9643ea8Slogwang return 0; 647*a9643ea8Slogwang } 648*a9643ea8Slogwang 649*a9643ea8Slogwang // 1. �����̶߳��� 650*a9643ea8Slogwang for (IMtTaskList::iterator it = req_list.begin(); it != req_list.end(); ++it) 651*a9643ea8Slogwang { 652*a9643ea8Slogwang task = *it; 653*a9643ea8Slogwang sub = MtFrame::CreateThread(mt_task_process, task, false); 654*a9643ea8Slogwang if (NULL == sub) 655*a9643ea8Slogwang { 656*a9643ea8Slogwang MTLOG_ERROR("create sub thread failed"); 657*a9643ea8Slogwang goto EXIT_LABEL; 658*a9643ea8Slogwang } 659*a9643ea8Slogwang 660*a9643ea8Slogwang sub->SetType(MicroThread::SUB_THREAD); 661*a9643ea8Slogwang TAILQ_INSERT_TAIL(&list, sub, _sub_entry); 662*a9643ea8Slogwang } 663*a9643ea8Slogwang 664*a9643ea8Slogwang // 2. ����ִ������ 665*a9643ea8Slogwang TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 666*a9643ea8Slogwang { 667*a9643ea8Slogwang TAILQ_REMOVE(&list, sub, _sub_entry); 668*a9643ea8Slogwang thread->AddSubThread(sub); 669*a9643ea8Slogwang mtframe->InsertRunable(sub); 670*a9643ea8Slogwang } 671*a9643ea8Slogwang 672*a9643ea8Slogwang // 3. �ȴ����߳�ִ�н��� 673*a9643ea8Slogwang thread->Wait(); 674*a9643ea8Slogwang rc = 0; 675*a9643ea8Slogwang 676*a9643ea8Slogwang EXIT_LABEL: 677*a9643ea8Slogwang 678*a9643ea8Slogwang TAILQ_FOREACH_SAFE(sub, &list, _sub_entry, tmp) 679*a9643ea8Slogwang { 680*a9643ea8Slogwang TAILQ_REMOVE(&list, sub, _sub_entry); 681*a9643ea8Slogwang mtframe->FreeThread(sub); 682*a9643ea8Slogwang } 683*a9643ea8Slogwang 684*a9643ea8Slogwang return rc; 685*a9643ea8Slogwang 686*a9643ea8Slogwang } 687*a9643ea8Slogwang 688*a9643ea8Slogwang /** 689*a9643ea8Slogwang * @brief ���õ�ǰIMtMsg��˽�б��� 690*a9643ea8Slogwang * @info ֻ����ָ�룬�ڴ���Ҫҵ����� 691*a9643ea8Slogwang */ 692*a9643ea8Slogwang void mt_set_msg_private(void *data) 693*a9643ea8Slogwang { 694*a9643ea8Slogwang MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 695*a9643ea8Slogwang if (msg_thread != NULL) 696*a9643ea8Slogwang msg_thread->SetPrivate(data); 697*a9643ea8Slogwang } 698*a9643ea8Slogwang 699*a9643ea8Slogwang /** 700*a9643ea8Slogwang * @brief ��ȡ��ǰIMtMsg��˽�б��� 701*a9643ea8Slogwang * @return ˽�б���ָ�� 702*a9643ea8Slogwang */ 703*a9643ea8Slogwang void* mt_get_msg_private() 704*a9643ea8Slogwang { 705*a9643ea8Slogwang MicroThread *msg_thread = MtFrame::Instance()->GetRootThread(); 706*a9643ea8Slogwang if (NULL == msg_thread) 707*a9643ea8Slogwang { 708*a9643ea8Slogwang return NULL; 709*a9643ea8Slogwang } 710*a9643ea8Slogwang 711*a9643ea8Slogwang return msg_thread->GetPrivate(); 712*a9643ea8Slogwang } 713*a9643ea8Slogwang 714*a9643ea8Slogwang /** 715*a9643ea8Slogwang * @brief �߳̿�ܳ�ʼ�� 716*a9643ea8Slogwang * @info ҵ��ʹ��spp�������̣߳���Ҫ���øó�ʼ������ 717*a9643ea8Slogwang * @return false:��ʼ��ʧ�� true:��ʼ���ɹ� 718*a9643ea8Slogwang */ 719*a9643ea8Slogwang bool mt_init_frame(const char *conf, int argc, char * const argv[]) 720*a9643ea8Slogwang { 721*a9643ea8Slogwang if (conf && argc) { 722*a9643ea8Slogwang ff_init(conf, argc, argv); 723*a9643ea8Slogwang ff_set_hook_flag(); 724*a9643ea8Slogwang } 725*a9643ea8Slogwang memset(&g_mt_syscall_tab, 0, sizeof(g_mt_syscall_tab)); 726*a9643ea8Slogwang return MtFrame::Instance()->InitFrame(); 727*a9643ea8Slogwang } 728*a9643ea8Slogwang 729*a9643ea8Slogwang /** 730*a9643ea8Slogwang * @brief �����̶߳���ջ�ռ��С 731*a9643ea8Slogwang * @info �DZ������ã�Ĭ�ϴ�СΪ128K 732*a9643ea8Slogwang */ 733*a9643ea8Slogwang void mt_set_stack_size(unsigned int bytes) 734*a9643ea8Slogwang { 735*a9643ea8Slogwang ThreadPool::SetDefaultStackSize(bytes); 736*a9643ea8Slogwang } 737*a9643ea8Slogwang 738*a9643ea8Slogwang /** 739*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� recvfrom 740*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 741*a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 742*a9643ea8Slogwang * @param len ������Ϣ���������� 743*a9643ea8Slogwang * @param from ��Դ��ַ��ָ�� 744*a9643ea8Slogwang * @param fromlen ��Դ��ַ�Ľṹ���� 745*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 746*a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 747*a9643ea8Slogwang */ 748*a9643ea8Slogwang int mt_recvfrom(int fd, void *buf, int len, int flags, struct sockaddr *from, socklen_t *fromlen, int timeout) 749*a9643ea8Slogwang { 750*a9643ea8Slogwang return MtFrame::recvfrom(fd, buf, len, flags, from, fromlen, timeout); 751*a9643ea8Slogwang } 752*a9643ea8Slogwang 753*a9643ea8Slogwang /** 754*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� sendto 755*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 756*a9643ea8Slogwang * @param msg �����͵���Ϣָ�� 757*a9643ea8Slogwang * @param len �����͵���Ϣ���� 758*a9643ea8Slogwang * @param to Ŀ�ĵ�ַ��ָ�� 759*a9643ea8Slogwang * @param tolen Ŀ�ĵ�ַ�Ľṹ���� 760*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 761*a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 762*a9643ea8Slogwang */ 763*a9643ea8Slogwang int mt_sendto(int fd, const void *msg, int len, int flags, const struct sockaddr *to, int tolen, int timeout) 764*a9643ea8Slogwang { 765*a9643ea8Slogwang return MtFrame::sendto(fd, msg, len, flags, to, tolen, timeout); 766*a9643ea8Slogwang } 767*a9643ea8Slogwang 768*a9643ea8Slogwang /** 769*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� connect 770*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 771*a9643ea8Slogwang * @param addr ָ��server��Ŀ�ĵ�ַ 772*a9643ea8Slogwang * @param addrlen ��ַ�ij��� 773*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 774*a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 775*a9643ea8Slogwang */ 776*a9643ea8Slogwang int mt_connect(int fd, const struct sockaddr *addr, int addrlen, int timeout) 777*a9643ea8Slogwang { 778*a9643ea8Slogwang return MtFrame::connect(fd, addr, addrlen, timeout); 779*a9643ea8Slogwang } 780*a9643ea8Slogwang 781*a9643ea8Slogwang /** 782*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� accept 783*a9643ea8Slogwang * @param fd �������� 784*a9643ea8Slogwang * @param addr �ͻ��˵�ַ 785*a9643ea8Slogwang * @param addrlen ��ַ�ij��� 786*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 787*a9643ea8Slogwang * @return >=0 accept��socket������, <0 ʧ�� 788*a9643ea8Slogwang */ 789*a9643ea8Slogwang int mt_accept(int fd, struct sockaddr *addr, socklen_t *addrlen, int timeout) 790*a9643ea8Slogwang { 791*a9643ea8Slogwang return MtFrame::accept(fd, addr, addrlen, timeout); 792*a9643ea8Slogwang } 793*a9643ea8Slogwang 794*a9643ea8Slogwang 795*a9643ea8Slogwang /** 796*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� read 797*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 798*a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 799*a9643ea8Slogwang * @param nbyte ������Ϣ���������� 800*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 801*a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 802*a9643ea8Slogwang */ 803*a9643ea8Slogwang ssize_t mt_read(int fd, void *buf, size_t nbyte, int timeout) 804*a9643ea8Slogwang { 805*a9643ea8Slogwang return MtFrame::read(fd, buf, nbyte, timeout); 806*a9643ea8Slogwang } 807*a9643ea8Slogwang 808*a9643ea8Slogwang /** 809*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� write 810*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 811*a9643ea8Slogwang * @param buf �����͵���Ϣָ�� 812*a9643ea8Slogwang * @param nbyte �����͵���Ϣ���� 813*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 814*a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 815*a9643ea8Slogwang */ 816*a9643ea8Slogwang ssize_t mt_write(int fd, const void *buf, size_t nbyte, int timeout) 817*a9643ea8Slogwang { 818*a9643ea8Slogwang return MtFrame::write(fd, buf, nbyte, timeout); 819*a9643ea8Slogwang } 820*a9643ea8Slogwang 821*a9643ea8Slogwang /** 822*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� recv 823*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 824*a9643ea8Slogwang * @param buf ������Ϣ������ָ�� 825*a9643ea8Slogwang * @param len ������Ϣ���������� 826*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 827*a9643ea8Slogwang * @return >0 �ɹ����ճ���, <0 ʧ�� 828*a9643ea8Slogwang */ 829*a9643ea8Slogwang ssize_t mt_recv(int fd, void *buf, int len, int flags, int timeout) 830*a9643ea8Slogwang { 831*a9643ea8Slogwang return MtFrame::recv(fd, buf, len, flags, timeout); 832*a9643ea8Slogwang } 833*a9643ea8Slogwang 834*a9643ea8Slogwang /** 835*a9643ea8Slogwang * @brief �̰߳�����ϵͳIO���� send 836*a9643ea8Slogwang * @param fd ϵͳsocket��Ϣ 837*a9643ea8Slogwang * @param buf �����͵���Ϣָ�� 838*a9643ea8Slogwang * @param nbyte �����͵���Ϣ���� 839*a9643ea8Slogwang * @param timeout ��ȴ�ʱ��, ���� 840*a9643ea8Slogwang * @return >0 �ɹ����ͳ���, <0 ʧ�� 841*a9643ea8Slogwang */ 842*a9643ea8Slogwang ssize_t mt_send(int fd, const void *buf, size_t nbyte, int flags, int timeout) 843*a9643ea8Slogwang { 844*a9643ea8Slogwang return MtFrame::send(fd, buf, nbyte, flags, timeout); 845*a9643ea8Slogwang } 846*a9643ea8Slogwang 847*a9643ea8Slogwang /** 848*a9643ea8Slogwang * @brief �߳�����sleep�ӿ�, ��λms 849*a9643ea8Slogwang */ 850*a9643ea8Slogwang void mt_sleep(int ms) 851*a9643ea8Slogwang { 852*a9643ea8Slogwang MtFrame::sleep(ms); 853*a9643ea8Slogwang } 854*a9643ea8Slogwang 855*a9643ea8Slogwang /** 856*a9643ea8Slogwang * @brief �̻߳�ȡϵͳʱ�䣬��λms 857*a9643ea8Slogwang */ 858*a9643ea8Slogwang unsigned long long mt_time_ms(void) 859*a9643ea8Slogwang { 860*a9643ea8Slogwang return MtFrame::Instance()->GetLastClock(); 861*a9643ea8Slogwang } 862*a9643ea8Slogwang 863*a9643ea8Slogwang /** 864*a9643ea8Slogwang * @brief �̵߳ȴ�epoll�¼��İ������� 865*a9643ea8Slogwang */ 866*a9643ea8Slogwang int mt_wait_events(int fd, int events, int timeout) 867*a9643ea8Slogwang { 868*a9643ea8Slogwang return MtFrame::Instance()->WaitEvents(fd, events, timeout); 869*a9643ea8Slogwang } 870*a9643ea8Slogwang 871*a9643ea8Slogwang void* mt_start_thread(void* entry, void* args) 872*a9643ea8Slogwang { 873*a9643ea8Slogwang return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true); 874*a9643ea8Slogwang } 875*a9643ea8Slogwang 876*a9643ea8Slogwang #define BUF_ALIGNMENT_SIZE 4096 877*a9643ea8Slogwang #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1)) 878*a9643ea8Slogwang #define BUF_DEFAULT_SIZE 4096 879*a9643ea8Slogwang 880*a9643ea8Slogwang class ScopedBuf 881*a9643ea8Slogwang { 882*a9643ea8Slogwang public: 883*a9643ea8Slogwang ScopedBuf(void*& buf_keeper, bool keep) 884*a9643ea8Slogwang :buf_keeper_(buf_keeper),buf_(0),len_(0),len_watermark_(0),keep_(keep) 885*a9643ea8Slogwang {} 886*a9643ea8Slogwang 887*a9643ea8Slogwang int Alloc(int len) 888*a9643ea8Slogwang { 889*a9643ea8Slogwang if(len<len_) 890*a9643ea8Slogwang { 891*a9643ea8Slogwang return -1; // �ռ��СԽ�� 892*a9643ea8Slogwang } 893*a9643ea8Slogwang 894*a9643ea8Slogwang if(len==0) 895*a9643ea8Slogwang { 896*a9643ea8Slogwang len = BUF_ALIGNMENT_SIZE; 897*a9643ea8Slogwang } 898*a9643ea8Slogwang if(len_==len) 899*a9643ea8Slogwang { 900*a9643ea8Slogwang return 0; 901*a9643ea8Slogwang } 902*a9643ea8Slogwang 903*a9643ea8Slogwang len_ = BUF_ALIGN_SIZE(len); 904*a9643ea8Slogwang if(len_==0) 905*a9643ea8Slogwang { 906*a9643ea8Slogwang len_ = BUF_DEFAULT_SIZE; 907*a9643ea8Slogwang } 908*a9643ea8Slogwang len_watermark_ = len_-BUF_ALIGNMENT_SIZE; 909*a9643ea8Slogwang char* tmp = (char*)realloc(buf_, len_); 910*a9643ea8Slogwang if(tmp==NULL) 911*a9643ea8Slogwang { 912*a9643ea8Slogwang return -2; // �����ڴ�ʧ�� 913*a9643ea8Slogwang } 914*a9643ea8Slogwang 915*a9643ea8Slogwang buf_ = tmp; 916*a9643ea8Slogwang return 0; 917*a9643ea8Slogwang } 918*a9643ea8Slogwang 919*a9643ea8Slogwang void reset() 920*a9643ea8Slogwang { 921*a9643ea8Slogwang if(keep_) 922*a9643ea8Slogwang { 923*a9643ea8Slogwang buf_keeper_ = (void*)buf_; 924*a9643ea8Slogwang buf_ = NULL; 925*a9643ea8Slogwang } 926*a9643ea8Slogwang } 927*a9643ea8Slogwang 928*a9643ea8Slogwang ~ScopedBuf() 929*a9643ea8Slogwang { 930*a9643ea8Slogwang if(buf_!=NULL) 931*a9643ea8Slogwang { 932*a9643ea8Slogwang free(buf_); 933*a9643ea8Slogwang buf_ = NULL; 934*a9643ea8Slogwang } 935*a9643ea8Slogwang } 936*a9643ea8Slogwang 937*a9643ea8Slogwang public: 938*a9643ea8Slogwang void* &buf_keeper_; 939*a9643ea8Slogwang char* buf_; 940*a9643ea8Slogwang int len_; 941*a9643ea8Slogwang int len_watermark_; 942*a9643ea8Slogwang bool keep_; 943*a9643ea8Slogwang 944*a9643ea8Slogwang }; 945*a9643ea8Slogwang 946*a9643ea8Slogwang /** 947*a9643ea8Slogwang * @brief TCPѭ������, ֱ������OK��ʱ 948*a9643ea8Slogwang * [ע��] �����߲�Ҫ�����ĺ�������ֵ����֤��Ҫ��mt_tcpsendrcv�ȵ��ýӿڳ�ͻ [��Ҫ] 949*a9643ea8Slogwang */ 950*a9643ea8Slogwang static int mt_tcp_check_recv(int sock, void*& rcv_buf, int &len, int flags, 951*a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 952*a9643ea8Slogwang { 953*a9643ea8Slogwang int recv_len = 0; 954*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 955*a9643ea8Slogwang 956*a9643ea8Slogwang int rc = 0; 957*a9643ea8Slogwang int ret = 0; 958*a9643ea8Slogwang int pkg_len = 0; 959*a9643ea8Slogwang bool msg_len_detected = false; 960*a9643ea8Slogwang 961*a9643ea8Slogwang ScopedBuf sbuf(rcv_buf, keep_rcv_buf); 962*a9643ea8Slogwang ret = sbuf.Alloc(len); 963*a9643ea8Slogwang 964*a9643ea8Slogwang if(ret!=0) 965*a9643ea8Slogwang { 966*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 967*a9643ea8Slogwang return -11; 968*a9643ea8Slogwang } 969*a9643ea8Slogwang 970*a9643ea8Slogwang do 971*a9643ea8Slogwang { 972*a9643ea8Slogwang utime64_t cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 973*a9643ea8Slogwang if (cost_time > (utime64_t)timeout) 974*a9643ea8Slogwang { 975*a9643ea8Slogwang errno = ETIME; 976*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv not ok, timeout", sock); 977*a9643ea8Slogwang return -3; 978*a9643ea8Slogwang } 979*a9643ea8Slogwang 980*a9643ea8Slogwang rc = MtFrame::recv(sock, (sbuf.buf_ + recv_len), (sbuf.len_ - recv_len), 0, (timeout - (int)cost_time)); 981*a9643ea8Slogwang if (rc < 0) 982*a9643ea8Slogwang { 983*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d][%m]", sock, rc); 984*a9643ea8Slogwang return -3; 985*a9643ea8Slogwang } 986*a9643ea8Slogwang else if (rc == 0) // Զ�˹ر� 987*a9643ea8Slogwang { 988*a9643ea8Slogwang 989*a9643ea8Slogwang if(recv_len==0) // δ�ذ���ֱ�ӷ���Զ�˹ر� 990*a9643ea8Slogwang { 991*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 992*a9643ea8Slogwang return -7; 993*a9643ea8Slogwang } 994*a9643ea8Slogwang 995*a9643ea8Slogwang /* ��鱨�������� */ 996*a9643ea8Slogwang rc = check_func(sbuf.buf_, recv_len, true, msg_ctx, msg_len_detected); 997*a9643ea8Slogwang 998*a9643ea8Slogwang if(rc!=recv_len) // ҵ�����Զ�˹رգ�Ӧ�÷������������ȣ�����<=0,��ʾ���������� 999*a9643ea8Slogwang { 1000*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] remote close", sock); 1001*a9643ea8Slogwang return -7; 1002*a9643ea8Slogwang } 1003*a9643ea8Slogwang len = recv_len; 1004*a9643ea8Slogwang break; 1005*a9643ea8Slogwang } 1006*a9643ea8Slogwang recv_len += rc; 1007*a9643ea8Slogwang 1008*a9643ea8Slogwang /* ��鱨�������� */ 1009*a9643ea8Slogwang if((!msg_len_detected)||recv_len==pkg_len) 1010*a9643ea8Slogwang { 1011*a9643ea8Slogwang rc = check_func(sbuf.buf_, recv_len, false, msg_ctx,msg_len_detected); 1012*a9643ea8Slogwang if(msg_len_detected) 1013*a9643ea8Slogwang { 1014*a9643ea8Slogwang pkg_len = rc; 1015*a9643ea8Slogwang } 1016*a9643ea8Slogwang } 1017*a9643ea8Slogwang else 1018*a9643ea8Slogwang { 1019*a9643ea8Slogwang rc = pkg_len; 1020*a9643ea8Slogwang } 1021*a9643ea8Slogwang 1022*a9643ea8Slogwang if (rc < 0) 1023*a9643ea8Slogwang { 1024*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error[%d]", sock, rc); 1025*a9643ea8Slogwang return -5; 1026*a9643ea8Slogwang } 1027*a9643ea8Slogwang else if (rc == 0) // ����δ������,�Ҳ�ȷ����С 1028*a9643ea8Slogwang { 1029*a9643ea8Slogwang if(sbuf.len_ > recv_len) 1030*a9643ea8Slogwang { 1031*a9643ea8Slogwang continue; 1032*a9643ea8Slogwang } 1033*a9643ea8Slogwang // û�ռ��ٽ�����, 2����С��չbuf 1034*a9643ea8Slogwang ret = sbuf.Alloc(sbuf.len_<<1); 1035*a9643ea8Slogwang 1036*a9643ea8Slogwang if(ret!=0) 1037*a9643ea8Slogwang { 1038*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 1039*a9643ea8Slogwang return -11; 1040*a9643ea8Slogwang } 1041*a9643ea8Slogwang } 1042*a9643ea8Slogwang else // �ɹ����㱨�ij��� 1043*a9643ea8Slogwang { 1044*a9643ea8Slogwang if (rc > recv_len) // ���Ļ�δ��ȫ 1045*a9643ea8Slogwang { 1046*a9643ea8Slogwang if(sbuf.len_ > recv_len) // recv buf���пռ�.��δ����ˮλ 1047*a9643ea8Slogwang { 1048*a9643ea8Slogwang continue; 1049*a9643ea8Slogwang } 1050*a9643ea8Slogwang 1051*a9643ea8Slogwang // û�ռ��ٽ�����, ����ҵ��ָʾ��С��չ�ڴ� 1052*a9643ea8Slogwang ret = sbuf.Alloc(rc); 1053*a9643ea8Slogwang 1054*a9643ea8Slogwang if(ret!=0) 1055*a9643ea8Slogwang { 1056*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] recv failed ret[%d], alloc rcv buf failed, [%m]", sock, ret); 1057*a9643ea8Slogwang return -11; 1058*a9643ea8Slogwang } 1059*a9643ea8Slogwang } 1060*a9643ea8Slogwang else if(rc==recv_len) // �հ����� 1061*a9643ea8Slogwang { 1062*a9643ea8Slogwang len = rc; 1063*a9643ea8Slogwang break; 1064*a9643ea8Slogwang } 1065*a9643ea8Slogwang else // �߳����ģʽ�£�������ճ�� 1066*a9643ea8Slogwang { 1067*a9643ea8Slogwang MTLOG_ERROR("tcp socket[%d] user check pkg error, pkg len < recv_len", sock); 1068*a9643ea8Slogwang return -5; 1069*a9643ea8Slogwang } 1070*a9643ea8Slogwang } 1071*a9643ea8Slogwang } while (true); 1072*a9643ea8Slogwang 1073*a9643ea8Slogwang sbuf.reset(); 1074*a9643ea8Slogwang 1075*a9643ea8Slogwang return 0; 1076*a9643ea8Slogwang } 1077*a9643ea8Slogwang 1078*a9643ea8Slogwang 1079*a9643ea8Slogwang 1080*a9643ea8Slogwang /** 1081*a9643ea8Slogwang * @brief TCP��������ӳصķ�ʽ����IP/PORT����, ���ӱ���Ĭ��10���� 1082*a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1083*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 1084*a9643ea8Slogwang * @param pkg -�������װ�İ��� 1085*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 1086*a9643ea8Slogwang * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1087*a9643ea8Slogwang * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1088*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 1089*a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1090*a9643ea8Slogwang * @param msg_ctx -�������ĵ������ı����� 1091*a9643ea8Slogwang * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1092*a9643ea8Slogwang * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1093*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1094*a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1095*a9643ea8Slogwang */ 1096*a9643ea8Slogwang int mt_tcpsendrcv(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 1097*a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 1098*a9643ea8Slogwang { 1099*a9643ea8Slogwang if(!dst || !pkg || len<1) 1100*a9643ea8Slogwang { 1101*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 1102*a9643ea8Slogwang dst, pkg, len, check_func); 1103*a9643ea8Slogwang return -10; 1104*a9643ea8Slogwang } 1105*a9643ea8Slogwang 1106*a9643ea8Slogwang 1107*a9643ea8Slogwang int ret = 0, rc = 0; 1108*a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 1109*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 1110*a9643ea8Slogwang utime64_t cost_time = 0; 1111*a9643ea8Slogwang int time_left = timeout; 1112*a9643ea8Slogwang 1113*a9643ea8Slogwang // 1. ��ȡTCP���ӳض���, �ҽ�֪ͨ���� 1114*a9643ea8Slogwang int sock = -1; 1115*a9643ea8Slogwang TcpKeepConn* conn = mt_tcp_get_keep_conn(dst, sock); 1116*a9643ea8Slogwang if ((conn == NULL) || (sock < 0)) 1117*a9643ea8Slogwang { 1118*a9643ea8Slogwang MTLOG_ERROR("socket[%d] get conn failed, ret[%m]", sock); 1119*a9643ea8Slogwang ret = -1; 1120*a9643ea8Slogwang goto EXIT_LABEL; 1121*a9643ea8Slogwang } 1122*a9643ea8Slogwang 1123*a9643ea8Slogwang // 2. ���Լ����½����� 1124*a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 1125*a9643ea8Slogwang if (rc < 0) 1126*a9643ea8Slogwang { 1127*a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 1128*a9643ea8Slogwang ret = -4; 1129*a9643ea8Slogwang goto EXIT_LABEL; 1130*a9643ea8Slogwang } 1131*a9643ea8Slogwang 1132*a9643ea8Slogwang // 3. �������ݴ��� 1133*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1134*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1135*a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 1136*a9643ea8Slogwang if (rc < 0) 1137*a9643ea8Slogwang { 1138*a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 1139*a9643ea8Slogwang ret = -2; 1140*a9643ea8Slogwang goto EXIT_LABEL; 1141*a9643ea8Slogwang } 1142*a9643ea8Slogwang 1143*a9643ea8Slogwang // 4. �������ݴ��� 1144*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1145*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1146*a9643ea8Slogwang 1147*a9643ea8Slogwang rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 1148*a9643ea8Slogwang if (rc < 0) 1149*a9643ea8Slogwang { 1150*a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 1151*a9643ea8Slogwang ret = rc; 1152*a9643ea8Slogwang goto EXIT_LABEL; 1153*a9643ea8Slogwang } 1154*a9643ea8Slogwang 1155*a9643ea8Slogwang ret = 0; 1156*a9643ea8Slogwang 1157*a9643ea8Slogwang EXIT_LABEL: 1158*a9643ea8Slogwang 1159*a9643ea8Slogwang // ʧ����ǿ���ͷ�����, ����ʱ���� 1160*a9643ea8Slogwang if (conn != NULL) 1161*a9643ea8Slogwang { 1162*a9643ea8Slogwang ConnectionMgr::Instance()->FreeConnection(conn, (ret < 0)); 1163*a9643ea8Slogwang } 1164*a9643ea8Slogwang 1165*a9643ea8Slogwang return ret; 1166*a9643ea8Slogwang } 1167*a9643ea8Slogwang 1168*a9643ea8Slogwang 1169*a9643ea8Slogwang /** 1170*a9643ea8Slogwang * @brief TCP�������շ����� 1171*a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1172*a9643ea8Slogwang * [ע��] �Ľӿڣ���ע�ⲻҪ����ķ���ֵ������֤��mt_tcpsendrcv_ex����ֵƥ�� [��Ҫ] 1173*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 1174*a9643ea8Slogwang * @param pkg -�������װ�İ��� 1175*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 1176*a9643ea8Slogwang * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1177*a9643ea8Slogwang * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1178*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 1179*a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1180*a9643ea8Slogwang * @param msg_ctx -�������ĵ������ı����� 1181*a9643ea8Slogwang * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1182*a9643ea8Slogwang * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1183*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1184*a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1185*a9643ea8Slogwang */ 1186*a9643ea8Slogwang int mt_tcpsendrcv_short(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& recv_pkg_size, 1187*a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, bool keep_rcv_buf) 1188*a9643ea8Slogwang { 1189*a9643ea8Slogwang int ret = 0, rc = 0; 1190*a9643ea8Slogwang int addr_len = sizeof(struct sockaddr_in); 1191*a9643ea8Slogwang utime64_t start_ms = MtFrame::Instance()->GetLastClock(); 1192*a9643ea8Slogwang utime64_t cost_time = 0; 1193*a9643ea8Slogwang int time_left = timeout; 1194*a9643ea8Slogwang 1195*a9643ea8Slogwang // 1. ������� 1196*a9643ea8Slogwang if(!dst || !pkg || len<1) 1197*a9643ea8Slogwang { 1198*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p]", 1199*a9643ea8Slogwang dst, pkg, len, check_func); 1200*a9643ea8Slogwang return -10; 1201*a9643ea8Slogwang } 1202*a9643ea8Slogwang 1203*a9643ea8Slogwang // 2. ����TCP socket 1204*a9643ea8Slogwang int sock; 1205*a9643ea8Slogwang sock = mt_tcp_create_sock(); 1206*a9643ea8Slogwang if (sock < 0) 1207*a9643ea8Slogwang { 1208*a9643ea8Slogwang MTLOG_ERROR("create tcp socket failed, ret: %d", sock); 1209*a9643ea8Slogwang return -1; 1210*a9643ea8Slogwang } 1211*a9643ea8Slogwang 1212*a9643ea8Slogwang // 3. ���Լ����½����� 1213*a9643ea8Slogwang rc = MtFrame::connect(sock, (struct sockaddr *)dst, addr_len, time_left); 1214*a9643ea8Slogwang if (rc < 0) 1215*a9643ea8Slogwang { 1216*a9643ea8Slogwang MTLOG_ERROR("socket[%d] connect failed, ret[%d][%m]", sock, rc); 1217*a9643ea8Slogwang ret = -4; 1218*a9643ea8Slogwang goto EXIT_LABEL; 1219*a9643ea8Slogwang } 1220*a9643ea8Slogwang 1221*a9643ea8Slogwang // 4. �������ݴ��� 1222*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1223*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1224*a9643ea8Slogwang rc = MtFrame::send(sock, pkg, len, 0, time_left); 1225*a9643ea8Slogwang if (rc < 0) 1226*a9643ea8Slogwang { 1227*a9643ea8Slogwang MTLOG_ERROR("socket[%d] send failed, ret[%d][%m]", sock, rc); 1228*a9643ea8Slogwang ret = -2; 1229*a9643ea8Slogwang goto EXIT_LABEL; 1230*a9643ea8Slogwang } 1231*a9643ea8Slogwang 1232*a9643ea8Slogwang // 5. �������ݴ��� 1233*a9643ea8Slogwang cost_time = MtFrame::Instance()->GetLastClock() - start_ms; 1234*a9643ea8Slogwang time_left = (timeout > (int)cost_time) ? (timeout - (int)cost_time) : 0; 1235*a9643ea8Slogwang rc = mt_tcp_check_recv(sock, rcv_buf, recv_pkg_size, 0, time_left, check_func, msg_ctx, keep_rcv_buf); 1236*a9643ea8Slogwang 1237*a9643ea8Slogwang if (rc < 0) 1238*a9643ea8Slogwang { 1239*a9643ea8Slogwang MTLOG_ERROR("socket[%d] rcv failed, ret[%d][%m]", sock, rc); 1240*a9643ea8Slogwang ret = rc; 1241*a9643ea8Slogwang goto EXIT_LABEL; 1242*a9643ea8Slogwang } 1243*a9643ea8Slogwang 1244*a9643ea8Slogwang ret = 0; 1245*a9643ea8Slogwang 1246*a9643ea8Slogwang EXIT_LABEL: 1247*a9643ea8Slogwang if (sock >= 0) 1248*a9643ea8Slogwang ::close(sock); 1249*a9643ea8Slogwang 1250*a9643ea8Slogwang return ret; 1251*a9643ea8Slogwang } 1252*a9643ea8Slogwang 1253*a9643ea8Slogwang 1254*a9643ea8Slogwang 1255*a9643ea8Slogwang /** 1256*a9643ea8Slogwang * @brief TCP�շ��ӿڣ�����ѡ���˱������ӻ��߶����� 1257*a9643ea8Slogwang * [ע��] tcp���շ���buff, ��������static����, ����������Ĵ��� [��Ҫ] 1258*a9643ea8Slogwang * @param dst -�����͵�Ŀ�ĵ�ַ 1259*a9643ea8Slogwang * @param pkg -�������װ�İ��� 1260*a9643ea8Slogwang * @param len -�������װ�İ��峤�� 1261*a9643ea8Slogwang * @param rcv_buf -���������������ο����� keep_rcv_buf�� 1262*a9643ea8Slogwang * @param recv_pkg_size -����Ӧ�����buff�ij�ʼ��С����������0ʱ�����ر���ʵ�ʳ��ȡ� 1263*a9643ea8Slogwang * @param timeout -��ʱʱ��, ��λms 1264*a9643ea8Slogwang * @param check_func -��ⱨ���Ƿ�ɹ����ﺯ�� 1265*a9643ea8Slogwang * @param msg_ctx -�������ĵ������ı����� 1266*a9643ea8Slogwang * 1267*a9643ea8Slogwang * @param type - �������� 1268*a9643ea8Slogwang * MT_TCP_SHORT: һ��һ�������ӣ� 1269*a9643ea8Slogwang * MT_TCP_LONG : һ��һ�ճ����ӣ� 1270*a9643ea8Slogwang * MT_TCP_LONG_SNDONLY : ֻ�����ճ����ӣ� 1271*a9643ea8Slogwang * MT_TCP_SHORT_SNDONLY: ֻ�����ն����ӣ� 1272*a9643ea8Slogwang * @param keep_rcv_buf -true,��ܽ�����rcv_buf��ֵ������������ҵ�����ͷŸ�buf���������ڵ��ý���ǰ�ͷŸ�buf����ע�⡿ 1273*a9643ea8Slogwang * ҵ����Ҫ�Լ���msg_ctx�б��������Ϣ�������ͨ��malloc�����ڴ桿 1274*a9643ea8Slogwang * @return 0 �ɹ�, -1 ��socketʧ��, -2 ��������ʧ��, -3 ����Ӧ��ʧ��, 1275*a9643ea8Slogwang * -4 ����ʧ��, -5 ��ⱨ��ʧ��, -6 ���տռ䲻��, -7 ��������ر�����, -10 ������Ч 1276*a9643ea8Slogwang */ 1277*a9643ea8Slogwang int mt_tcpsendrcv_ex(struct sockaddr_in* dst, void* pkg, int len, void*& rcv_buf, int& rcv_pkg_size, 1278*a9643ea8Slogwang int timeout, MtFuncTcpMsgChecker check_func, void* msg_ctx, 1279*a9643ea8Slogwang MT_TCP_CONN_TYPE type, bool keep_rcv_buf) 1280*a9643ea8Slogwang { 1281*a9643ea8Slogwang if(!dst || !pkg || len<1) 1282*a9643ea8Slogwang { 1283*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1284*a9643ea8Slogwang dst, pkg, len, check_func, msg_ctx, type); 1285*a9643ea8Slogwang return -10; 1286*a9643ea8Slogwang } 1287*a9643ea8Slogwang 1288*a9643ea8Slogwang switch (type) 1289*a9643ea8Slogwang { 1290*a9643ea8Slogwang // TCP�����ӵ������� 1291*a9643ea8Slogwang case MT_TCP_LONG: 1292*a9643ea8Slogwang { 1293*a9643ea8Slogwang return mt_tcpsendrcv(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 1294*a9643ea8Slogwang } 1295*a9643ea8Slogwang 1296*a9643ea8Slogwang // TCP������ֻ������ 1297*a9643ea8Slogwang case MT_TCP_LONG_SNDONLY: 1298*a9643ea8Slogwang { 1299*a9643ea8Slogwang return mt_tcpsend(dst, pkg, len, timeout); 1300*a9643ea8Slogwang } 1301*a9643ea8Slogwang 1302*a9643ea8Slogwang // TCP�����ӵ������� 1303*a9643ea8Slogwang case MT_TCP_SHORT: 1304*a9643ea8Slogwang { 1305*a9643ea8Slogwang return mt_tcpsendrcv_short(dst, pkg, len, rcv_buf, rcv_pkg_size, timeout, check_func, msg_ctx, keep_rcv_buf); 1306*a9643ea8Slogwang } 1307*a9643ea8Slogwang 1308*a9643ea8Slogwang // TCP������ֻ������ 1309*a9643ea8Slogwang case MT_TCP_SHORT_SNDONLY: 1310*a9643ea8Slogwang { 1311*a9643ea8Slogwang return mt_tcpsend_short(dst, pkg, len, timeout); 1312*a9643ea8Slogwang } 1313*a9643ea8Slogwang 1314*a9643ea8Slogwang default: 1315*a9643ea8Slogwang { 1316*a9643ea8Slogwang MTLOG_ERROR("input params invalid, dst[%p], pkg[%p], len[%d], fun[%p], msg_ctx[%p], type[%d]", 1317*a9643ea8Slogwang dst, pkg, len, check_func, msg_ctx, type); 1318*a9643ea8Slogwang return -10; 1319*a9643ea8Slogwang } 1320*a9643ea8Slogwang } 1321*a9643ea8Slogwang 1322*a9643ea8Slogwang return 0; 1323*a9643ea8Slogwang } 1324*a9643ea8Slogwang 1325*a9643ea8Slogwang 1326*a9643ea8Slogwang 1327*a9643ea8Slogwang } 1328*a9643ea8Slogwang 1329*a9643ea8Slogwang 1330