2008年09月28日

前言:
关于线程池(thread pool)的概念请参考http://en.wikipedia.org/wiki/Thread_pool_pattern。在中使用线程是有硬伤的,因为(这里指C语言实现的)的基本调用都最后生成对应C语言的函数调用,因此中使用线程的开销太大,不过可以使用Stackless (的一个修改版)来增强中使用线程的表现。
同时由于中GIL的存在,导制在使用多CPU时无法充分利用多个CPU,目前pysco这个模块可以针对多CPU提高的效率。

在C语言里要实现个线程池,就要面对一堆的指针,还有pthread这个库中那些看起来很让人头痛的一些函数:
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*), void *restrict arg);
而如果用来实现一个线程池的话就好多了,不仅结构十分清晰,而且代码看起来会很优美:

:
  1. import  threading
  2. from  time  import  sleep
  3.  
  4. class  ThreadPool:
  5.  
  6.      """Flexible  thread  pool  class.   Creates  a  pool  of  threads,  then
  7.     accepts  tasks  that  will  be  dispatched  to  the  next  available
  8.     thread."""
  9.      
  10.      def  __init__(self,  numThreads):
  11.  
  12.          """Initialize  the  thread  pool  with  numThreads  workers."""
  13.          
  14.          self.__threads  =  []
  15.          self.__resizeLock  =  threading.Condition(threading.Lock())
  16.          self.__taskLock  =  threading.Condition(threading.Lock())
  17.          self.__tasks  =  []
  18.          self.__isJoining  =  False
  19.          self.setThreadCount(numThreads)
  20.  
  21.      def  setThreadCount(self,  newNumThreads):
  22.  
  23.          """  External  method  to  set  the  current  pool  size.   Acquires
  24.         the  resizing  lock,  then  calls  the  internal  version  to  do  real
  25.         work."""
  26.          
  27.          #  Can't  change  the  thread  count  if  we're  shutting  down  the  pool!
  28.          if  self.__isJoining:
  29.              return  False
  30.          
  31.          self.__resizeLock.acquire()
  32.          try:
  33.              self.__setThreadCountNolock(newNumThreads)
  34.          finally:
  35.              self.__resizeLock.release()
  36.          return  True
  37.  
  38.      def  __setThreadCountNolock(self,  newNumThreads):
  39.          
  40.          """Set  the  current  pool  size,  spawning  or  terminating  threads
  41.         if  necessary.   Internal  use  only;  assumes  the  resizing  lock  is
  42.         held."""
  43.          
  44.          #  If  we  need  to  grow  the  pool,  do  so
  45.          while  newNumThreads >  len(self.__threads):
  46.              newThread  =  ThreadPoolThread(self)
  47.              self.__threads.append(newThread)
  48.              newThread.start()
  49.          #  If  we  need  to  shrink  the  pool,  do  so
  50.          while  newNumThreads  <len(self.__threads):
  51.              self.__threads[0].goAway()
  52.              del  self.__threads[0]
  53.  
  54.      def  getThreadCount(self):
  55.  
  56.          """Return  the  number  of  threads  in  the  pool."""
  57.          
  58.          self.__resizeLock.acquire()
  59.          try:
  60.              return  len(self.__threads)
  61.          finally:
  62.              self.__resizeLock.release()
  63.  
  64.      def  queueTask(self,  task,  args=None,  taskCallback=None):
  65.  
  66.          """Insert  a  task  into  the  queue.   task  must  be  callable;
  67.         args  and  taskCallback  can  be  None."""
  68.          
  69.          if  self.__isJoining  ==  True:
  70.              return  False
  71.          if  not  callable(task):
  72.              return  False
  73.          
  74.          self.__taskLock.acquire()
  75.          try:
  76.              self.__tasks.append((task,  args,  taskCallback))
  77.              return  True
  78.          finally:
  79.              self.__taskLock.release()
  80.  
  81.      def  getNextTask(self):
  82.  
  83.          """  Retrieve  the  next  task  from  the  task  queue.   For  use
  84.         only  by  ThreadPoolThread  objects  contained  in  the  pool."""
  85.          
  86.          self.__taskLock.acquire()
  87.          try:
  88.              if  self.__tasks  ==  []:
  89.                  return  (None,  None,  None)
  90.              else:
  91.                  return  self.__tasks.pop(0)
  92.          finally:
  93.              self.__taskLock.release()
  94.      
  95.      def  joinAll(self,  waitForTasks  =  True,  waitForThreads  =  True):
  96.  
  97.          """  Clear  the  task  queue  and  terminate  all  pooled  threads,
  98.         optionally  allowing  the  tasks  and  threads  to  finish."""
  99.          
  100.          #  Mark  the  pool  as  joining  to  prevent  any  more  task  queueing
  101.          self.__isJoining  =  True
  102.  
  103.          #  Wait  for  tasks  to  finish
  104.          if  waitForTasks:
  105.              while  self.__tasks  !=  []:
  106.                  sleep(.1)
  107.  
  108.          #  Tell  all  the  threads  to  quit
  109.          self.__resizeLock.acquire()
  110.          try:
  111.              self.__setThreadCountNolock(0)
  112.              self.__isJoining  =  True
  113.  
  114.              #  Wait  until  all  threads  have  exited
  115.              if  waitForThreads:
  116.                  for  t  in  self.__threads:
  117.                      t.join()
  118.                      del  t
  119.  
  120.              #  Reset  the  pool  for  potential  reuse
  121.              self.__isJoining  =  False
  122.          finally:
  123.              self.__resizeLock.release()
  124.        
  125. class  ThreadPoolThread(threading.Thread):
  126.       """  Pooled  thread  class.  """
  127.      
  128.      threadSleepTime  =  0.1
  129.  
  130.      def  __init__(self,  pool):
  131.  
  132.          """  Initialize  the  thread  and  remember  the  pool.  """
  133.          
  134.          threading.Thread.__init__(self)
  135.          self.__pool  =  pool
  136.          self.__isDying  =  False
  137.          
  138.      def  run(self):
  139.  
  140.          """  Until  told  to  quit,  retrieve  the  next  task  and  execute
  141.         it,  calling  the  callback  if  any.   """
  142.          
  143.          while  self.__isDying  ==  False:
  144.              cmd,  args,  callback  =  self.__pool.getNextTask()
  145.              #  If  there's  nothing  to  do,  just  sleep  a  bit
  146.              if  cmd  is  None:
  147.                  sleep(ThreadPoolThread.threadSleepTime)
  148.              elif  callback  is  None:
  149.                  cmd(args)
  150.              else:
  151.                  callback(cmd(args))
  152.      
  153.      def  goAway(self):
  154.  
  155.          """  Exit  the  run  loop  next  time  through."""
  156.          
  157.          self.__isDying  =  True

这段100多行的代码完成了一个可动态改变的线程池,并且包含了详细的注释,这里是代码的出处。我觉得这段代码比Python官方给出的那个还要好些。他们实现的原理都是一样的,使用了一个队列(Queue)来存储任务。

关于Python中线程同步的问题,这里有不错的介绍。

标签 :

5 楼了已经

  • luguo写于08年09月28日

    也不怎么简单嘛 ~~
    thread sucks... :-)

  • Amankwah写于08年09月28日

    C对Thread的支持有问题吗?这个应该看内核和pthread库以及其他库怎么样吧?
    我最近一直线程着呢,感觉Linux的线程确实不怎么样,我现在的感觉是它在凑合~

  • crazyfranc写于08年10月03日

    你们公司主要用python做开发?

  • 可可熊写于08年10月10日

    俺们公司用C++;
    老大是越来越老糊涂了,俺什么时候说C对Thread的支持有问题!!

  • 可可熊写于08年10月10日

    老大说Linux的线程不怎么样,你的意思是Windows的好还是Mac的好!!

发表评论

在下面加入你的评论,或者 trackback 从你的博客站点。 订阅本文的评论。

:

:

:

« wordpress漏洞利用-更改任意用户的密码
» configure时遇到的问题