Raphael +
  • Stay hungry, Stay foolish.
  • albus.zly@gmail.com
    albus12138

    使用Python投票

    同学参加了一个比赛,有一个网投的环节,帮同学写了个程序→_→【好吧,刷票是不对的,但票数看的过去的又有几个人没刷呢。。。废话不多说了,进入正题:

    正文

    投票逻辑

    投票的实现可以有很多方法,如:socket、urllib2、httplib等等…在这里我选择了urllib2 感谢比赛主办方没做图片验证码Orz不然。。。

    import random
    
    def get_user_agent():
        user_agents = [
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.71 Safari/537.1 LBBROWSER",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; QQBrowser/7.0.3698.400) ",
            "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SV1; QQDownload 732; .NET4.0C; .NET4.0E; 360SE)",
            "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:2.0b13pre) Gecko/20110307 Firefox/4.0b13pre",
            "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.133 Safari/534.16",
            "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)"
        ]
        return random.choice(user_agents)
    
    def poll(proxy)
        try:
            #使用代理IP
            proxy_handler = urllib2.ProxyHandler({'http': proxy})
            opener = urllib2.build_opener(proxy_handler)
            urllib2.install_opener(opener)
            request = urllib2.Request('http://example.com/vote/Vote?voteId=233')
            #模仿用户浏览器User-Agent,不必要,没有需求可以不加
            request.add_header('User-Agent', user_agent())
            #添加Referer,不必要
            request.add_header('Referer', 'http://example.com/vote/Vote')
            response = opener.open(request, timeout=10)
            print "Result %s." % response.read()
            return True
        except:
            return False

    获取代理

    网投基本上都是限制每个IP地址投票数或者是多长时间内投一票,所以我们要通过代理来隐藏我们的真实IP地址。你可以选择在一些网站上面购买免费的代理,或是写个爬虫去获取各种免费代理,百度上面搜索一下就有一堆这样的网站。因为程序运行过程中需要经常读写代理信息,所以使用数据库来存储这些数据,查询起来比较方便。

    import re, sqlite, urllib2, os, sys
    
    def get_proxy(num):
        #URL替换为你所购买的代理商提供的API,用num替换URL中的数量部分
        response = urllib2.urlopen('URL')
        html = response.read()
        #通过正则将获取到的代理转换为列表,便于后续使用
        regex = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5})')
        ip_list = regex.findall(html)
        #存入数据库
        con = sqlite3.connect(os.path.abspath(os.path.dirname(sys.argv[0]))+"/ipAdress.db")
        cur = con.cursor()
        for ip in ip_list:
            cur.execute("select * from ipadress where ip='http://%s'" % ip)
            if cur.fetchall() == []:
                con.execute("insert into ipadress values ('http://%s','yes')" % ip)
                con.commit()
            print "Proxy: %s is OK." % ip
        con.close()
        print "Done!"

    多线程

    因为某宝上面的收费代刷的速度实在逆天,简单的程序已经不能满足需求,所以有两个想法,一是多线程,二是非阻塞式【这个不会做TAT

    import thread, sqlite
    
    #投票逻辑线程
    class PollThread(threading.Thread):
        def __init__(self, threadID, name, proxy):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.proxy = []
            con = sqlite3.connect(os.path.abspath(os.path.dirname(sys.argv[0]))+"/ipAdress.db")
            cur = con.cursor()
            for i in proxy:
                for j in i['http']:
                    self.proxy.append({'http': j[1]})
                    cur.execute("update ipadress set status='no' where ip='%s'" % j[1])
            con.commit()
            con.close()
    
    
        def run(self):
            while self.proxy != []:
                con = sqlite3.connect(os.path.abspath(os.path.dirname(sys.argv[0]))+"/ipAdress.db")
                cur = con.cursor()
                for item in self.proxy:
                    if not poll(item):
                        self.proxy.remove(item)
                        cur.execute("delete from ipadress where ip='%s'" % item["http"])
                        con.commit()
                        new_proxy=[]
                        while new_proxy == []:
                            cur.execute("select * from ipadress where status='yes' limit 1")
                            new_proxy = cur.fetchall()
                            if new_proxy != []:
                                print "%s: No Proxy To Use!" % self.name
                                cur.execute("update ipadress set status='no' where ip='%s'" % new_proxy[0][1])
                                con.commit()
                        self.proxy += [{'http': new_proxy[0][1]},]
                        print "%s: 10060 Error" % self.name
                con.close()
            print "Exiting thread %s." % self.name
            f.close()
    
    #获取代理线程
    class GetProxyThread(threading.Thread):
        def __init__(self, threadID, name):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
    
        def run(self):
            con = sqlite3.connect(os.path.abspath(os.path.dirname(sys.argv[0]))+"/ipAdress.db")
            cur = con.cursor()
            while True:
                try:
                    cur.execute("select * from ipadress where status='yes'")
                    proxy_list = cur.fetchall()
                    # 建议设定数值为线程数*1.5
                    if len(proxy_list) < 15:
                        get_proxy(15)
                    time.sleep(1)
                except:
                    print "Warning: Get Proxy Failed!!!!"
            con.close()
    
    
    if __name__ == '__main__':
        con = sqlite3.connect(os.path.abspath(os.path.dirname(sys.argv[0]))+"/ipAdress.db")
        cur = con.cursor()
        try:
            cur.execute("create table ipadress (ip varchar(21),status varchar(3))")
        except:
            pass
    
        cur.execute("update ipadress set status='yes' where status='no'")
        con.commit()
        cur.execute("select * from ipadress where status='yes'")
        print "Available Proxies: %s" % len(cur.fetchall())
    
        thread_getproxy = GetProxyThread(1, "Thread-GetProxy")
        thread_getproxy.start()
        time.sleep(1)
    
        print "Start polling..."
        threads = []
        #线程数
        thread_ans = 10
        #每个线程分配的代理数
        proxy_ans = 5
        cur.execute("select * from ipadress where status='yes'")
        proxies = cur.fetchall()
        if len(proxies) < thread_ans*proxy_ans:
            get_proxy(thread_ans*proxy_ans-len(proxies))
            cur.execute("select * from ipadress where status='yes'")
            proxies = cur.fetchall()
        for i in range(1, thread_ans+1):
            threads.append(PollThread(1, "Thread-Poll-"+str(i), [{'http': proxies[proxy_ans*(i-1):proxy_ans*i]},]))
        con.close()
        for thread in threads:
            thread.start()

    总结

    虽然使用多线程之后速度有了明显提升,但仍然比不上专业代刷QAQ,而且多线程对于服务器性能的压力比较大…我的服务器单核CPU吃不消+_+ 还有就是尝试做了一下日志文件,但查看起来很不方便,并没有想到更好的解决办法。

    Blog

    Coding

    Bookmarks