第106讲 boost::asio(2)

嗯,上一讲我们用boost的asio搭建了一个服务端,那么为了完整性,我们今天再用asio来搭建一个客户端,这个客户端的功能和服务端的一样很简单,就是负责一些网络消息的传输和接收而已,当然我们也用他来处理一些网络协议,比如说今天我们会看到write和read这两个函数,这就是专门用来传输协议用的,send和read用于同步操作,当我们选择使用同步的时候那么连接也需要用到相应的同步连接来连接服务端(Connect),否则就会路由到异步操作上面去,我们使用的同步会永远抓不到我们想要的结果。


//=============================================

#pragma once

#include <boost/asio.hpp>

#include <boost/date_time/posix_time/posix_time.hpp>

#include <memory>

#include <
functional>

#include <vector>

#include <algorithm>

#include <thread>

#include <mutex>

#include <boost/algorithm/string.hpp>


typedef std::function<void(const std::string)>Msg_Proc_Fun;

using namespace boost;

class HClient

{


typedef std::shared_ptr<asio::ip::tcp::socket>Sock_Ptr;

typedef std::shared_ptr<std::vector<char>> Buffer;

public:

HClient(const std::string&

ip = "

127.0.0.1"

, unsigned port = 12000);

virtual ~HClient();


void setIp(const std::string&

ip);


void setPort(unsigned port);


void exit();

//异步操作


void PostConnect();


void PostReceive();

//投递一个接收


void PostSend(const std::string &

send);

//投递一个发送

//同步操作


void Connect();

int Send(const std::string&

send);

int Read(std::string&

str, size_t size = 8192000);

//默认创建8M大小缓冲器接收数据


//========================================================

// 适用于关于命令协议的操作

//========================================================

__int64 write(const char* cmd, __int16 size);

__int64 read(char* result, __int16 size);

//开启线程


void run();

//连接状态

bool IsConnected();


void close();


void bindFun(Msg_Proc_Fun fun);

protected:


void start();


void conn_handle(const system::error_code&

e);


void read_handle(const system::error_code&

e);


void send_handle(const system::error_code&

e);


void timeout_handle(const system::error_code&

e, std::shared_ptr<boost::asio::deadline_timer>t);


void threadrun();

private:

boost::asio::io_service m_io;

asio::ip::tcp::endpoint m_port;

Sock_Ptr

m_socket;

Buffer

m_readbuf;

std::string

m_ip;

unsigned int

n_port;

bool

b_IsConnect;

bool b_is_status;

//状态

bool b_is_timeout{ true
};


std::shared_ptr<std::thread>m_thread;

std::shared_ptr<boost::asio::io_service::work>m_work;


std::mutex

m_mutex;


Logger

m_logger;

Msg_Proc_Fun m_msg_fun{nullptr };


HClient(const HClient&

) = delete;

HClient&

operator=(const HClient&

) = delete;

};


//——————————————


#include "

HClient.h"

#include <boost/date_time/posix_time/posix_time.hpp>




HClient::HClient(const std::string &

ip, unsigned port) :

m_ip(ip), n_port(port), m_socket(nullptr)


{

m_work = std::shared_ptr<boost::asio::io_service::work>(new boost::asio::io_service::work(m_io));

m_readbuf = Buffer(new std::vector<char>(8192000, 0));

b_IsConnect = false;

try{

m_logger.EnableConsole(true);

}

catch (std::runtime_error e){

std::cout <<e.what() <<std::endl;

}

}




HClient::~HClient()

{

if (m_thread->joinable())

m_thread->join();

}


void HClient::setIp(const std::string &

ip){

m_ip = ip;

}


void HClient::setPort(unsigned port){

n_port = port;

}


void HClient::exit()

{

m_thread->join();

}


//异步连接

void HClient::PostConnect()

{

m_port = asio::ip::tcp::endpoint(asio::ip::address::from_string(m_ip), n_port);

start();

}


//异步接收

void HClient::PostReceive()

{

if (!b_IsConnect){

return;

}

std::fill(m_readbuf->begin(), m_readbuf->end(), 0);

m_socket->async_receive(asio::buffer(*m_readbuf), std::bind(&

HClient::read_handle, this,

std::placeholders::_1));

}


//异步发送

void HClient::PostSend(const std::string &

send)

{

if (!b_IsConnect){

return;

}

m_socket->async_send(asio::buffer(&

send[0], send.length()),

std::bind(&

HClient::send_handle, this,

std::placeholders::_1));

}


//同步连接

void HClient::Connect()

{

m_port = asio::ip::tcp::endpoint(asio::ip::address::from_string(m_ip), n_port);

m_socket = Sock_Ptr(new asio::ip::tcp::socket(m_io));


system::error_code e;

b_is_status = false;

m_socket->connect(m_port, e);

b_is_status = true;


if (e){

static
int count = 0;

if (count == 200){

m_socket.reset();

m_socket = nullptr;

b_IsConnect = false;

count = 0;

}


m_logger.log("

网络连接失败,将再次尝试连接……"

);

asio::deadline_timer t(m_io, posix_time::seconds(2));

t.wait();

//等待2秒后重新连接

++count;

Connect();

}

else{

b_IsConnect = true;

}

}


//同步发送消息

int HClient::Send(const std::string &

send)

{

std::unique_lock<std::mutex>lock(m_mutex);

std::shared_ptr<boost::asio::deadline_timer>t(new boost::asio::deadline_timer(m_io, boost::posix_time::seconds(1)));

t->async_wait(std::bind(&

HClient::timeout_handle, this, std::placeholders::_1, t));

while (!b_is_timeout){

;

}


b_is_timeout = false;


b_is_status = false;

const char* ch = send.c_str();

size_t len = strlen(ch);

int size = m_socket->send(asio::buffer(send.c_str(),send.size()));

b_is_status = true;

t->cancel();


return size;

}


int HClient::SendCmd(const std::string &

send,size_t size){

int re_size = m_socket->send(asio::buffer(send.c_str(), size));

b_is_status = true;


return re_size;

}


//同步接收

int HClient::Read(std::string &

str,size_t size)

{

std::unique_lock<std::mutex>lock(m_mutex);

std::shared_ptr<boost::asio::deadline_timer>t(new boost::asio::deadline_timer(m_io, boost::posix_time::seconds(1)));

t->async_wait(std::bind(&

HClient::timeout_handle, this, std::placeholders::_1, t));

while (!b_is_timeout){

;

}


b_is_timeout = false;

Buffer read(new std::vector<char>(size, 0));

b_is_status = false;

int ret = m_socket->read_some(asio::buffer(*read));

b_is_status = true;

t->cancel();

str = std::string(&

(*read)[0]);


return ret;

}


__int64 HClient::write(const char* cmd, __int16 size){


std::unique_lock<std::mutex>lock(m_mutex);

std::shared_ptr<boost::asio::deadline_timer>t(new boost::asio::deadline_timer(m_io, boost::posix_time::seconds(1)));

t->async_wait(std::bind(&

HClient::timeout_handle, this, std::placeholders::_1, t));

while (!b_is_timeout){

;

}

b_is_timeout = false;

b_is_status = false;

int ret = m_socket->send(asio::buffer(cmd, size));

b_is_status = true;

t->cancel();


return ret;

}


__int64 HClient::read(char* result, __int16 size){

std::unique_lock<std::mutex>lock(m_mutex);

std::shared_ptr<boost::asio::deadline_timer>t(new boost::asio::deadline_timer(m_io, boost::posix_time::seconds(1)));

t->async_wait(std::bind(&

HClient::timeout_handle, this, std::placeholders::_1, t));

//========================================

//等待上一次操作完成

//========================================

while (!b_is_timeout){

;

}

b_is_timeout = false;

b_is_status = false;

int ret = m_socket->read_some(asio::buffer(result,size));

b_is_status = true;

t->cancel();


return ret;

}



void HClient::run()

{

m_thread = std::shared_ptr<std::thread>(

new std::thread(&

HClient::threadrun, this));

}


bool HClient::IsConnected()

{


return b_IsConnect;

}


void HClient::start()

{

m_socket = Sock_Ptr(new asio::ip::tcp::socket(m_io));

m_socket->async_connect(m_port, std::bind(&

HClient::conn_handle,

this, std::placeholders::_1));

}


void HClient::conn_handle(const system::error_code &

e)

{

if (e){

static
int count = 0;

//控制网络连接次数

if (count == 200){


m_socket.reset();

m_socket = nullptr;

b_IsConnect = false;

count = 0;

return;

}


asio::deadline_timer t(m_io, posix_time::seconds(5));

t.wait();

//等待5秒后重新连接

++count;

start();

}

else{


b_IsConnect = true;

PostReceive();

}

}


void HClient::read_handle(const system::error_code &

e)

{

if (e){

m_socket.reset();

m_socket = nullptr;

b_IsConnect = false;

}

else{

std::string str(&

(*m_readbuf)[0]);

if(m_msg_fun)

m_msg_fun(str);

PostReceive();

}

}


void HClient::send_handle(const system::error_code &

e)

{

if (e){


m_socket.reset();

m_socket = nullptr;

b_IsConnect = false;

}

}




//开启线程循环处理消息

void HClient::threadrun()

{

m_io.run();

}


//超时回调

void HClient::timeout_handle(const system::error_code&

e, std::shared_ptr<boost::asio::deadline_timer>t){

if (!b_is_status){

m_logger.log("

socket operation fail——–>time out!!!"

);

}

else{

m_logger.log("

socket operation success !!!!"

);

b_is_timeout = true;

}

}



void HClient::close(){

try{

if (b_IsConnect){

if (m_socket){

m_socket->close();

b_IsConnect = false;

}

}

}

catch (system::system_error e){

m_logger.log(e.what());

}

catch (…){

m_logger.log("

close socket fail……."

);

}

}


//================================

如果大家看了昨天的服务端的代码,那么今天客户端的代码就相待来说简单得多了,唯一不同的就是客户端的操作加入了一个超时等待的操作,所以在上一讲里面我们特意说了关于boost的asio的定时器,就是为了应用在这里面,因为在asio里面没有超时等待这一功能,那么如果我们想要超时的话,就要自己动手了,在同步操作中使用异步超时这一方法是我想到最简单的。

嗯,还有大家也看到了,在客户端的编程中我们使用了在前面的章节中说到的logger来打印信息,嗯,为什么要提这一个问题呢?这是一个充满血和泪的教训啊,在以前我通常都是使用cout或者cerr来打印信息,后来是在一个实时性极强的的项目中发现了问题,我在和外协做的一个服务器交互,当时用的就是这个客户端,但是打印信息的不是logger,我和之间都是毫秒级的交互,所以会很频繁的打印信息,当时没发现什么问题,后来有人用鼠标在黑乎乎的控制台上拉了一下,然后他的服务器那边就出错了,出错原因是因为等我这边的相应超时,当时弄了好几天都没想清楚是怎么回事,实在是不明白为什么动一下程序就影响到性能,直到后来才发现是因为线程被干扰了,也是换用logger来打印信息,问题瞬间迎刃而解,我那个叉,这东西居然一直没发现,所以这算是一些实战经验吧,对一些实时性高的东西,就要采取一些隐晦手段。


关于怎么使用这两个东西,其实是相当简单事,我简单的说个例子大家一看就会明白:


//=======================================

// 服务器

#include "

HService.h"


int main(){

std::shared_ptr<HService>service = HService::Instance();

service->bindProcMsgFun([&

](const std::string&

str,Sock_Ptr sock){

std::cout<<str<<std::endl;

service->PostSend(str,sock);

});

service->run();


return 0;

}


//客户端


#include "

HClient.h"


int main(){

HClient client;

client.setIp("

127.0.0.1"

);

//其实这个ip和12000的端口已经是默认的了,这里为了演示所以完整走一遍

client.setPort(12000);

client.run();

//开启异步操作服务

client.PostSend("

Hello World"

);

client.bindFun([&

](const std::string&

str){

std::cout<<str<<std::endl;

client.PostSend(str);

});


return 0;

}


//================================

先启动服务器,再启动客户端就会看到一段信息来回不停的发送和接收…….

使用起来是不是相当的简单的呢?经过测试过,这个服务器和客户端都还算是相当的稳定的,扔在项目里跑过几天几夜都没出现问题过。


//==================================

回复D查看目录,回复数字查看相应章节。


原文始发于微信公众号(

C/C++的编程教室

):第106讲 boost::asio(2)

|

发表评论