`
helpbs
  • 浏览: 1165007 次
文章分类
社区版块
存档分类
最新评论

BOOST 线程完全攻略 - 扩展 - 线程消息通讯

 
阅读更多

转自:http://blog.csdn.net/IamNieo/archive/2008/09/10/2909236.aspx

  1. //controlled_module_ex.hpp:controlled_module类的扩展
  2. //增强线程之间消息通讯
  3. //增加线程安全启动和安全关闭功能
  4. //增加定时器功能
  #pragma once
  #include <list>
  #include <boost/any.hpp>
  #include <boost/shared_ptr.hpp>
  #include <boost/thread/mutex.hpp>
  #include "controlled_module.hpp"
  9. struct_command
  10. {
  11. typedefboost::shared_ptr<_command>CCmdPtr;
  12. unsignedintnCmd;
  13. boost::anyanyParam;
  14. };
  15. struct_wait_command
  16. {
  17. boost::anypar;
  18. unsignedintcommand;
  19. void*event;
  20. boost::shared_ptr<boost::any>resp;
  21. };
  22. classcontrolled_module_ex;
  23. struct_notify
  24. {
  25. controlled_module_ex*sender;
  26. intid;
  27. boost::anypar;
  28. };
  // Built-in message identifiers. Every derived value is parenthesised so that
  // it expands safely inside larger expressions (e.g. BM_RING_START * 2).
  #define BM_RESERVE 1000                        // base of the built-in range
  #define BM_RING_START (BM_RESERVE + 1)         // dispatched to on_safestart()
  #define BM_RING_STOP (BM_RESERVE + 2)          // dispatched to on_safestop()
  #define BM_RING_SETTIME (BM_RESERVE + 3)       // controlled_timer: set the period
  #define BM_RING_SETPARENT (BM_RESERVE + 4)     // controlled_timer: set the target module
  #define BM_RING_CYCLE (BM_RESERVE + 5)         // controlled_timer: one tick
  #define BM_RING_PROCESS (BM_RESERVE + 6)       // not used in this header
  #define BM_RING_PROCESSEND (BM_RESERVE + 7)    // not used in this header
  #define BM_RING_PROCESSFAIL (BM_RESERVE + 8)   // not used in this header
  #define BM_TIMER (BM_RESERVE + 9)              // dispatched to on_timer()
  #define BM_COMMAND (BM_RESERVE + 10)           // synchronous request, see execute()
  #define BM_NOTIFY (BM_RESERVE + 11)            // dispatched to on_notify()
  #define BM_USER 9000                           // first identifier for user-defined messages
  42. classcontrolled_timer;
  43. classcontrolled_module_ex:publiccontrolled_module
  44. {
  45. public:
  46. controlled_module_ex()
  47. {
  48. m_safe=false;
  49. }
  50. ~controlled_module_ex()
  51. {
  52. safestop();
  53. }
  54. public:
  55. template<typenameT>
  56. boolpostmessage(unsignedintnCmd,constboost::shared_ptr<T>&p)
  57. {
  58. if(this==0||!m_safe)returnfalse;
  59. boost::mutex::scoped_locklock(m_mutex_command);
  60. _command::CCmdPtrcmd(new_command);
  61. cmd->nCmd=nCmd;
  62. cmd->anyParam=p;
  63. m_list_command.push_back(cmd);
  64. returntrue;
  65. }
  66. boost::anyexecute(unsignedintcommand,boost::anypar,inttimeout=-1)
  67. {
  68. boost::shared_ptr<_wait_command>shared(new_wait_command);
  69. _wait_command&cmd=*shared;
  70. cmd.command=command;
  71. cmd.event=(void*)CreateEvent(0,FALSE,FALSE,0);
  72. cmd.par=par;
  73. cmd.resp=boost::shared_ptr<boost::any>(newboost::any);
  74. if(this->postmessage(BM_COMMAND,shared))
  75. {
  76. DWORDdw=WaitForSingleObject(cmd.event,timeout);
  77. CloseHandle(cmd.event);
  78. if(dw!=WAIT_OBJECT_0)
  79. returnboost::any();
  80. else
  81. return*cmd.resp;
  82. }
  83. else
  84. {
  85. CloseHandle(cmd.event);
  86. returnboost::any();
  87. }
  88. }
  89. voidnotify(_notifyp)
  90. {
  91. this->postmessage(BM_NOTIFY,p);
  92. }
  93. boolpostmessage(unsignedintnCmd,boost::anyp)
  94. {
  95. if(this==0||!m_safe)
  96. returnfalse;
  97. boost::mutex::scoped_locklock(m_mutex_command);
  98. _command::CCmdPtrcmd(new_command);
  99. cmd->nCmd=nCmd;
  100. cmd->anyParam=p;
  101. m_list_command.push_back(cmd);
  102. returntrue;
  103. }
  104. boolpostmessage(unsignedintnCmd)
  105. {
  106. if(this==0||!m_safe)
  107. returnfalse;
  108. boost::mutex::scoped_locklock(m_mutex_command);
  109. _command::CCmdPtrcmd(new_command);
  110. cmd->nCmd=nCmd;
  111. cmd->anyParam=0;
  112. m_list_command.push_back(cmd);
  113. returntrue;
  114. }
  115. virtualboolwork()
  116. {
  117. if(!getmessage())
  118. returnfalse;
  119. else
  120. {
  121. Sleep(this->m_sleeptime);
  122. returntrue;
  123. }
  124. }
  125. virtualvoidmessage(const_command&cmd)
  126. {
  127. if(cmd.nCmd==BM_RING_START)
  128. {
  129. this->on_safestart();
  130. }
  131. elseif(cmd.nCmd==BM_RING_STOP)
  132. {
  133. this->on_safestop();
  134. }
  135. elseif(cmd.nCmd==BM_TIMER)
  136. {
  137. this->on_timer(boost::any_cast<controlled_timer*>(cmd.anyParam));
  138. }
  139. elseif(cmd.nCmd==BM_COMMAND)
  140. {
  141. boost::shared_ptr<_wait_command>shared=boost::any_cast<boost::shared_ptr<_wait_command>>(cmd.anyParam);
  142. _wait_command&cmd=*shared;
  143. *cmd.resp=this->on_command(cmd.command,cmd.par);
  144. SetEvent((HANDLE)cmd.event);
  145. }
  146. elseif(cmd.nCmd==BM_NOTIFY)
  147. {
  148. try
  149. {
  150. _notifypar=boost::any_cast<_notify>(cmd.anyParam);
  151. this->on_notify(par);
  152. }
  153. catch(boost::bad_any_cast)
  154. {
  155. }
  156. }
  157. }
  158. virtualvoidrelease()
  159. {
  160. boost::mutex::scoped_locklock(m_mutex_command);
  161. m_list_command.clear();
  162. }
  163. voidsafestart()
  164. {
  165. if(!islive())
  166. start();
  167. m_safe=true;
  168. m_safestart_event=(void*)CreateEvent(NULL,FALSE,FALSE,0);
  169. postmessage(BM_RING_START);
  170. ::WaitForSingleObject((HANDLE)m_safestart_event,INFINITE);
  171. CloseHandle(m_safestart_event);
  172. }
  173. voidsafestop()
  174. {
  175. if(this->islive())
  176. {
  177. m_safe=false;
  178. m_safestop_event=(void*)CreateEvent(NULL,FALSE,FALSE,0);
  179. {
  180. boost::mutex::scoped_locklock(m_mutex_command);
  181. _command::CCmdPtrcmd(new_command);
  182. cmd->nCmd=BM_RING_STOP;
  183. cmd->anyParam=0;
  184. m_list_command.push_back(cmd);
  185. }
  186. DWORDdw=::WaitForSingleObject((HANDLE)m_safestop_event,3*1000);
  187. if(WAIT_OBJECT_0!=dw)
  188. {
  189. }
  190. CloseHandle(m_safestop_event);
  191. stop();
  192. }
  193. }
  194. virtualvoidon_timer(constcontrolled_timer*p){}
  195. virtualvoidon_safestart()
  196. {
  197. SetEvent(m_safestart_event);
  198. }
  199. virtualvoidon_safestop()
  200. {
  201. SetEvent(m_safestop_event);
  202. }
  203. virtualvoidon_notify(const_notify&p)
  204. {
  205. }
  206. protected:
  207. virtualboost::anyon_command(constunsignedintcommand,constboost::anypar)
  208. {
  209. returnboost::any();
  210. }
  211. boolgetmessage()
  212. {
  213. std::list<_command::CCmdPtr>cache;
  214. {
  215. boost::mutex::scoped_locklock(m_mutex_command);
  216. while(!m_list_command.empty())
  217. {
  218. _command::CCmdPtrp=m_list_command.front();
  219. m_list_command.pop_front();
  220. cache.push_back(p);
  221. }
  222. }
  223. _command::CCmdPtrstop_command;
  224. std::list<_command::CCmdPtr>::iteratoritem;
  225. for(item=cache.begin();item!=cache.end();item++)
  226. {
  227. if((*(*item)).nCmd==BM_RING_STOP)
  228. {
  229. stop_command=*item;
  230. break;
  231. }
  232. }
  233. if(stop_command.get()==0)
  234. {
  235. while(!cache.empty())
  236. {
  237. _command::CCmdPtrp=cache.front();
  238. cache.pop_front();
  239. try
  240. {
  241. if((*p).nCmd!=BM_RING_START)
  242. {
  243. if(!this->m_safe)
  244. continue;
  245. }
  246. this->message(*p);
  247. }
  248. catch(boost::bad_any_cast&)
  249. {
  250. }
  251. }
  252. returntrue;
  253. }
  254. else
  255. {
  256. cache.clear();
  257. this->message(*stop_command);
  258. returnfalse;
  259. }
  260. }
  261. private:
  262. void*m_safestart_event;
  263. void*m_safestop_event;
  264. boolm_safe;//在多线程,尤其牵涉到线程之间有类似socket级别关联时,当父线程safestop以后有可能会收到其他线程的postmessage,这时会引起线程死锁,这个m_safe就是解决这个问题的,当safestop以后不再接收新消息处理
  265. boost::mutexm_mutex_command;
  266. std::list<_command::CCmdPtr>m_list_command;
  267. };
  268. classcontrolled_timer:publiccontrolled_module_ex
  269. {
  270. public:
  271. controlled_timer()
  272. {
  273. this->m_time=0;
  274. this->m_parent=0;
  275. this->m_step=0;
  276. }
  277. ~controlled_timer(){
  278. }
  279. protected:
  280. controlled_module_ex*m_parent;
  281. intm_time;
  282. intm_step;
  283. public:
  284. voidstarttimer(inttime,controlled_module_ex*parent)
  285. {
  286. this->safestart();
  287. this->postmessage(BM_RING_SETPARENT,parent);
  288. this->postmessage(BM_RING_SETTIME,time);
  289. }
  290. voidstoptimer()
  291. {
  292. this->safestop();
  293. }
  294. public:
  295. virtualvoidon_safestop()
  296. {
  297. m_time=0;
  298. controlled_module_ex::on_safestop();
  299. }
  300. virtualvoidmessage(const_command&cmd)
  301. {
  302. controlled_module_ex::message(cmd);
  303. if(cmd.nCmd==BM_RING_SETTIME)
  304. {
  305. inttime=boost::any_cast<int>(cmd.anyParam);
  306. this->m_time=time/this->m_sleeptime;
  307. this->postmessage(BM_RING_CYCLE);
  308. }
  309. elseif(cmd.nCmd==BM_RING_SETPARENT)
  310. {
  311. this->m_parent=boost::any_cast<controlled_module_ex*>(cmd.anyParam);
  312. }
  313. elseif(cmd.nCmd==BM_RING_CYCLE)
  314. {
  315. if(m_time>0)
  316. {
  317. if(m_step>m_time)
  318. {
  319. m_parent->postmessage(BM_TIMER,this);
  320. m_step=0;
  321. }
  322. m_step++;
  323. }
  324. this->postmessage(BM_RING_CYCLE);
  325. }
  326. }
  327. };
1.向线程PostMessage
函数controlled_module_ex::postmessage完成消息推送。
虚函数controlled_module_ex::message(const _command & cmd)实现消息接收。
  1. #include"controlled_module_ex.hpp"
  2. classthdex:publiccontrolled_module_ex
  3. {
  4. protected:
  5. virtualvoidmessage(const_command&cmd)
  6. {
  7. controlled_module_ex::message(cmd);
  8. if(cmd.nCmd==BM_USER+1)
  9. {
  10. cout<<"getmessage"<<endl;
  11. }
  12. }
  13. };
  14. int_tmain(intargc,_TCHAR*argv[])
  15. {
  16. thdext;
  17. t.safestart();
  18. t.postmessage(BM_USER+1);
  19. charbuf[10];
  20. gets_s(buf,sizeofbuf);
  21. t.safestop();
  22. return0;
  23. }
2.向线程PostMessage,并携带简单对象参数
我们都知道常规的PostMessage要传参,如果是整型参数,还可以用强制转换,但如果是其他类型,例如字符串,我们就必须创建一个字符串缓冲,把缓冲指针作为参数传过去,线程还不能忘记删除,否则导致内存泄漏,自定义结构也是一样的操作,如果想尝试传递一个CString对象,是不可能的。
幸运的是boost提供了boost::any来抽象任何对象类型,controlled_module_ex的消息传递都是基于boost::any来完成,程序员可以由此写出干净而且内存安全的代码。
  1. #include"controlled_module_ex.hpp"
  2. structmystruct
  3. {
  4. stringa;
  5. intb;
  6. };
  7. classthdex:publiccontrolled_module_ex
  8. {
  9. protected:
  10. virtualvoidmessage(const_command&cmd)
  11. {
  12. controlled_module_ex::message(cmd);
  13. if(cmd.nCmd==BM_USER+1)
  14. {
  15. cout<<"getinteger:"<<boost::any_cast<int>(cmd.anyParam)<<endl;
  16. }
  17. if(cmd.nCmd==BM_USER+2)
  18. {
  19. cout<<"getstring:"<<boost::any_cast<string>(cmd.anyParam)<<endl;
  20. }
  21. if(cmd.nCmd==BM_USER+3)
  22. {
  23. mystructpar=boost::any_cast<mystruct>(cmd.anyParam);
  24. cout<<"getmystruct:"<<par.a<<","<<par.b<<endl;
  25. }
  26. }
  27. };
  28. int_tmain(intargc,_TCHAR*argv[])
  29. {
  30. thdext;
  31. t.safestart();
  32. t.postmessage(BM_USER+1,123);
  33. t.postmessage(BM_USER+2,string("helloworld"));
  34. mystructpar;
  35. par.a="helloworld";
  36. par.b=123;
  37. t.postmessage(BM_USER+3,par);
  38. charbuf[10];
  39. gets_s(buf,sizeofbuf);
  40. t.safestop();
  41. return0;
  42. }
3.向线程PostMessage,并传递内存块参数
假如我们书写了一个录音子线程,如何将录制的语音数据传递给其他线程呢,常规做法是创建一个缓冲,将语音数据填充进去,然后将缓冲地址作为参数传递,这种做法要求接收线程不能忘记删除,否则会导致内存泄漏。
幸运的是boost提供了智能指针,可以像Java、C#的自动内存回收那样管理内存分配;我们如果使用智能指针对象作为参数传递,就可以完美地防范内存泄漏,就算子线程没有处理这条消息也不必担心,内存会被自动回收。
  1. #include"controlled_module_ex.hpp"
  2. structmystruct
  3. {
  4. boost::shared_ptr<char>data;
  5. intdatalen;
  6. };
  7. classthdex:publiccontrolled_module_ex
  8. {
  9. protected:
  10. virtualvoidmessage(const_command&cmd)
  11. {
  12. controlled_module_ex::message(cmd);
  13. if(cmd.nCmd==BM_USER+1)
  14. {
  15. cout<<"getsharedptr"<<endl;//仅仅得到数据,得不到数据长度
  16. }
  17. if(cmd.nCmd==BM_USER+2)
  18. {
  19. mystructpar=boost::any_cast<mystruct>(cmd.anyParam);
  20. cout<<"getsharedptrlen:"<<par.datalen<<endl;
  21. }
  22. }
  23. };
  24. int_tmain(intargc,_TCHAR*argv[])
  25. {
  26. thdext;
  27. t.safestart();
  28. t.postmessage(BM_USER+1,boost::shared_ptr<char>(newchar[1000]));
  29. mystructpar;
  30. par.datalen=1000;
  31. par.data=boost::shared_ptr<char>(newchar[par.datalen]);
  32. t.postmessage(BM_USER+2,par);
  33. charbuf[10];
  34. gets_s(buf,sizeofbuf);
  35. t.safestop();
  36. return0;
  37. }
4.向线程SendMessage
函数controlled_module_ex::execute完成这个工作。
虚函数controlled_module_ex::on_command(const unsigned int command,const boost::any par)响应消息处理。
  1. #include"controlled_module_ex.hpp"
  2. classthdex:publiccontrolled_module_ex
  3. {
  4. protected:
  5. boost::anyon_command(constunsignedintcommand,constboost::anypar)
  6. {
  7. if(command==1)
  8. {
  9. cout<<"oncommand"<<endl;
  10. return0;
  11. }
  12. if(command==2)
  13. {
  14. cout<<"oncommand,par:"<<boost::any_cast<string>(par)<<endl;
  15. return0;
  16. }
  17. if(command==3)
  18. {
  19. returntrue;
  20. }
  21. else
  22. returncontrolled_module_ex::on_command(command,par);
  23. }
  24. };
  25. int_tmain(intargc,_TCHAR*argv[])
  26. {
  27. thdext;
  28. t.safestart();
  29. t.execute(1,0);//等待子线程处理完成
  30. t.execute(2,string("helloworld"));//带参数等待子线程完成
  31. boolrs=boost::any_cast<bool>(t.execute(3,0));//等待子线程处理完成,并取得返回值
  32. cout<<"getthreadresult:"<<rs<<endl;
  33. boost::anytimeout=t.execute(4,0,1000);//等待子线程处理,超时1秒
  34. if(timeout.empty())
  35. cout<<"timeout"<<endl;
  36. charbuf[10];
  37. gets_s(buf,sizeofbuf);
  38. t.safestop();
  39. return0;
  40. }
5.定时器
类似于CWnd::OnTimer,controlled_module_ex也提供一个虚函数virtual void on_timer(const controlled_timer * p);来处理定时。
  1. #include"controlled_module_ex.hpp"
  2. classthdex:publiccontrolled_module_ex
  3. {
  4. protected:
  5. virtualvoidon_timer(constcontrolled_timer*p)
  6. {
  7. cout<<"ontimer"<<endl;
  8. }
  9. };
  10. int_tmain(intargc,_TCHAR*argv[])
  11. {
  12. thdext;
  13. controlled_timertimer;
  14. t.safestart();
  15. timer.starttimer(1000,&t);
  16. charbuf[10];
  17. gets_s(buf,sizeofbuf);
  18. t.safestop();
  19. return0;
  20. }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics