前言:
关于线程池(thread pool)的概念请参考http://en.wikipedia.org/wiki/Thread_pool_pattern。在Python中使用线程是有硬伤的,因为Python(这里指C语言实现的Python)的基本调用都最后生成对应C语言的函数调用,因此Python中使用线程的开销太大,不过可以使用Stackless Python(Python的一个修改版)来增强Python中使用线程的表现。
同时由于Python中GIL的存在,导制在使用多CPU时Python无法充分利用多个CPU,目前pysco这个模块可以针对多CPU提高Python的效率。
在C语言里要实现个线程池,就要面对一堆的指针,还有pthread这个库中那些看起来很让人头痛的一些函数:
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*), void *restrict arg);
而如果用Python来实现一个线程池的话就好多了,不仅结构十分清晰,而且代码看起来会很优美:
PYTHON:
-
import threading
-
from time import sleep
-
-
class ThreadPool:
-
-
"""Flexible thread pool class. Creates a pool of threads, then
-
accepts tasks that will be dispatched to the next available
-
thread."""
-
-
def __init__(self, numThreads):
-
-
"""Initialize the thread pool with numThreads workers."""
-
-
self.__threads = []
-
self.__resizeLock = threading.Condition(threading.Lock())
-
self.__taskLock = threading.Condition(threading.Lock())
-
self.__tasks = []
-
self.__isJoining = False
-
self.setThreadCount(numThreads)
-
-
def setThreadCount(self, newNumThreads):
-
-
""" External method to set the current pool size. Acquires
-
the resizing lock, then calls the internal version to do real
-
work."""
-
-
# Can't change the thread count if we're shutting down the pool!
-
if self.__isJoining:
-
return False
-
-
self.__resizeLock.acquire()
-
try:
-
self.__setThreadCountNolock(newNumThreads)
-
finally:
-
self.__resizeLock.release()
-
return True
-
-
def __setThreadCountNolock(self, newNumThreads):
-
-
"""Set the current pool size, spawning or terminating threads
-
if necessary. Internal use only; assumes the resizing lock is
-
held."""
-
-
# If we need to grow the pool, do so
-
while newNumThreads > len(self.__threads):
-
newThread = ThreadPoolThread(self)
-
self.__threads.append(newThread)
-
newThread.start()
-
# If we need to shrink the pool, do so
-
while newNumThreads <len(self.__threads):
-
self.__threads[0].goAway()
-
del self.__threads[0]
-
-
def getThreadCount(self):
-
-
"""Return the number of threads in the pool."""
-
-
self.__resizeLock.acquire()
-
try:
-
return len(self.__threads)
-
finally:
-
self.__resizeLock.release()
-
-
def queueTask(self, task, args=None, taskCallback=None):
-
-
"""Insert a task into the queue. task must be callable;
-
args and taskCallback can be None."""
-
-
if self.__isJoining == True:
-
return False
-
if not callable(task):
-
return False
-
-
self.__taskLock.acquire()
-
try:
-
self.__tasks.append((task, args, taskCallback))
-
return True
-
finally:
-
self.__taskLock.release()
-
-
def getNextTask(self):
-
-
""" Retrieve the next task from the task queue. For use
-
only by ThreadPoolThread objects contained in the pool."""
-
-
self.__taskLock.acquire()
-
try:
-
if self.__tasks == []:
-
return (None, None, None)
-
else:
-
return self.__tasks.pop(0)
-
finally:
-
self.__taskLock.release()
-
-
def joinAll(self, waitForTasks = True, waitForThreads = True):
-
-
""" Clear the task queue and terminate all pooled threads,
-
optionally allowing the tasks and threads to finish."""
-
-
# Mark the pool as joining to prevent any more task queueing
-
self.__isJoining = True
-
-
# Wait for tasks to finish
-
if waitForTasks:
-
while self.__tasks != []:
-
sleep(.1)
-
-
# Tell all the threads to quit
-
self.__resizeLock.acquire()
-
try:
-
self.__setThreadCountNolock(0)
-
self.__isJoining = True
-
-
# Wait until all threads have exited
-
if waitForThreads:
-
for t in self.__threads:
-
t.join()
-
del t
-
-
# Reset the pool for potential reuse
-
self.__isJoining = False
-
finally:
-
self.__resizeLock.release()
-
-
class ThreadPoolThread(threading.Thread):
-
""" Pooled thread class. """
-
-
threadSleepTime = 0.1
-
-
def __init__(self, pool):
-
-
""" Initialize the thread and remember the pool. """
-
-
threading.Thread.__init__(self)
-
self.__pool = pool
-
self.__isDying = False
-
-
def run(self):
-
-
""" Until told to quit, retrieve the next task and execute
-
it, calling the callback if any. """
-
-
while self.__isDying == False:
-
cmd, args, callback = self.__pool.getNextTask()
-
# If there's nothing to do, just sleep a bit
-
if cmd is None:
-
sleep(ThreadPoolThread.threadSleepTime)
-
elif callback is None:
-
cmd(args)
-
else:
-
callback(cmd(args))
-
-
def goAway(self):
-
-
""" Exit the run loop next time through."""
-
-
self.__isDying = True
这段100多行的代码完成了一个可动态改变的线程池,并且包含了详细的注释,这里是代码的出处。我觉得这段代码比Python官方给出的那个还要好些。他们实现的原理都是一样的,使用了一个队列(Queue)来存储任务。
关于Python中线程同步的问题,这里有不错的介绍。
也不怎么简单嘛 ~~
thread sucks...
C对Thread的支持有问题吗?这个应该看内核和pthread库以及其他库怎么样吧?
我最近一直线程着呢,感觉Linux的线程确实不怎么样,我现在的感觉是它在凑合~
你们公司主要用python做开发?
俺们公司用C++;
老大是越来越老糊涂了,俺什么时候说C对Thread的支持有问题!!
老大说Linux的线程不怎么样,你的意思是Windows的好还是Mac的好!!