教你如何在 Pycharm 中制作自己的爬虫代码模板

#!/usr/bin/python3
# -*- coding=utf-8 -*-
# @Author  : lhys
# @FileName: proxy_tool.py
import requests
import threading
# Timeout, in seconds, passed as ``timeout=`` to requests.get.
timeout = 300
# Module-level lock; request_by_proxy holds it around the proxied requests.get.
lock = threading.Lock()
# Request headers sent with every crawl request. Replace with your own.
# An entry with an empty header name is rejected by requests as an invalid
# header, so the placeholder must be a well-formed header.
headers = {
    'User-Agent': 'Mozilla/5.0',
}
class MyProxy:
    """Hold the proxy currently in use and replace it when it is worn out.

    A proxy is replaced when its remaining use count (``proxy_ttl``) drops to
    zero or when a crawl request has flagged it as failing (``flag``) and a
    re-test confirms that it is unreachable.

    Args:
        proxy_api: URL of an API that returns a fresh proxy IP. May be empty
            when ``proxy_server`` is given.
        proxy_server: Address of a fixed proxy to start with. May be empty
            when ``proxy_api`` is given.
        max_use: Number of requests a proxy may serve before it is replaced.
        try_count: Maximum number of attempts when re-testing a flagged proxy.

    Raises:
        TypeError: If both ``proxy_api`` and ``proxy_server`` are empty.
    """

    def __init__(self, proxy_api='', proxy_server='', max_use=5000, try_count=5):
        if not (proxy_api or proxy_server):
            raise TypeError('Proxy_api and proxy_server cannot be empty at the same time.')
        self.proxies = None if not proxy_server else {
            'http': proxy_server,
            'https': proxy_server
        }
        # Proxy API URL.
        self.proxy_api = proxy_api
        # Maximum number of uses per proxy IP.
        self.max_use = max_use
        # Number of attempts when testing a proxy IP; beyond that the proxy
        # is treated as unavailable.
        self.try_count = try_count
        # Set to 1 when a crawl request failed, which triggers a proxy check.
        self.flag = 0
        # Remaining uses of the current proxy. A proxy supplied up front
        # starts with a full budget; otherwise it would never be used and,
        # without an API, the first set_proxy() call would raise.
        self.proxy_ttl = max_use if proxy_server else 0
        # One lock per piece of shared state.
        self.lock = threading.Lock()
        self.ttl_lock = threading.Lock()
        self.flag_lock = threading.Lock()

    def set_flag(self):
        """Mark the current proxy as having failed a request."""
        with self.flag_lock:
            self.flag = 1

    def get_flag(self):
        """Return the failure flag (0 or 1)."""
        with self.flag_lock:
            return self.flag

    def decrease_ttl(self):
        """Consume one use of the current proxy."""
        with self.ttl_lock:
            self.proxy_ttl -= 1

    def get_ttl(self):
        """Return the number of uses left for the current proxy."""
        with self.ttl_lock:
            return self.proxy_ttl

    def set_ttl(self):
        """Reset the remaining use count to ``max_use``."""
        with self.ttl_lock:
            self.proxy_ttl = self.max_use

    def get_proxy(self):
        """Return the current ``proxies`` mapping, or None if none is set."""
        with self.lock:
            return self.proxies

    def _proxy_still_works(self, proxy):
        """Return True if a test request through ``proxy`` succeeds.

        A connection-level failure ends the test at once. Any other
        exception is printed and the test is retried, up to ``try_count``
        attempts in total.
        """
        for _ in range(self.try_count):
            try:
                requests.get('https://www.baidu.com', headers=headers, proxies=proxy, timeout=timeout)
                print(f'Test proxy {proxy} successfully.')
                return True
            # A tuple is required here: ``except A or B or C`` evaluates the
            # ``or`` expression first and therefore only ever catches ``A``.
            except (requests.exceptions.ProxyError,
                    requests.exceptions.ConnectionError,
                    requests.exceptions.ConnectTimeout):
                print(f'Test proxy {proxy} failed.')
                return False
            except Exception as e:
                print(e)
        return False

    def set_proxy(self):
        """Make sure a usable proxy is installed, fetching a new one if needed.

        Does nothing while the current proxy has uses left and is not
        flagged. A flagged proxy is re-tested first and kept if the test
        succeeds. Otherwise a new proxy is requested from ``proxy_api`` until
        one is obtained.

        Raises:
            ValueError: If a new proxy is needed but ``proxy_api`` is empty.
        """
        flagged = self.get_flag()
        if self.get_ttl() > 0 and not flagged:
            return

        current = self.get_proxy()
        if flagged and current is not None and self._proxy_still_works(current):
            # The proxy is fine, so clear the flag; otherwise every later
            # request would re-run the test.
            with self.flag_lock:
                self.flag = 0
            return

        if not self.proxy_api:
            raise ValueError('代理 IP 不可用,且代理 IP API未设置。')

        while True:
            # The response layout below follows one particular proxy
            # provider; adapt it to your own provider's documentation.
            try:
                # The request is inside the try and has a timeout so that a
                # stalled or failing API call is reported and retried.
                payload = requests.get(self.proxy_api, timeout=timeout).json()
                if payload["ERRORCODE"] == "0":
                    entry = payload["RESULT"][0]
                    ip, port = entry['ip'], entry['port']
                    address = 'http://%s:%s' % (ip, port)
                    with self.lock:
                        self.proxies = {
                            'http': address,
                            'https': address
                        }
                    print(f'Set proxy: {ip}:{port}.')
                    with self.flag_lock:
                        self.flag = 0
                    self.set_ttl()
                    return
                print(f'Set proxy failed.')
            except Exception as e:
                print(e)

# Module-wide proxy manager used by request_by_proxy.
# NOTE(review): MyProxy() with no arguments raises TypeError because both
# proxy_api and proxy_server are empty — pass your own proxy_api and/or
# proxy_server here.
Proxy = MyProxy()
def request_by_proxy(url, use_proxy=True):
    """GET ``url``, retrying until a response is obtained.

    Args:
        url: Address to fetch.
        use_proxy: When true, route the request through the proxy held by
            the module-level ``Proxy`` and refresh that proxy when it is
            used up or flagged as failing.

    Returns:
        The ``requests`` response object.

    Raises:
        Any exception raised by ``Proxy.set_proxy()``, for example
        ValueError when a new proxy is needed but no proxy API is set.
    """
    while True:
        if use_proxy:
            proxy_ttl = Proxy.get_ttl()
            print(proxy_ttl)
            # Replace the proxy when it is used up or a request has failed.
            # This sits outside the try below so that a failure to obtain a
            # proxy reaches the caller instead of being retried forever.
            if proxy_ttl <= 0 or Proxy.get_flag():
                Proxy.set_proxy()
                print(Proxy.get_ttl())
        try:
            if not use_proxy:
                return requests.get(url, headers=headers, timeout=timeout)

            proxy = Proxy.get_proxy()
            # ``with`` releases the lock exactly once, on success or error.
            # Releasing it by hand in each except handler raised
            # RuntimeError whenever the failure happened while the lock was
            # not held.
            with lock:
                res = requests.get(url, headers=headers, proxies=proxy, timeout=timeout)
            Proxy.decrease_ttl()
            return res
        except requests.exceptions.ProxyError as pe:
            print(f'Proxy {Proxy.proxies} is not available, reason: {pe}.')
            if use_proxy:
                Proxy.set_flag()
        except requests.exceptions.Timeout as t:
            print(f'Time out, reason: {t}.')
            if use_proxy:
                Proxy.set_flag()
        except Exception as e:
            print(e)

#!/usr/bin/python3
# -*- coding=utf-8 -*-
# @Author  : lhys
# @FileName: spider.py
import time
import threading
from multiprocessing import Queue
from proxy_tool import request_by_proxy
# Seconds a Spider waits for the queue to yield a URL before stopping, and
# also the pause it takes after each fetched URL.
threshold = 30
# Shared queue of URLs consumed by Spider threads.
queue = Queue()
class Spider(threading.Thread):
    """Worker thread that takes URLs from the shared ``queue`` and fetches them.

    Args:
        use_proxy: Passed through to ``request_by_proxy`` for every fetch.
    """

    def __init__(self, use_proxy=True):
        super().__init__()
        self.use_proxy = use_proxy

    def get_data(self, url):
        """Fetch ``url``; any exception is printed and swallowed."""
        try:
            res = request_by_proxy(url, self.use_proxy)
            # Response handling goes here.
        except Exception as e:
            print(e)
            return

    def run(self):
        """Consume URLs until the queue stays empty for ``threshold`` seconds.

        After each fetch the thread pauses for ``threshold`` seconds.
        """
        # Imported locally: the module-level name ``queue`` is the shared
        # Queue instance, not the standard-library module.
        from queue import Empty

        while True:
            # A timed get() replaces the empty()-then-get() check. With
            # several spiders another thread could take the last item between
            # the two calls, leaving this thread blocked in get() forever.
            try:
                url = queue.get(timeout=threshold)
            except Empty:
                print('Queue is empty.')
                return
            self.get_data(url)
            time.sleep(threshold)

    推荐阅读