多线程爬取国内高匿代理ip构建IP池

多线程爬取国内高匿代理ip构建IP池

做爬虫时经常需要构建免费的代理IP池,话不多说,直接上代码。


import requests
from lxml import etree
import time
import csv
import threading
import queue
mu = threading.Lock()  # create a lock; write_file() holds it while appending to the output file

class MyThread(threading.Thread):
    """Thread that runs a zero-argument callable supplied at construction."""

    def __init__(self, func):
        super().__init__()
        # Callable invoked with no arguments when the thread runs.
        self.func = func

    def run(self):
        """Invoke the stored callable; its return value is discarded."""
        task = self.func
        task()



# Producer
def get_proxy():
    """Scrape pages 1-19 of the kuaidaili free proxy listing and queue the results.

    Every ``ip:port`` string found is put on the module-level ``data_ip``
    queue, and the total number queued is printed once all pages have been
    visited. A page that cannot be downloaded is reported and skipped so
    that one bad response does not abort the whole crawl.
    """
    # The request header never changes, so build it once outside the loop.
    header = {
        "User-Agent": "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)"}
    queued = 0

    for i in range(1, 20):
        print('正在获取第{}页的数据'.format(i))
        url = 'https://www.kuaidaili.com/free/inha/{}/'.format(i)

        try:
            # Without a timeout a stalled connection would block forever.
            resp = requests.get(url, headers=header, timeout=10)
            resp.raise_for_status()
        except requests.RequestException as exc:
            print('获取第{}页失败: {}'.format(i, exc))
            resp = None

        # Pause between requests whether or not this one succeeded.
        time.sleep(1)
        if resp is None:
            continue

        ip_xml = etree.HTML(resp.text)
        ip = ip_xml.xpath('//*[@id="list"]/table/tbody/tr/td[1]/text()')
        port = ip_xml.xpath('//*[@id="list"]/table/tbody/tr/td[2]/text()')

        for host, port_no in zip(ip, port):
            # Strip stray whitespace from the table cells so the address is
            # usable as a proxy URL.
            data_ip.put(host.strip() + ':' + port_no.strip())
            queued += 1

    print(queued)



def write_file(data):
    """Append one line to ``all_ip.txt`` while holding the module lock ``mu``.

    Args:
        data: Text to write; a newline is appended to it.

    Only one thread writes at a time; the others wait for the lock.
    """
    # ``with mu`` releases the lock even when opening or writing fails. The
    # previous acquire()/release() pair left the lock held on any exception,
    # which blocked every later writer forever.
    with mu:
        with open('all_ip.txt', 'a', encoding='utf-8') as f:
            f.write(data + '\n')


def test_agent(item):
    """Check whether a proxy works and record it with ``write_file`` if it does.

    A GET request to ``http://httpbin.org/ip`` is sent through the proxy with
    a 10 second timeout. The status code and body are printed, and the
    address is recorded only when the status code is 200.

    Args:
        item: Proxy address in ``host:port`` form.

    Returns:
        True if the proxy answered with status 200 and was recorded,
        otherwise False.
    """
    url = 'http://httpbin.org/ip'

    try:
        resp = requests.get(url, proxies={'http': 'http://%s' % item}, timeout=10)
    except (requests.RequestException, ValueError):
        # A dead or malformed proxy is the expected failure here; skip it.
        # Only request errors are swallowed, so a failure while writing the
        # result file is no longer hidden.
        return False

    print(resp.status_code, resp.text)
    if resp.status_code != 200:
        # The proxy answered but did not relay the request successfully.
        return False

    write_file(item)
    return True


# Consumer
def worker():
    """Take proxy addresses from ``data_ip`` and test each until the queue is empty.

    After every address has been passed to ``test_agent``, a line with the
    current thread's name, its id and the tested address is printed.
    """
    current = threading.current_thread()
    while True:
        try:
            # get_nowait() never blocks. Checking empty() and then calling a
            # blocking get() could hang forever when another thread took the
            # last item between the two calls.
            item = data_ip.get_nowait()
        except queue.Empty:
            return
        test_agent(item)
        print('我的名字是{},我的id是{},我测试的ip是{}'.format(current.name, current.ident, item))


def main():
    """Fill the queue with ``get_proxy()``, then run the workers to completion.

    ``threadNum`` threads are created, each running ``worker``; the function
    returns once all of them have finished.
    """
    get_proxy()
    pool = [MyThread(worker) for _ in range(threadNum)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()

# Shared state is defined at module level so that get_proxy(), worker() and
# main() also work when this file is imported; previously these names only
# existed when the file was run as a script.
data_ip = queue.Queue()  # proxy addresses waiting to be tested

threadNum = 10  # number of worker threads started by main()

if __name__ == '__main__':
    main()

测试可用的IP已写入文件 all_ip.txt 中,需要时可以直接读取。


版权声明:本文为qq_43511026原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。