阅读 65

Boost.asio的简单使用(timer,thread,io_service类)

Boost.asio的简单使用(timer,thread,io_service类) - 简单的日志 - 网易博客

Boost.asio的简单使用(timer,thread,io_service类)   

 

2010-05-17 18:20:06|  分类:
C/C++之boost
|  标签: 

|字号  订阅


2. 同步Timer

本章介绍asio如何在定时器上进行阻塞等待(blocking wait).
实现,我们包含必要的头文件.
所有的asio类可以简单的通过include "asio.hpp"来调用.

  1. #include <iostream>

  2. #include <boost/asio.hpp>

此外,这个示例用到了timer,我们还要包含Boost.Date_Time的头文件来控制时间.

  1. #include <boost/date_time/posix_time/posix_time.hpp>

使用asio至少需要一个boost::asio::io_service对象.该类提供了访问I/O的功能.我们首先在main函数中声明它.

  1. int main()

  2. {

  3.     boost::asio::io_service io;

下一步我们声明boost::asio::deadline_timer对象.这个asio的核心类提供I/O的功能(这里更确切的说是定时功能),总是把一个io_service对象作为他的第一个构造函数,而第二个构造函数的参数设定timer会在5秒后到时(expired).

  1. boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));

这个简单的示例中我们演示了定时器上的一个阻塞等待.就是说,调用boost::asio::deadline_timer::wait()的在创建后5秒内(注意:不是等待开始后),timer到时之前不会返回任何值.
一个deadline_timer只有两种状态:到时,未到时.
如果boost::asio::deadline_timer::wait()在到时的timer对象上调用,会立即return.

  1. t.wait();

最后,我们输出理所当然的"Hello, world!"来演示timer到时了.

  1.     std::cout << "Hello, world! ";

  2.     return 0;

  3. }

完整的代码:

  1. #include <iostream>

  2. #include <boost/asio.hpp>

  3. #include <boost/date_time/posix_time/posix_time.hpp>

  4. int main()

  5. {

  6.     boost::asio::io_service io;

  7.     boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));

  8.     t.wait();

  9.     std::cout << "Hello, world! ";

  10.     return 0;

  11. }

 

3. 异步Timer

  1. #include <iostream>

  2. #include <asio.hpp>

  3. #include <boost/date_time/posix_time/posix_time.hpp>

asio的异步函数会在一个异步操作完成后被回调.这里我们定义了一个将被回调的函数.

  1. void print(const asio::error& /*e*/)

  2. {

  3.     std::cout << "Hello, world! ";

  4. }

  5. int main()

  6. {

  7.     asio::io_service io;

  8.     asio::deadline_timer t(io, boost::posix_time::seconds(5));

这里我们调用asio::deadline_timer::async_wait()来异步等待

  1. t.async_wait(print);

最后,我们必须调用asio::io_service::run().
asio库只会调用那个正在运行的asio::io_service::run()的回调函数.
如果asio::io_service::run()不被调用,那么回调永远不会发生.
asio::io_service::run()会持续工作到点,这里就是timer到时,回调完成.
别忘了在调用 asio::io_service::run()之前设置好io_service的任务.比如,这里,如果我们忘记先调用asio::deadline_timer::async_wait()asio::io_service::run()会在瞬间return.

  1.     io.run();

  2.     return 0;

  3. }

完整的代码:

  1. #include <iostream>

  2. #include <asio.hpp>

  3. #include <boost/date_time/posix_time/posix_time.hpp>

  4. void print(const asio::error& /*e*/)

  5. {

  6.     std::cout << "Hello, world! ";

  7. }

  8. int main()

  9. {

  10.     asio::io_service io;

  11.     asio::deadline_timer t(io, boost::posix_time::seconds(5));

  12.     t.async_wait(print);

  13.     io.run();

  14.     return 0;

  15. }


4. 回调函数的参数

这里我们将每秒回调一次,来演示如何回调函数参数的含义

  1. #include <iostream>

  2. #include <asio.hpp>

  3. #include <boost/bind.hpp>

  4. #include <boost/date_time/posix_time/posix_time.hpp>

首先,调整一下timer的持续时间,开始一个异步等待.显示,回调函数需要访问timer来实现周期运行,所以我们再介绍两个新参数

  • 指向timer的指针

  • 一个int*来指向计数器

  1. void print(const asio::error& /*e*/,

  2.     asio::deadline_timer* t, int* count)

  3. {

我们打算让这个函数运行6个周期,然而你会发现这里没有显式的方法来终止io_service.不过,回顾上一节,你会发现当 asio::io_service::run()会在所有任务完成时终止.这样我们当计算器的值达到5时(0为第一次运行的值),不再开启一个新的异步等待就可以了.

  1.     if (*count < 5)

  2.     {

  3.         std::cout << *count << " ";

  4.         ++(*count);

  5. ...

然后,我们推迟的timer的终止时间.通过在原先的终止时间上增加延时,我们可以确保timer不会在处理回调函数所需时间内的到期.
(原文:By calculating the new expiry time relative to the old, we can ensure that the timer does not drift away from the whole-second mark due to any delays in processing the handler.)

  1. t->expires_at(t->expires_at() + boost::posix_time::seconds(1));

然后我们开始一个新的同步等待.如您所见,我们用把print和他的多个参数用boost::bind函数合成一个的形为void(const asio::error&)回调函数(准确的说是function object).
在这个例子中, boost::bindasio::placeholders::error参数是为了给回调函数传入一个error对象.当进行一个异步操作,开始 boost::bind时,你需要使用它来匹配回调函数的参数表.下一节中你会学到回调函数不需要error参数时可以省略它.

  1.      t->async_wait(boost::bind(print,

  2.         asio::placeholders::error, t, count));

  3.     }

  4. }

  5. int main()

  6. {

  7.     asio::io_service io;

  8.     int count = 0;

  9.     asio::deadline_timer t(io, boost::posix_time::seconds(1));

和上面一样,我们再一次使用了绑定asio::deadline_timer::async_wait()

  1. t.async_wait(boost::bind(print,

  2.     asio::placeholders::error, &t, &count));

  3. io.run();

在结尾,我们打印出的最后一次没有设置timer的调用的count的值

  1.     std::cout << "Final count is " << count << " ";

  2.     return 0;

  3. }

完整的代码:

  1. #include <iostream>

  2. #include <asio.hpp>

  3. #include <boost/bind.hpp>

  4. #include <boost/date_time/posix_time/posix_time.hpp>

  5. void print(const asio::error& /*e*/,

  6.   bsp;     asio::deadline_timer* t, int* count)

  7. {

  8.     if (*count < 5)

  9.     {

  10.         std::cout << *count << " ";

  11.         ++(*count);

  12.         t->expires_at(t->expires_at() + boost::posix_time::seconds(1));

  13.         t->async_wait(boost::bind(print,

  14.                     asio::placeholders::error, t, count));

  15.     }

  16. }

  17. int main()

  18. {

  19.     asio::io_service io;

  20.     int count = 0;

  21.     asio::deadline_timer t(io, boost::posix_time::seconds(1));

  22.     t.async_wait(boost::bind(print,

  23.                 asio::placeholders::error, &t, &count));

  24.     io.run();

  25.     std::cout << "Final count is " << count << " ";

  26.     return 0;

  27. }

 

5. 成员函数作为回调函数

本例的运行结果和上一节类似

  1. #include <iostream>

  2. #include <boost/asio.hpp>

  3. #include <boost/bind.hpp>

  4. #include <boost/date_time/posix_time/posix_time.hpp>

我们先定义一个printer类

  1. class printer

  2. {

  3. public:

  4. //构造函数有一个io_service参数,并且在初始化timer_时用到了它.用来计数的count_这里同样作为了成员变量

  5.     printer(boost::asio::io_service& io)

  6.         : timer_(io, boost::posix_time::seconds(1)),

  7.             count_(0)

  8.     {

boost::bind 同样可以出色的工作在成员函数上.众所周知,所有的非静态成员函数都有一个隐式的this参数,我们需要把this作为参数bind到成员函数上.和上一节类似,我们再次用bind构造出void(const boost::asio::error&)形式的函数.
注意,这里没有指定boost::asio::placeholders::error占位符,因为这个print成员函数没有接受一个error对象作为参数.

  1. timer_.async_wait(boost::bind(&printer::print, this));

  2.  

在类的折构函数中我们输出最后一次回调的count的值

  1. ~printer()

  2. {

  3.     std::cout << "Final count is " << count_ << " ";

  4. }


print函数于上一节的十分类似,但是用成员变量取代了参数.

  1.     void print()

  2.     {

  3.         if (count_ < 5)

  4.         {

  5.             std::cout << count_ << " ";

  6.             ++count_;

  7.             timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));

  8.             timer_.async_wait(boost::bind(&printer::print, this));

  9.         }

  10.     }

  11. private:

  12.     boost::asio::deadline_timer timer_;

  13.     int count_;

  14. };

  15.  

现在main函数清爽多了,在运行io_service之前只需要简单的定义一个printer对象.

  1. int main()

  2. {

  3.     boost::asio::io_service io;

  4.     printer p(io);

  5.     io.run();

  6.     return 0;

  7. }

完整的代码:

  1. #include <iostream>

  2. #include <boost/asio.hpp>

  3. #include <boost/bind.hpp>

  4. #include <boost/date_time/posix_time/posix_time.hpp>

  5. class printer

  6. {

  7.     public:

  8.         printer(boost::asio::io_service& io)

  9.             : timer_(io, boost::posix_time::seconds(1)),

  10.             count_(0)

  11.     {

  12.         timer_.async_wait(boost::bind(&printer::print, this));

  13.     }

  14.         ~printer()

  15.         {

  16.             std::cout << "Final count is " << count_ << " ";

  17.         }

  18.         void print()

  19.         {

  20.             if (count_ < 5)

  21.             {

  22.                 std::cout << count_ << " ";

  23.                 ++count_;

  24.                 timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));

  25.                 timer_.async_wait(boost::bind(&printer::print, this));

  26.             }

  27.         }

  28.     private:

  29.         boost::asio::deadline_timer timer_;

  30.         int count_;

  31. };

  32. int main()

  33. {

  34.     boost::asio::io_service io;

  35.     printer p(io);

  36.     io.run();

  37.     return 0;

  38. }

 

 


6. 多线程回调同步

本节演示了使用boost::asio::strand在多线程程序中进行回调同步(synchronise).
先前的几节阐明了如何在单线程程序中用boost::asio::io_service::run()进行同步.如您所见,asio库确保 仅当当前线程调用boost::asio::io_service::run()时产生回调.显然,仅在一个线程中调用 boost::asio::io_service::run() 来确保回调是适用于并发编程的.
一个基于asio的程序最好是从单线程入手,但是单线程有如下的限制,这一点在服务器上尤其明显:

  • 当回调耗时较长时,反应迟钝.

  • 在多核的系统上无能为力

如果你发觉你陷入了这种困扰,可以替代的方法是建立一个boost::asio::io_service::run()的线程池.然而这样就允许回调函数并发执行.所以,当回调函数需要访问一个共享,线程不安全的资源时,我们需要一种方式来同步操作.

  1. #include <iostream>

  2. #include <boost/asio.hpp>

  3. #include <boost/thread.hpp>

  4. #include <boost/bind.hpp>

  5. #include <boost/date_time/posix_time/posix_time.hpp>

在上一节的基础上我们定义一个printer类,此次,它将并行运行两个timer

  1. class printer

  2. {

  3. public:

除了声明了一对boost::asio::deadline_timer,构造函数也初始化了类型为boost::asio::strandstrand_成员.
boost::asio::strand 可以分配的回调函数.它保证无论有多少线程调用了boost::asio::io_service::run(),下一个回调函数仅在前一个回调函数完成后开始,当然回调函数仍然可以和那些不使用boost::asio::strand分配,或是使用另一个boost::asio::strand分配的回调函数一起并发执行.

  1. printer(boost::asio::io_service& io)

  2.     : strand_(io),

  3.     timer1_(io, boost::posix_time::seconds(1)),

  4.     timer2_(io, boost::posix_time::seconds(1)),

  5.     count_(0)

  6. {

当一个异步操作开始时,用boost::asio::strand来 "wrapped(包装)"回调函数.boost::asio::strand::wrap()会返回一个由boost::asio::strand分配的新的handler(句柄),这样,我们可以确保它们不会同时运行.

  1.     timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));

  2.     timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));

  3. }

  4. ~printer()

  5. {

  6.     std::cout << "Final count is " << count_ << " ";

  7. }


多线程程序中,回调函数在访问共享资源前需要同步.这里共享资源是std::cout count_变量.

  1.     void print1()

  2.     {

  3.         if (count_ < 10)

  4.         {

  5.             std::cout << "Timer 1: " << count_ << " ";

  6.             ++count_;

  7.             timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));

  8.             timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));

  9.         }

  10.     }

  11.     void print2()

  12.     {

  13.         if (count_ < 10)

  14.         {

  15.             std::cout << "Timer 2: " << count_ << " ";

  16.             ++count_;

  17.             timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));

  18.             timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));

  19.         }

  20.     }

  21. private:

  22.     boost::asio::strand strand_;

  23.     boost::asio::deadline_timer timer1_;

  24.     boost::asio::deadline_timer timer2_;

  25.     int count_;

  26. };

main函数中boost::asio::io_service::run()在两个线程中被调用:主线程、一个boost::thread线程.
正如单线程中那样,并发的boost::asio::io_service::run()会一直运行直到完成任务.后台的线程将在所有异步线程完成后终结.

  1. int main()

  2. {

  3.     boost::asio::io_service io;

  4.     printer p(io);

  5.     boost::thread t(boost::bind(&boost::asio::io_service::run, &io));

  6.     io.run();

  7.     t.join();

  8.     return 0;

  9. }

完整的代码:

  1. #include <iostream>

  2. #include <boost/asio.hpp>

  3. #include <boost/thread.hpp>

  4. #include <boost/bind.hpp>

  5. #include <boost/date_time/posix_time/posix_time.hpp>

  6. class printer

  7. {

  8. public:

  9.         printer(boost::asio::io_service& io)

  10.             : strand_(io),

  11.             timer1_(io, boost::posix_time::seconds(1)),

  12.             timer2_(io, boost::posix_time::seconds(1)),

  13.             count_(0)

  14.     {

  15.         timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));

  16.         timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));

  17.     }

  18.         ~printer()

  19.         {

  20.             std::cout << "Final count is " << count_ << " ";

  21.         }

  22.         void print1()

  23.         {

  24.             if (count_ < 10)

  25.             {

  26.                 std::cout << "Timer 1: " << count_ << " ";

  27.                 ++count_;

  28.                 timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));

  29.                 timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));

  30.             }

  31.         }

  32.         void print2()

  33.         {

  34.             if (count_ < 10)

  35.             {

  36.                 std::cout << "Timer 2: " << count_ << " ";

  37.                 ++count_;

  38.                 timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));

  39.                 timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));

  40.             }

  41.         }

  42. private:

  43.         boost::asio::strand strand_;

  44.         boost::asio::deadline_timer timer1_;

  45.         boost::asio::deadline_timer timer2_;

  46.         int count_;

  47. };

  48. int main()

  49. {

  50.     boost::asio::io_service io;

  51.     printer p(io);

  52.     boost::thread t(boost::bind(&boost::asio::io_service::run, &io));

  53.     io.run();

  54.     t.join();

  55.     return 0;

  56. }

 


7. TCP客户端:对准时间

  1. #include <iostream>

  2. #include <boost/array.hpp>

  3. #include <boost/asio.hpp>

本程序的目的是访问一个时间同步服务器,我们需要用户指定一个服务器(如time-nw.nist.gov),用IP亦可.
(译者注:日期查询协议,这种时间传输协议不指定固定的传输格式,只要求按照ASCII标准发送数据。)

  1. using boost::asio::ip::tcp;

  2. int main(int argc, char* argv[])

  3. {

  4.     try

  5.     {

  6.         if (argc != 2)

  7.         {

  8.             std::cerr << "Usage: client <host>" << std::endl;

  9.             return 1;

  10.             }

用asio进行网络连接至少需要一个boost::asio::io_service对象

  1. boost::asio::io_service io_service;


我们需要把在命令行参数中指定的服务器转换为TCP上的节点.完成这项工作需要boost::asio::ip::tcp::resolver对象

  1. tcp::resolver resolver(io_service);


一个resolver对象查询一个参数,并将其转换为TCP上节点的列表.这里我们把argv[1]中的sever的名字和要查询字串daytime关联.

  1. tcp::resolver::query query(argv[1], "daytime");


节点列表可以用 boost::asio::ip::tcp::resolver::iterator 来进行迭代.iterator默认的构造函数生成一个end iterator.

  1. tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);

  2. tcp::resolver::iterator end;

现在我们建立一个连接的sockert,由于获得节点既有IPv4也有IPv6的.所以,我们需要依次尝试他们直到找到一个可以正常工作的.这步使得我们的程序独立于IP版本

  1. tcp::socket socket(io_service);

  2. boost::asio::error error = boost::asio::error::host_not_found;

  3. while (error && endpoint_iterator != end)

  4. {

  5.     socket.close();

  6.     socket.connect(*endpoint_iterator++, boost::asio::assign_error(error));

  7. }

  8. if (error)

  9.     throw error;

连接完成,我们需要做的是读取daytime服务器的响应.
我们用boost::array来保存得到的数据,boost::asio::buffer()会自动根据array的大小暂停工作,来防止缓冲溢出.除了使用boost::array,也可以使用char [] std::vector.

  1. for (;;)

  2. {

  3.     boost::array<char, 128> buf;

  4.     boost::asio::error error;

  5.     size_t len = socket.read_some(

  6.         boost::asio::buffer(buf), boost::asio::assign_error(error));

当服务器关闭连接时,boost::asio::ip::tcp::socket::read_some()会用boost::asio::error::eof标志完成, 这时我们应该退出读取循环了.

  1. if (error == boost::asio::error::eof)

  2.     break// Connection closed cleanly by peer.

  3. else if (error)

  4.     throw error; // Some other error.

  5. std::cout.write(buf.data(), len);

  6.  

如果发生了什么异常我们同样会抛出它

  1. }

  2. catch (std::exception& e)

  3. {

  4.     std::cerr << e.what() << std::endl;

  5. }


运行示例:在windowsXPcmd窗口下
输入:upload.exe time-a.nist.gov

输出:54031 06-10-23 01:50:45 07 0 0 454.2 UTC(NIST) *

完整的代码:

  1. #include <iostream>

  2. #include <boost/array.hpp>

  3. #include <asio.hpp>

  4. using asio::ip::tcp;

  5. int main(int argc, char* argv[])

  6. {

  7.     try

  8.     {

  9.         if (argc != 2)

  10.         {

  11.             std::cerr << "Usage: client <host>" << std::endl;

  12.             return 1;

  13.         }

  14.         asio::io_service io_service;

  15.         tcp::resolver resolver(io_service);

  16.         tcp::resolver::query query(argv[1], "daytime");

  17.         tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);

  18.         tcp::resolver::iterator end;

  19.         tcp::socket socket(io_service);

  20.         asio::error error = asio::error::host_not_found;

  21.         while (error && endpoint_iterator != end)

  22.         {

  23.             socket.close();

  24.             socket.connect(*endpoint_iterator++, asio::assign_error(error));

  25.         }

  26.         if (error)

  27.             throw error;

  28.         for (;;)

  29.         {

  30.             boost::array<char, 128> buf;

  31.             asio::error error;

  32.             size_t len = socket.read_some(

  33.                     asio::buffer(buf), asio::assign_error(error));

  34.             if (error == asio::error::eof)

  35.                 break// Connection closed cleanly by peer.

  36.             else if (error)

  37.                 throw error; // Some other error.

  38.             std::cout.write(buf.data(), len);

  39.         }

  40.     }

  41.     catch (std::exception& e)

  42.     {

  43.         std::cerr << e.what() << std::endl;

  44.     }

  45.     return 0;

  46. }

 

8. TCP同步时间服务器

  1. #include <ctime>

  2. #include <iostream>

  3. #include <string>

  4. #include <asio.hpp>

  5. using asio::ip::tcp;

我们先定义一个函数返回当前的时间的string形式.这个函数会在我们所有的时间服务器示例上被使用.

  1. std::string make_daytime_string()

  2. {

  3.     using namespace std; // For time_t, time and ctime;

  4.     time_t now = time(0);

  5.     return ctime(&now);

  6. }

  7. int main()

  8. {

  9.     try

  10.     {

  11.         asio::io_service io_service;

新建一个asio::ip::tcp::acceptor对象来监听新的连接.我们监听TCP端口13,IP版本为V4

  1. tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));


这是一个iterative server,也就是说同一时间只能处理一个连接.建立一个socket来表示一个和客户端的连接, 然后等待客户端的连接.

  1. for (;;)

  2. {

  3.     tcp::socket socket(io_service);

  4.     acceptor.accept(socket);

当客户端访问服务器时,我们获取当前时间,然后返回它.

  1.         std::string message = make_daytime_string();

  2.         asio::write(socket, asio::buffer(message),

  3.             asio::transfer_all(), asio::ignore_error());

  4.     }

  5. }

最后处理异常

  1. catch (std::exception& e)

  2.     {

  3.         std::cerr << e.what() << std::endl;

  4.     }

  5.     return 0;

  6.  

运行示例:运行服务器,然后运行上一节的客户端,在windowsXPcmd窗口下
输入:client.exe 127.0.0.1
输出:Mon Oct 23 09:44:48 2006

完整的代码:

  1. #include <ctime>

  2. #include <iostream>

  3. #include <string>

  4. #include <asio.hpp>

  5. using asio::ip::tcp;

  6. std::string make_daytime_string()

  7. {

  8.     using namespace std; // For time_t, time and ctime;

  9.     time_t now = time(0);

  10.     return ctime(&now);

  11. }

  12. int main()

  13. {

  14.     try

  15.     {

  16.         asio::io_service io_service;

  17.         tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));

  18.         for (;;)

  19.         {

  20.             tcp::socket socket(io_service);

  21.             acceptor.accept(socket);

  22.             std::string message = make_daytime_string();

  23.             asio::write(socket, asio::buffer(message),

  24.                     asio::transfer_all(), asio::ignore_error());

  25.         }

  26.     }

  27.     catch (std::exception& e)

  28.     {

  29.         std::cerr << e.what() << std::endl;

  30.     }

  31.     return 0;

  32. }


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐