`
helpbs
  • 浏览: 1162758 次
文章分类
社区版块
存档分类
最新评论

BOOST 线程完全攻略 - 扩展 - 事务线程

 
阅读更多

转自:http://blog.csdn.net/IamNieo/archive/2008/09/10/2910595.aspx

什么叫事务线程

举个例子:
我们写一个IM客户端的登录子线程,则该子线程会有这么几个事务要处理
No.1 TCP Socket物理连接
No.2 逻辑登录
No.3 好友在线查询
No.4 状态更新
我们通常的代码写法是
  1. voidThreadLogin()
  2. {
  3. try
  4. {
  5. if(fail(物理连接))
  6. throw;
  7. if(fail(登录))
  8. throw;
  9. if(fail(查询好友))
  10. throw;
  11. if(fail(更新))
  12. throw;
  13. }
  14. catch(exception)
  15. {
  16. }
  17. }
串行的逻辑用串行的代码写,不太好看,况且中途如果主线程发出取消指令,还不好处理。
这里扩展的thread类,就是来解决这个问题的,他会提供给程序员一种事件处理的模式
  1. classthreadLogin
  2. {
  3. voidonEventConnect()
  4. {
  5. 物理连接
  6. }
  7. voidonEventLogin()
  8. {
  9. 登录
  10. }
  11. voidonEventQuery()
  12. {
  13. 查询
  14. }
  15. voidonEventUpdate()
  16. {
  17. 更新
  18. }
  19. }
源码如下
  1. //thread.hpp:controlled_module_ex类的扩展
  2. //增强线程事务处理能力
  3. #pragmaonce
  4. #include"controlled_module_ex.hpp"
  5. classthread:publiccontrolled_module_ex
  6. {
  7. protected:
  8. staticconstintNONE=-1;
  9. staticconstintWAITING=-2;
  10. staticconstintDONE=-3;
  11. staticconstintFAILED=-4;
  12. protected:
  13. structprocess
  14. {
  15. intlevel;
  16. intstatus;
  17. intsequence;
  18. inttrycount;
  19. inttryindex;
  20. std::stringlasterror;
  21. doubletimeout;
  22. boolbTimeout;
  23. };
  24. processm_process;
  25. controlled_timerm_timer_process;
  26. intm_process_begin,m_process_end;
  27. doublem_timeout_default;
  28. public:
  29. voidstartprocess(intprocess_begin,intprocess_end,doubletimeout_default=1.0,intcycle=1000)
  30. {
  31. m_process_begin=process_begin;
  32. m_process_end=process_end;
  33. m_timeout_default=timeout_default;
  34. m_process.level=m_process_begin;
  35. m_process.tryindex=0;
  36. this->postmessage(BM_RING_PROCESS);
  37. m_timer_process.starttimer(cycle,this);
  38. }
  39. voidtryagain()
  40. {
  41. if(this->m_process.level==thread::NONE)
  42. return;
  43. this->m_process.tryindex++;
  44. if(this->m_process.trycount>0&&this->m_process.tryindex>=this->m_process.trycount)
  45. {
  46. this->fail();
  47. }
  48. else
  49. this->postmessage(BM_RING_PROCESS);
  50. }
  51. voidnext()
  52. {
  53. if(this->m_process.level==thread::NONE)
  54. return;
  55. if(this->m_process.level>=this->m_process_end)
  56. {
  57. this->m_timer_process.stoptimer();
  58. this->postmessage(BM_RING_PROCESSEND);
  59. }
  60. else
  61. {
  62. this->m_process.tryindex=0;
  63. this->m_process.level++;
  64. this->m_process.bTimeout=false;
  65. this->postmessage(BM_RING_PROCESS);
  66. }
  67. }
  68. voidfail()
  69. {
  70. m_process.level=thread::NONE;
  71. this->m_timer_process.stoptimer();
  72. this->postmessage(BM_RING_PROCESSFAIL);
  73. }
  74. virtualvoidon_safestart()
  75. {
  76. m_process.level=thread::NONE;
  77. m_process.status=thread::NONE;
  78. m_process_begin=m_process_end=thread::NONE;
  79. controlled_module_ex::on_safestart();
  80. }
  81. virtualvoidon_safestop()
  82. {
  83. m_timer_process.stoptimer();
  84. controlled_module_ex::on_safestop();
  85. }
  86. virtualvoidmessage(const_command&cmd)
  87. {
  88. controlled_module_ex::message(cmd);
  89. if(cmd.nCmd==BM_RING_PROCESS)
  90. {
  91. this->on_process();
  92. }
  93. if(cmd.nCmd==BM_RING_PROCESSEND)
  94. {
  95. this->m_process.level=thread::NONE;
  96. this->on_process_end();
  97. }
  98. if(cmd.nCmd==BM_RING_PROCESSFAIL)
  99. {
  100. this->m_process.level=thread::NONE;
  101. this->on_process_fail();
  102. }
  103. }
  104. virtualvoidon_timer(constcontrolled_timer*p)
  105. {
  106. if(p==this->m_timer_process)
  107. {
  108. if(this->m_process.level!=thread::NONE)
  109. {
  110. if(this->m_process.level>=this->m_process_begin&&this->m_process.level<=this->m_process_end)
  111. {
  112. if(this->m_process.status==thread::NONE)
  113. {
  114. this->m_process.level=this->m_process_begin;
  115. m_process.tryindex=0;
  116. on_process();
  117. }
  118. elseif(this->m_process.status==thread::WAITING)
  119. {
  120. if(this->m_process.timeout>0)
  121. {
  122. time_tcur;
  123. time(&cur);
  124. if(difftime(cur,(time_t)this->m_process.sequence)>this->m_process.timeout)
  125. {
  126. this->m_process.bTimeout=true;
  127. this->tryagain();
  128. }
  129. }
  130. }
  131. elseif(this->m_process.status==thread::FAILED)
  132. {
  133. this->tryagain();
  134. }
  135. elseif(this->m_process.status==thread::DONE)
  136. {
  137. this->m_process.level++;
  138. m_process.tryindex=0;
  139. this->on_process();
  140. }
  141. }
  142. }
  143. }
  144. }
  145. virtualvoidon_process()
  146. {
  147. time((time_t*)&m_process.sequence);
  148. m_process.timeout=m_timeout_default;
  149. m_process.status=thread::WAITING;
  150. m_process.trycount=-1;
  151. }
  152. virtualvoidon_process_end(){}
  153. virtualvoidon_process_fail(){}
  154. intget_sequence(){returnm_process.sequence;}
  155. voidput_timeout(doublev){m_process.timeout=v;}
  156. voidput_trycount(intv){m_process.trycount=v;}
  157. intget_level(){returnm_process.level;}
  158. voidput_level(intv){m_process.level=v;}
  159. std::stringget_lasterror(){returnm_process.lasterror;}
  160. voidput_lasterror(std::stringv){m_process.lasterror=v;}
  161. __declspec(property(put=put_trycount))inttrycount;
  162. __declspec(property(put=put_timeout))doubletimeout;
  163. __declspec(property(get=get_level,put=put_level))intlevel;
  164. __declspec(property(get=get_sequence))intsequence;
  165. __declspec(property(get=get_lasterror,put=put_lasterror))std::stringlasterror;
  166. };
虚拟函数thread::on_process()处理各种事务事件
虚拟函数thread::on_process_end()是所有事务处理完毕事件
虚拟函数thread::on_process_fail()是事务处理出现错误,这时所有事务被取消,线程终止
这里给一个简单的范例,
总共线程要完成3件事务,其中第二个事务要求用户确认是否继续
  1. #definePROCESS_11
  2. #definePROCESS_22
  3. #definePROCESS_33
  4. classthdex:publicthread
  5. {
  6. public:
  7. virtualvoidon_process()
  8. {
  9. thread::on_process();
  10. if(this->level==PROCESS_1)
  11. {
  12. cout<<"workonprocess1..."<<endl;
  13. Sleep(100);
  14. cout<<"process1done."<<endl;
  15. this->next();
  16. }
  17. elseif(this->level==PROCESS_2)
  18. {
  19. cout<<"workonprocess2..."<<endl;
  20. this->timeout=-1;
  21. if(IDNO==::MessageBox(0,"areyourwantcontinue?","ask",MB_ICONQUESTION|MB_YESNO))
  22. {
  23. this->lasterror="canceledbyuser";
  24. this->fail();
  25. }
  26. else
  27. {
  28. Sleep(100);
  29. cout<<"process2done."<<endl;
  30. this->next();
  31. }
  32. }
  33. elseif(this->level==PROCESS_3)
  34. {
  35. cout<<"workonprocess3..."<<endl;
  36. Sleep(100);
  37. cout<<"process3done."<<endl;
  38. this->next();
  39. }
  40. }
  41. virtualvoidon_process_fail()
  42. {
  43. cout<<this->lasterror<<endl;
  44. }
  45. virtualvoidon_process_end()
  46. {
  47. cout<<"allprocessdone."<<endl;
  48. }
  49. };
  50. int_tmain(intargc,_TCHAR*argv[])
  51. {
  52. thdext;
  53. t.safestart();
  54. t.startprocess(PROCESS_1,PROCESS_3);
  55. charbuf[10];
  56. gets_s(buf,sizeofbuf);
  57. t.safestop();
  58. return0;
  59. }
thread事务还支持超时设定和重试次数设定,这里就不做介绍,读者可以自己研究代码。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics