Scrapy爬虫项目,Scrapy存储为Json文件、Scrapy存入MySQL、Scrapy存入MongoDB,Scrapy项目改造为Scrapy-Redis分布式爬虫、Scrapy项目部署到服务器

1、项目背景及需求

  • 在B站看了一个爬取房天下网站的案例,自己动手敲了敲,改了改
  • 这个网站既卖全国各个城市的新房,也卖二手房,要做的就是爬取各个城市新房的各项信息,各个城市二手房的各种信息
  • 新房的信息有:哪个省份的(province),哪个城市的(city),小区名字(name),价格(price),几居室(rooms),房子面积(area),地址(address),房子属于哪个行政区(district),是否在售(sale),每一套房子详情页面的链接(origin_url)
  • 二手房的信息有:哪个省份的(province),哪个城市的(city),小区名字(name),地址(address),房子的一些基本信息(infos),价格(price),房子单价(unit),每套房子详情页面的链接(origin_url)
  • 以上要爬取的信息在下面Scrapy的items.py文件中可以看到
2、Scrapy爬虫的书写 2.1、项目创建及目录结构 2.1.1、项目创建
  • 打开cmd命令行,进入到想要创建文件的文件夹
  • 输入命令:scrapy startproject fang
  • 输入cd fang命令两次,再输入cd spiders命令,进入爬虫文件夹
  • 输入命令:scrapy genspider sfw fang.com创建爬虫文件
2.1.2、目录结构
Scrapy爬虫项目,Scrapy存储为Json文件、Scrapy存入MySQL、Scrapy存入MongoDB,Scrapy项目改造为Scrapy-Redis分布式爬虫、Scrapy项目部署到服务器
文章图片

2.2、settings.py文件
  • 设置 ROBOTSTXT_OBEY = False
  • 设置DOWNLOAD_DELAY = 3
  • 将以下代码打开,请求头在middlewares.py中书写(下面有)
DEFAULT_REQUEST_HEADERS = { 'Accept': 'text/html,application/xhtml+xml,application/xml; q=0.9,*/*; q=0.8', 'Accept-Language': 'en' }

下载器中间件和管道文件后期代码书写完成之后再打开
2.3、items.py文件 此文件中写要存储的数据
# 新房信息 class NewHouseItem(scrapy.Item): # define the fields for your item here like: # name = scrapy.Field() # 省份 province = scrapy.Field() # 城市 city = scrapy.Field() # 小区的名字 name = scrapy.Field() # 价格 price = scrapy.Field() # 几居, 这是个列表 rooms = scrapy.Field() # 面积 area = scrapy.Field() # 地址 address = scrapy.Field() # 行政区 district = scrapy.Field() # 是否在售 sale = scrapy.Field() # 房天下详情页面的url origin_url = scrapy.Field()# 二手房信息 class ESFHouseItem(scrapy.Item): # 省份 province = scrapy.Field() # 城市 city = scrapy.Field() # 小区名字 name = scrapy.Field() # # 几室几厅 # rooms = scrapy.Field() # # 层 # floor = scrapy.Field() # # 朝向 # toward = scrapy.Field() # # 年代 # year = scrapy.Field() # 地址 address = scrapy.Field() # # 建筑面积 ## area = scrapy.Field() # 总价 price = scrapy.Field() # 单价 unit = scrapy.Field() # 原始的url origin_url = scrapy.Field() # 信息 infos = scrapy.Field()

2.4、sfw.py文件(爬虫文件) 主要的爬虫代码写在此文件中
import scrapy import re from fang.items import NewHouseItem, ESFHouseItemclass SfwSpider(scrapy.Spider): name = 'sfw' allowed_domains = ['fang.com'] start_urls = ['https://www.fang.com/SoufunFamily.htm']def parse(self, response): # 所有城市标签 trs = response.xpath("//div[@class = 'outCont']//tr") province = None # 遍历得到每一行的数据 for tr in trs: # 获取省份和对应城市的两个td标签 tds = tr.xpath(".//td[not(@class)]") # 省份名称 province_text = tds[0] # 省份对应的城市名称及链接 city_info = tds[1] # 提取省份名称 province_text = province_text.xpath(".//text()").get() province_text = re.sub(r"\s", "", province_text) if province_text: province = province_text # 不爬取海外房产 if province == "其它": continue # 提取城市名称及链接 city_links = city_info.xpath(".//a") for city_link in city_links: # 获取城市 city = city_link.xpath(".//text()").get() # 获取城市链接 city_url = city_link.xpath(".//@href").get()# 构建新房链接 url_split = city_url.split("fang") url_former = url_split[0] url_backer = url_split[1] newhouse_url = url_former + "newhouse.fang.com/house/s/" # 构建二手房链接 esf_url = url_former + "esf.fang.com/"# print("++" * 20) # print("省份:", province) # print("城市:", city) # print("新房链接:", newhouse_url) # print("二手房链接:", esf_url) # print("++" * 20)# 返回新房信息再解析 yield scrapy.Request(url=newhouse_url, callback=self.parse_newhouse, meta={"info": (province, city)})# 返回二手房信息再解析 yield scrapy.Request(url=esf_url, callback=self.parse_esf, meta = {"info": (province, city)})# 新房页面解析 def parse_newhouse(self, response): province, city = response.meta.get("info") lis = response.xpath("//div[contains(@class, 'nl_con')]/ul/li[not(@style)]") for li in lis: # 获取房产名字 name = li.xpath(".//div[@class='nlcd_name']/a/text()").get().strip() # 获取几居室 rooms = li.xpath(".//div[contains(@class, 'house_type')]/a//text()").getall() # 获取面积 area = li.xpath(".//div[contains(@class, 'house_type')]/text()").getall() area = "".join(area).strip() area = re.sub(r"/|-|/s| |\n", "", area) # 获取地址 address = li.xpath(".//div[@class = 'address']/a/@title").get() # 获取是哪个区的房子 district = li.xpath(".//div[@class = 'address']/a//text()").getall() district = "".join(district) district = re.search(r".*\[(.+)\].*", district).group(1) # 获取是否在售 sale = li.xpath(".//div[contains(@class, 'fangyuan')]/span/text()").get() # 获取价格 price = li.xpath(".//div[@class = 'nhouse_price']//text()").getall() price = "".join(price).strip() # 获取详情页url origin_url = li.xpath(".//div[@class = 'nlcd_name']/a/@href").get()# 构建item返回 item = NewHouseItem(province = province, city = city, name = name, rooms = rooms, area = area, address = address, district = district, sale = sale, price = price, origin_url = origin_url) yield item# 爬取下一页数据 next_url = response.xpath("//div[@class = 'page']//a[@class = 'next']/@href").get() if next_url: yield scrapy.Request(url=response.urljoin(next_url), callback=self.parse_newhouse, meta={"info": (province, city)})# 二手房页面解析 def parse_esf(self, response): province, city = response.meta.get("info") dls = response.xpath("//div[contains(@class, 'shop_list')]/dl[@dataflag = 'bg']") for dl in dls: item = ESFHouseItem(province = province, city = city) # 房子名字 name = dl.xpath(".//p[@class = 'add_shop']/a/@title").get() item["name"] = name # 信息(几室几厅(rooms),面积(area), 层(floor), 朝向(toward), 年代(year)) infos = dl.xpath(".//p[@class = 'tel_shop']/text()").getall() infos = "".join(infos).strip() infos = re.sub(r"'|\|\r|\n|/s| ", "", infos) item['infos'] = infos # 地址 address = dl.xpath(".//p[@class = 'add_shop']/span/text()").get() item['address'] = address # 价格 price = dl.xpath(".//dd[@class = 'price_right']/span[1]//text()").getall() price = "".join(price) item['price'] = price # 均价 unit = dl.xpath(".//dd[@class = 'price_right']/span[2]/text()").get() item['unit'] = unit # 原始url origin_url = dl.xpath(".//h4[@class = 'clearfix']/a/@href").getall() origin_url = "".join(origin_url) origin_url = response.urljoin(origin_url) item['origin_url'] = origin_url yield item# 下一页url next_url = response.xpath("//div[@class = 'page_al']/p[last()-1]/a/@href").get() if next_url: yield scrapy.Request(url=response.urljoin(next_url), callback=self.parse_esf, meta={"info": (province, city)})if __name__ == '__main__': from scrapy import cmdline args = "scrapy crawl sfw".split() cmdline.execute(args)

爬虫文件最后四行main代码是设置的快捷运行爬虫代码,不用再使用命令行输入命令启动爬虫,直接右键运行即可。
3、将数据存储为Json文件(pipelines.py) 爬虫文件中有两个函数,最后分别将爬取到的新房和二手房数据通过yield一条一条返回到pipelines.py中,pipelines.py是数据存储文件,将数据存储为Json文件、存储进MySQL数据库、存储进MongoDB数据库均是在这个文件中。
from scrapy.exporters import JsonLinesItemExporter from fang.items import NewHouseItem, ESFHouseItemclass FangePipeline(object): def __init__(self): self.newhouse_fp = open("newhouse.json", "wb") self.esfhouse_fp = open("esfhouse.json", "wb") self.newhouse_exporter = JsonLinesItemExporter(self.newhouse_fp, ensure_ascii = False) self.esfhouse_exporter = JsonLinesItemExporter(self.esfhouse_fp, ensure_ascii=False)def process_item(self, item, spider): # 判断返回的item和items.py文件中定义的item类型是否一致 if isinstance(item, NewHouseItem): self.newhouse_exporter.export_item(item) else: self.esfhouse_exporter.export_item(item) return itemdef close_spider(self, spider): self.newhouse_fp.close() self.esfhouse_fp.close()

当代码写完后,在设置中把管道注释打开即可,如下所示:
ITEM_PIPELINES = { # 存储为Json文件管道 'fang.pipelines.FangePipeline': 300,# 存储为MySQL同步操作管道 # 'fang.pipelines.MysqlSavePipline_1': 300, # 存储为MySQL异步操作管道 # 'fang.pipelines.MysqlSavePipline_2': 300,# 存入到MongoDB # 'fang.pipelines.MongodbPipline': 300, }

切换到爬虫文件,右键运行即可获取到Json数据。
—————————————————————————————————————————————
附加内容:
将数据存入Json文件常用的有两种方法:
一种是本文件中使用的JsonLinesItemExporter
还有一种是JsonItemExporter
前一种方法是将数据逐条写入文件中,后一种方法是将数据一起写入文件中,当数据量大的时候,后一种方法十分吃内存,但是后一种方法是标准的Json格式,各有优缺点。
4、将数据存入MySQL(pipelines.py) 数据存入MySQL分为同步操作和异步操作两种。
4.1、同步操作
# 存入MySQL数据库(同步操作) import pymysql class MysqlSavePipline_1(object): def __init__(self): # 建立连接后面参数分别为:主机, MySQL用户名, MySQL密码, 哪一个数据库 self.conn = pymysql.connect("localhost", "root", "123456", "fang") # 创建游标 self.cursor = self.conn.cursor()def process_item(self, item, spider): # 判断返回的item和items.py文件中定义的item类型是否一致 if isinstance(item, NewHouseItem): # 新房sql语句 insert_sql = """ insert into newhouse(province, city, name, price, rooms, area, address, district, sale, origin_url) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ # 执行插入数据库的操作 self.cursor.execute(insert_sql, (item['province'], item['city'], item['name'], item['price'], item['rooms'], item['area'], item['address'], item['district'], item['sale'], item['origin_url']))if isinstance(item, ESFHouseItem): # 二手房sql语句 insert_sql = """ insert into esfhouse(province, city, name, address, price, unit, origin_url, infos) values (%s, %s, %s, %s, %s, %s, %s, %s) """ # 执行s插入数据库操作 self.cursor.execute(insert_sql, (item['province'], item['city'], item['name'], item['address'], item['price'], item['unit'], item['origin_url'], item['infos']))# 提交,不进行提交保存不到数据库 self.conn.commit()def close_spider(self, spider): # 关闭游标和连接 self.cursor.close() self.conn.close()

写完代码后,在设置文件中将管道注释打开即可,如下所示:
ITEM_PIPELINES = { # 存储为Json文件管道 # 'fang.pipelines.FangePipeline': 300,# 存储为MySQL同步操作管道 'fang.pipelines.MysqlSavePipline_1': 300, # 存储为MySQL异步操作管道 # 'fang.pipelines.MysqlSavePipline_2': 300,# 存入到MongoDB # 'fang.pipelines.MongodbPipline': 300, }

切换到爬虫文件,右键运行后即可以同步方式存储数据。
4.2、异步操作
# 存入MySQL数据库(异步操作) import pymysql from twisted.enterprise import adbapi class MysqlSavePipline_2(object): def __init__(self, dbpool): self.dbpool = dbpool@classmethod # 函数名固定,会被scrapy调用,直接可用settings的值 def from_settings(cls, settings): """ 数据库建立连接 :param settings:配置参数 :return:实例化参数 """ adbparams = dict( host = settings['MYSQL_HOST'], db = settings['MYSQL_DBNAME'], user = settings['MYSQL_USER'], password = settings['MYSQL_PASSWORD'], # 指定cursor类型 cursorclass = pymysql.cursors.DictCursor ) # 连接数据池ConnectionPool,使用pymysql连接 dbpool = adbapi.ConnectionPool('pymysql', **adbparams) # 返回实例化参数 return cls(dbpool)def process_item(self, item, spider): """ 使用twisted将MySQL插入变成异步执行。通过连接池执行具体的sql操作,返回一个对象 :param item: :param spider: :return: """ # 指定操作方法和操作数据 query = self.dbpool.runInteraction(self.do_insert, item) # 添加异常处理 query.addCallback(self.handle_error)def do_insert(self, cursor, item): # 对数据库执行插入操作,并不需要commit,twisted会自动commit # 首先判断应该插入哪一张表 if isinstance(item, NewHouseItem): insert_sql = """ insert into newhouse(province, city, name, price, rooms, area, address, district, sale, origin_url) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ cursor.execute(insert_sql, (item['province'], item['city'], item['name'], item['price'], item['rooms'], item['area'], item['address'], item['district'], item['sale'], item['origin_url'])) if isinstance(item, ESFHouseItem): insert_sql = """ insert into esfhouse(province, city, name, address, price, unit, origin_url, infos) values (%s, %s, %s, %s, %s, %s, %s, %s) """ # 执行s插入数据库操作 cursor.execute(insert_sql, (item['province'], item['city'], item['name'], item['address'], item['price'], item['unit'], item['origin_url'], item['infos']))def handle_error(self,failure): if failure: # 打印错误信息 print(failure)

代码书写完成后,在设置文件中打开管道注释,如下所示:
ITEM_PIPELINES = { # 存储为Json文件管道 # 'fang.pipelines.FangePipeline': 300,# 存储为MySQL同步操作管道 #'fang.pipelines.MysqlSavePipline_1': 300, # 存储为MySQL异步操作管道 'fang.pipelines.MysqlSavePipline_2': 300,# 存入到MongoDB #'fang.pipelines.MongodbPipline': 300, }

切换到爬虫文件,右键运行后即可以异步方式存储数据。
5、将数据存入MongoDB(pipelines.py) 首先在settings.py文件中添加数据库配置信息:
# 数据库配置信息 MYSQL_HOST = 'localhost' MYSQL_DBNAME = 'fang' MYSQL_USER = 'root' MYSQL_PASSWORD = '123456'

然后在piplines.py文件中书写存储代码:
# 存入MongoDB数据库 import pymongo class MongodbPipline(object): def __init__(self): # 建立数据库连接 client = pymongo.MongoClient('127.0.0.1', 27017) # 连接所需数据库, fang为数据库名字 db = client['fang'] # 连接所用集合,也就是通常所说的表,newhouse为表名 self.post_newhouse = db['newhouse']# 新房 self.post_esfhouse = db['esfhouse']# 二手房def process_item(self, item, spider): if isinstance(item, NewHouseItem): # 把item转化为字典形式 postItem = dict(item) # 向数据库插入一条记录 self.post_newhouse.insert(postItem) if isinstance(item, ESFHouseItem): # 把item转化为字典形式 postItem = dict(item) # 向数据库插入一条记录 self.post_esfhouse.insert(postItem)

代码书写完成后,在设置文件中打开管道注释,如下所示:
ITEM_PIPELINES = { # 存储为Json文件管道 # 'fang.pipelines.FangePipeline': 300,# 存储为MySQL同步操作管道 #'fang.pipelines.MysqlSavePipline_1': 300, # 存储为MySQL异步操作管道 # 'fang.pipelines.MysqlSavePipline_2': 300,# 存入到MongoDB 'fang.pipelines.MongodbPipline': 300, }

切换到爬虫文件,右键运行后即可将数据存储到MongoDB数据库。
6、 请求头及代理ip设置(middlewares.py) 6.1、 请求头设置
# 设置随机请求头 class UserAgentDownloadMiddleware(object): # User-Agent中的请求头 User_Agents = [ "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50", "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50", "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:38.0) Gecko/20100101 Firefox/38.0", "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727; .NET CLR 3.0.30729; .NET CLR 3.5.30729; InfoPath.3; rv:11.0) like Gecko", "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)", "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1", "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1", "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11", "Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Avant Browser)", "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)", "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5", "Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5", "Mozilla/5.0 (iPad; U; CPU OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5", "Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1", "MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1", "Opera/9.80 (Android 2.3.4; Linux; Opera Mobi/build-1107180945; U; en-GB) Presto/2.8.149 Version/11.10", "Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13", "Mozilla/5.0 (BlackBerry; U; BlackBerry 9800; en) AppleWebKit/534.1+ (KHTML, like Gecko) Version/6.0.0.337 Mobile Safari/534.1+", "Mozilla/5.0 (hp-tablet; Linux; hpwOS/3.0.0; U; en-US) AppleWebKit/534.6 (KHTML, like Gecko) wOSBrowser/233.70 Safari/534.6 TouchPad/1.0", "Mozilla/5.0 (SymbianOS/9.4; Series60/5.0 NokiaN97-1/20.0.019; Profile/MIDP-2.1 Configuration/CLDC-1.1) AppleWebKit/525 (KHTML, like Gecko) BrowserNG/7.1.18124", "Mozilla/5.0 (compatible; MSIE 9.0; Windows Phone OS 7.5; Trident/5.0; IEMobile/9.0; HTC; Titan)", "UCWEB7.0.2.37/28/999", "NOKIA5700/ UCWEB7.0.2.37/28/999", "Openwave/ UCWEB7.0.2.37/28/999", "Mozilla/4.0 (compatible; MSIE 6.0; ) Opera/UCWEB7.0.2.37/28/999" ]# 定义函数随机获取一个请求头 def process_request(self, request, spider): user_agent = random.choice(self.User_Agents) request.headers['User-Agent'] = user_agent

6.2、 代理ip 6.2.1、 代理ip
# 设置ip代理 class IPProxyDownloadMiddleware(object): # 在代理平台购买代理,填写信息后会返回一个链接,把链接中的ip代理复制过来 proxies = ["178.44.170.152:8000", "110.44.113.182:8080", "209.126.124.73:8888", "84.42.79.243:8080","42.104.84.106:8080","117.64.234.7:808", "103.76.199.166:8080"] # 定义函数一次选择一个ip def process_request(self, request, spider): proxy = random.choice(self.proxies) request.meta["proxy"] = proxy

写完代码后记得在设置中添加中间件。
6.2.2、 独享代理ip
import base64 # 设置独享代理ip class DuXiangIPProxyDownloadMiddleware(object): def process_request(self, request, spider): # 购买独享代理的ip地址以及端口号 proxy = "独享代理ip地址 : 端口号" # 购买独享代理的账号和密码 user_password = "账号 : 密码" request.meta["proxy"] = proxy # 把密码设置进去,先进行base64转换.b64_user_password需要是bytes数据类型,而user_password是Unicode(str)类型,所以需要先编码。 b64_user_password = base64.b64encode(user_password.encode('utf-8')) # 设置在请求头中.Basic是str数据类型,b64_user_password是bytes数据类型,所以首先解码 request.headers["Proxy-Authorization"] = "Basic " + b64_user_password.decode('utf-8')

写完代码后记得在设置中添加中间件。
7、将Scrapy爬虫改造为Scrapy-Redis分布式爬虫 7.1、 Redis 7.1.1、 下载
地址:https://github.com/MicrosoftArchive/redis/releases
7.1.2、 打开Redis
  1. 打开cmd命令窗口
  2. 进入Redis文件夹目录
  3. 输入命令:redis-server.exe redis.windows.conf即可打开。(此条命令分别是Redis目录下的两个文件)
  4. 再重新打开一个cmd,输入命令redis-cli即可连接
7.1.3、 其他机器连接本机Redis
  1. 将本机Redis文件中的这个文件redis.windows.conf中的bind后面的ip改为Redis所在服务器的ip
  2. 打开虚拟机(或者是需要连接Redis的主机),输入代码连接
  3. 输入如下代码:redis-cli -h [Redis所在服务器的ip] -p 6379
7.2、 分布式爬虫原理 7.2.1、 原理
Scrapy单机爬虫中有一个本地爬取队列Queue,这个队列是利用deque模块实现的。如果新的Request生成就会放到队列里面,随后RequestScheduler调度。之后,Request交给Downloader执行爬取,简单的调度架构如下图所示。
Scrapy爬虫项目,Scrapy存储为Json文件、Scrapy存入MySQL、Scrapy存入MongoDB,Scrapy项目改造为Scrapy-Redis分布式爬虫、Scrapy项目部署到服务器
文章图片

如果两个Scheduler同时从队列里面取Request,每个Scheduler都有其对应的Downloader,那么在带宽足够、正常爬取且不考虑队列存取压力的情况下,爬取效率会有什么变化?没错,爬取效率会翻倍。
这样,Scheduler可以扩展多个,Downloader也可以扩展多个。而爬取队列Queue必须始终为一个,也就是所谓的共享爬取队列。这样才能保证Scheduer从队列里调度某个Request之后,其他Scheduler不会重复调度此Request,就可以做到多个Schduler同步爬取。这就是分布式爬虫的基本雏形,简单调度架构如下图所示。
Scrapy爬虫项目,Scrapy存储为Json文件、Scrapy存入MySQL、Scrapy存入MongoDB,Scrapy项目改造为Scrapy-Redis分布式爬虫、Scrapy项目部署到服务器
文章图片

我们需要做的就是在多台主机上同时运行爬虫任务协同爬取,而协同爬取的前提就是共享爬取队列。这样各台主机就不需要各自维护爬取队列,而是从共享爬取队列存取Request。但是各台主机还是有各自的SchedulerDownloader,所以调度和下载功能分别完成。如果不考虑队列存取性能消耗,爬取效率还是会成倍提高。
7.2.2、 维护爬取队列
那么这个队列用什么来维护?首先需要考虑的就是性能问题。我们自然想到的是基于内存存储的Redis,它支持多种数据结构,例如列表(List)、集合(Set)、有序集合(Sorted Set)等,存取的操作也非常简单。
Redis支持的这几种数据结构存储各有优点。
  1. 列表有lpush()lpop()rpush()rpop()方法,我们可以用它来实现先进先出式爬取队列,也可以实现先进后出栈式爬取队列。
  2. 集合的元素是无序的且不重复的,这样我们可以非常方便地实现随机排序且不重复的爬取队列。
  3. 有序集合带有分数表示,而ScrapyRequest也有优先级的控制,我们可以用它来实现带优先级调度的队列。
我们需要根据具体爬虫的需求来灵活选择不同的队列。
7.2.3、 如何去重
Scrapy有自动去重,它的去重使用了Python中的集合。这个集合记录了Scrapy中每个Request的指纹,这个指纹实际上就是Request的散列值。
Scrapy源代码中的request_fingerprint()就是计算Request指纹的方法,其方法内部使用的是hashlibsha1()方法。计算的字段包括Request的Method、URL、Body、Headers这几部分内容,这里只要有一点不同,那么计算的结果就不同。计算得到的结果是加密后的字符串,也就是指纹。每个Request都有独有的指纹,指纹就是一个字符串,判定字符串是否重复比判定Request对象是否重复容易得多,所以指纹可以作为判定Request是否重复的依据。
7.2.4、 防止中断
Scrapy中,爬虫运行时的Request队列放在内存中。爬虫运行中断后,这个队列的空间就被释放,此队列就被销毁了。所以一旦爬虫运行中断,爬虫再次运行就相当于全新的爬取过程。
要做到中断后继续爬取,我们可以将队列中的Request保存起来,下次爬取直接读取保存数据即可获取上次爬取的队列。我们在Scrapy中指定一个爬取队列的存储路径即可,这个路径使用JOB_DIR变量来标识,我们可以用如下命令来实现:
scrapy crawl spider -s JOB_DIR=crawls/spider

更加详细的使用方法可以参见官方文档,链接为:https://doc.scrapy.org/en/latest/topics/jobs.html
Scrapy中,我们实际是把爬取队列保存到本地,第二次爬取直接读取并恢复队列即可。那么在分布式架构中我们还用担心这个问题吗?不需要。因为爬取队列本身就是用数据库保存的,如果爬虫中断了,数据库中的Request依然是存在的,下次启动就会接着上次中断的地方继续爬取。
所以,当Redis的队列为空时,爬虫会重新爬取;当Redis的队列不为空时,爬虫便会接着上次中断之处继续爬取。
7.3、 Centos8Python3安装配置 7.3.1、 必备的插件
  1. 先升级
    su -# 切换到root用户,设置必须在root下完成* gcc --version# 查看有没安装gcc* yum install -y update# 升级

    2. 再安装 `gcc`和`make`插件

yum install gcc gcc-c++ yum -y install gcc automake autoconf libtool make yum groupinstall -y 'Development Tools'

yum install -y gcc openssl-devel bzip2-devel libffi-devel

6.3.2、 下载和编译python3.8.2
Centos8自带有Python3.6,可视情况安装)
  1. 在官方网站下载最新版本python3.8.2
wget https://www.python.org/ftp/python/3.8.2/Python-3.8.2.tgz

? 解压缩:
tar -zxvf Python-3.8.1.tgz

2. 用脚本检验整个编译环境

./configure prefix=/usr/local/python3 --enable-optimizations

3. 用make命令编译安装

make && make install

4. 修改环境变量

export PATH=$PATH:/usr/local/python3/bin/

5. 安装PIP

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py python3.8 get-pip.py

7.4、 项目设置 7.4.1、 获取项目运行所需要的包(后缀文件名字可更改)
打开电脑cmd命令行,进入到项目目录,输入如下命令,会在项目中生成一个txt文件,里面有项目所有需要的包。
pip freeze > requirments.txt

7.4.2、 Centos8安装Python虚拟环境
  1. 安装虚拟环境模块
pip3 install virtualenv -i https://mirrors.aliyun.com/pypi/simple/

2. 安装`virtualenvwrapper`工具管理虚拟环境

pip3 install virtualenvwrapper -i https://mirrors.aliyun.com/pypi/simple/

3. 创建目录用来存放虚拟环境

mkdir $HOME/.virtualenvs

4. 进入`root`用户,修改`.bashrc` 文件

su -# 进入root用户 work空格+两个Tab键# 进入目录 # vi ~/.bashrc# 编辑.bashrc文件

5.在`.bashrc`文件中进入编辑模式,再添加如下代码(不同系统记得改路径)

export WORKON_HOME=$HOME/.virtualenvs# 虚拟环境文件位置 source /usr/local/bin/virtualenvwrapper.sh# 虚拟环境管理工具的位置 VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3.6# 指定使用的解释器目录

6. 运行

source ~/.bashrc

7.4.3、 创建虚拟环境并进入
mkvirtualenv [虚拟环境名称]

workon [虚拟环境名称]

退出虚拟环境:
deactivate

删除虚拟环境(慎用):
rmvirtualenv [虚拟环境名称]
7.4.4、 将所需要的包安装到虚拟环境
注意:在虚拟环境下安装包
进入虚拟环境后,输入如下命令(记得一定要带上 -r):
pip install -r requirment.txt

如果提示找不到pywin32这个包,就打开文件删除这个包名字之后,再重新安装,因为这个包是Windows独有的,Linux系统不用安装。
7.5、 编写Scrapy-Redis步骤
  1. 安装scrapy-redis包:pip install scrapy-redis
  2. 然后在爬虫中导入包:from scrapy_redis.spiders import RedisSpider
  3. 将爬虫的继承的类从scrapy.Spider变成RedisSpider;或者是从scrapy.CrawlSpider变成RedisCrawlSpider
  4. 将爬虫中的start_urls删掉或注释,增加一个redis_key = "xxx"。这个redis_key是为了以后在redis中控制爬虫启动的,爬虫的第一个url,就是在redis中通过这个发送出去的。例如:redis_key = "fang:start_url"
  5. settings.py配置文件中增加以下配置:
# Scrapy-Redis相关配置 # 确保request存储到redis中 SCHEDULER = "scrapy_redis.scheduler.Scheduler"# 确保所有的爬虫共享相同的去重指纹 DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"# 设置redis为item pipeline(数据保存在了redis服务器上) ITEM_PIPELINES = { 'scrapy_redis.pipelines.RedisPipeline': 300 }# 在redis中保持scrapy-redis用到的队列,不会清理redis中的队列,从而可以实现暂停和恢复的功能 SCHEDULER_PERSIST = True# 设置连接Redis信息 REDIS_HOST = '192.168.x.x'# Redis服务器的主机地址(安装Redis的电脑的ip) REDIS_PORT = 6379

上面代码写入scrapysettings.py文件后,记得把原来的ITEM_PIPELINES给注释掉。
7.6、 上传文件到服务器并运行爬虫
1. 把项目在自己电脑压缩,使用命令rz命令上传到服务器(上传到虚拟环境) 2. 上传后在服务器中进行解压:unzip fang.zip 3. 进入项目中爬虫所在文件夹:使用cd命令 4. 运行爬虫

在爬虫服务器上,进入爬虫文件所在的路径,然后输入命令:scrapy runspider [爬虫名字]
【Scrapy爬虫项目,Scrapy存储为Json文件、Scrapy存入MySQL、Scrapy存入MongoDB,Scrapy项目改造为Scrapy-Redis分布式爬虫、Scrapy项目部署到服务器】Redis服务器上,推入一个开始的url链接:redis-cli > lpush [redis_key] start_url开始爬取

    推荐阅读