使用selenium实现爬取BOSS直聘职位并写入数据库案例

与天地兮比寿,与日月兮齐光。这篇文章主要讲述使用selenium实现爬取BOSS直聘职位并写入数据库案例相关的知识,希望能为你提供帮助。
本次案例实现selenium控制本地Chrome浏览器(不是selenium新打开的无cookie浏览器)【使用selenium实现爬取BOSS直聘职位并写入数据库案例】抓取BOSS直聘相关职位信息,并写入数据库中,代码如下:

"""Readme: create time: 2021-08-011. 请使用pip安装好相应扩展库. lxml、win32api、selenium、pymysql,并配置好ChromeDriver等相关环境 2. 本次使用selenium控制的浏览器为本地Chrome,使用类时请将Chrome安装路径传入参数中 3. 以便更好的抓取相关信息,请提前在Chrome浏览器登录BOSS直聘,如果没有登录会受到反爬限制 4. 执行写入数据库操作时记得提前将 BOSS库和jobs_info表创建好 """ import os import time import random import pymysql import win32api from lxml import etree from selenium import webdriver from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.chrome.options import Optionsclass BOSS(object): def __init__(self, ChromePath, Keyword, QueryQuantity=30): """ :param ChromePath: chrome安装路径 :param Keyword: 搜索职位关键字 :param QueryQuantity: 获取职位的数量,默认获取30条 """ self.Url = "https://www.zhipin.com/" self.ChromePath = ChromePath self.Keyword = Keyword self.jobs_quantity = QueryQuantity self.Browser = None self.xpath = {} self.job_info = {} self.jobs_list = []def start_browser(self): """启动本地浏览器并打开页面"""# 判断当前Chrome是否在运行 def is_exe_running(exe="chrome.exe"): result = os.popen(f\'\'\'tasklist | findstr "{exe}" \'\'\') return exe in result.read()# 关闭当前Chrome def close_exe_program(exe="chrome.exe"): if is_exe_running(exe): os.popen(f"""taskkill -F /im {exe}""") return True return False# 启动Chrome def start_program(path, params=""): win32api.ShellExecute(0, \'open\', path, params, os.path.split(path)[0], 1)# 启用Chrome def start_debugging_chrome(url=""): if close_exe_program(): time.sleep(1) path = self.ChromePath assert path is not None, "获请传入chrome.exe 绝对路径" if not path.endswith(\'chrome.exe\'): path = path + \'\\\\chrome.exe\' start_program(path, f"--remote-debugging-port=9222 {url}")start_debugging_chrome(url=self.Url) option = Options() option.add_experimental_option("debuggerAddress", "127.0.0.1:9222") self.Browser = webdriver.Chrome(options=option) self.Browser.maximize_window() self.Browser.switch_to.window(self.Browser.window_handles[0]) try: WebDriverWait(driver=self.Browser, timeout=10).until(lambda d: d.find_element_by_id("main")) except Exception: print("打开页面失败,请检查网络是否正常后重新运行此程序(如果网络正常尝试改变此方法中的element元素或将try语句注释)") self.close_browser() exit(-1)def close_browser(self): """关闭浏览器""" self.Browser.quit()def wait_element_loaded(self, xpath: str, timeout=10, close_browser=True): """ 等待页面元素成功加载完成 :param xpath: xpath表达式 :param timeout: 最长等待超时时间 :param close_browser: 元素等待超时后是否关闭浏览器 :return: Boolean """ now_time = int(time.time()) while int(time.time()) - now_time < timeout: try: element = self.Browser.find_element_by_xpath(xpath) if element: return True time.sleep(1) except Exception: pass else: if close_browser: self.close_browser() print("查找页面元素失败,如果不存在网络问题请尝试修改xpath表达式") return Falsedef get_element_text(self, xpath, single=True): """获取页面中指定元素的文本内容,如果页面中找不到该元素则返回空 :param xpath: xpath表达式 :param single: True表示获取单个元素,False表示获取多个元素 :return: 元素的文本内容 """ try: if single: return self.Browser.find_element_by_xpath(xpath).text else: return self.Browser.find_elements_by_xpath(xpath) except Exception: return \'\'def add_xpath_element(self): """增加页面中相应的xpath元素""" self.xpath[\'input_box\'] = \'//input[@name="query"]\'# 输入框 self.xpath[\'query\'] = \'//button[@ka="search_box_index"]\'# 查询按钮 self.xpath[\'jobs_div\'] = \'//div[@class="job-list"]\'# 职位展示页面 self.xpath[\'jobs_list\'] = \'//div[@class="job-list"]/ul/li//span[@class="job-name"]/a\'# 职位信息列表 self.xpath[\'job_detail\'] = \'//div[@class="job-banner"]\'# 职位详情页面 self.xpath[\'next_page\'] = \'//a[@ka="page-next"]\'# 下一页按钮 self.xpath[\'company_name\'] = \'//div[@class="company-info"]/a[2]\'# 公司名称 self.xpath[\'job_name\'] = \'//div[@class="name"]/h1\'# 职位名称 self.xpath[\'job_salary\'] = \'//span[@class="salary"]\'# 员工薪水 self.xpath[\'job_banner\'] = \'(//div[@class="info-primary"]/p)[1]\'# 工作信息栏 self.xpath[\'job_content\'] = \'//div[@class="text"]\'# 工作内容 self.xpath[\'job_welfare\'] = \'//div[@class="job-tags"]/span\'# 员工福利 self.xpath[\'company_type\'] = \'//a[@ka="job-detail-brandindustry"]\'# 公司行业 self.xpath[\'update_date\'] = \'(//p[@class="gray"])[1]\'# 发布职位日期 self.xpath[\'update_person\'] = \'//div[@class="detail-op"]/h2\'# 发布职位的人员 self.xpath[\'company_address\'] = \'//div[@class="location-address"]\'# 公司地址 self.xpath[\'Financing_stage\'] = \'//div[@class="sider-company"]/p/i[@class="icon-stage"]/parent::*\'# 公司融资阶段 self.xpath[\'company_size\'] = \'//div[@class="sider-company"]/p/i[@class="icon-scale"]/parent::*\'# 公司规模def get_jobs_info(self): """处理页面内容""" self.Browser.find_element_by_xpath(self.xpath[\'input_box\']).send_keys(self.Keyword) time.sleep(1) self.Browser.find_element_by_xpath(self.xpath[\'query\']).click()while len(self.jobs_list) < self.jobs_quantity: if not self.wait_element_loaded(xpath=self.xpath[\'jobs_div\'], timeout=15): print("查询职位页面加载失败,请检查网络连接是否正常!") exit(-1) jobs_links = self.Browser.find_elements_by_xpath(self.xpath[\'jobs_list\'])for job_link in jobs_links: # 打开单个职位详情页面并将浏览器对象切换到新窗口 job_link.click() self.Browser.switch_to.window(self.Browser.window_handles[-1]) if not self.wait_element_loaded(xpath=self.xpath[\'job_detail\']): print("职位详情页面加载失败,请检查!") exit(-1) job_welfare_list = self.Browser.find_elements_by_xpath(self.xpath[\'job_welfare\']) job_banner_html = self.Browser.find_element_by_xpath(self.xpath[\'job_banner\']).get_attribute( "outerHTML") # 使用etree更好的获取text job_banner = etree.fromstring(job_banner_HTML)education = job_banner.xpath("./text()")[1] work_experience = job_banner.xpath("./text()")[0] job_name = self.get_element_text(self.xpath[\'job_name\']) job_salary = self.get_element_text(self.xpath[\'job_salary\']) update_date = self.get_element_text(self.xpath[\'update_date\']) company_size = self.get_element_text(self.xpath[\'company_size\']) company_type = self.get_element_text(self.xpath[\'company_type\']) company_name = self.get_element_text(self.xpath[\'company_name\']) update_person = self.get_element_text(self.xpath[\'update_person\']) Financing_stage = self.get_element_text(self.xpath[\'Financing_stage\']) company_address = self.get_element_text(self.xpath[\'company_address\']) job_content = self.get_element_text(self.xpath[\'job_content\']).replace(\'\\n\', \'\') job_welfare = \'、\'.join([welfare.text for welfare in job_welfare_list if welfare.text])self.job_info = {\'company_name\': company_name, \'job_name\': job_name, \'job_salary\': job_salary, \'work_experience\': work_experience, \'education\': education, \'job_content\': job_content, \'Financing_stage\': Financing_stage, \'company_size\': company_size, \'company_type\': company_type, \'company_address\': company_address, \'update_date\': update_date, \'update_person\': update_person, \'job_welfare\': job_welfare, } print(self.job_info) if len(self.jobs_list) < self.jobs_quantity: self.jobs_list.append(self.job_info) else: break time.sleep(random.randint(2, 5)) self.Browser.close() # 关闭当前所使用的页面后要及时切换到另一个存在的页面,不然会报错 self.Browser.switch_to.window(self.Browser.window_handles[0])if len(self.jobs_list) < self.jobs_quantity: try: self.Browser.switch_to.window(self.Browser.window_handles[0]) next_page = self.Browser.find_element_by_xpath(self.xpath[\'next_page\']) if "disabled" not in next_page.get_attribute("class"): # 点击下一页 next_page.click() else: print("已查询到最后一页...") break except Exception as e: print(e) breakprint("已获取职位数量", len(self.jobs_list))def run(self): """执行程序 :return: 获取到的结果 -> list """ self.start_browser() self.add_xpath_element() self.get_jobs_info() self.close_browser() return self.jobs_listclass MysqlPipelines(object): """ 实现将数据写入到数据库中 提前将BOSS库和jobs_info表创建好,创建表结构如下: create table BOSS.jobs_info( \\ id int primary key auto_increment, \\ company_name varchar(100) character set utf8, \\ job_name varchar(100) character set utf8, \\ job_salary varchar(100) character set utf8, \\ work_experience varchar(100) character set utf8, \\ education varchar(100) character set utf8, \\ job_content varchar(3000) character set utf8, \\ Financing_stage varchar(100) character set utf8, \\ company_size varchar(100) character set utf8, \\ company_type varchar(100) character set utf8, \\ company_address varchar(100) character set utf8, \\ update_date varchar(100) character set utf8, \\ update_person varchar(100) character set utf8, \\ job_welfare varchar(100) character set utf8); """ def __init__(self, Host, Port, DB, User, Password, Charset): """初始化. 定义连接远程MySQL的IP、端口、库名、用户名、密码、字符格式. """ self.conn = pymysql.connect( host=Host, port=Port, db=DB, user=User, password=Password, charset=Charset, ) self.cur = self.conn.cursor()def write_to_mysql(self, values: list or dict): """ 将values值进行循环遍历,过滤出字典数据后进行依次写入数据库. :param values: 可以是单个字典也可以将字典包含在列表或二维数组中. """ if isinstance(values, list) and len(values) > = 1: for value in values: if isinstance(value, list): for internal_dict in value: if isinstance(internal_dict, dict): self.insert_value(internal_dict) elif isinstance(value, dict): self.insert_value(value) elif isinstance(values, dict): self.insert_value(values)self.conn.close()def insert_value(self, dict_value: dict): """向MySQL中插入数据 :param dict_value: 要插入的字典值 """ TableName = "BOSS.jobs_info" FieldsName = "company_name, job_name, job_salary, work_experience, education, job_content, Financing_stage, " \\ "company_size, company_type, company_address, update_date, update_person, job_welfare " sql = "insert into {}({}) values(\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\',\'{}\')".format( TableName, FieldsName, dict_value.get(\'company_name\'), dict_value.get(\'job_name\'), dict_value.get(\'job_salary\'), dict_value.get(\'work_experience\'), dict_value.get(\'education\'), dict_value.get(\'job_content\'), dict_value.get(\'Financing_stage\'), dict_value.get(\'company_size\'), dict_value.get(\'company_type\'), dict_value.get(\'company_address\'), dict_value.get(\'update_date\'), dict_value.get(\'update_person\'), dict_value.get(\'job_welfare\') ) try: self.cur.execute(sql) self.conn.commit() except Exception as E: print(E) self.conn.rollback()if __name__ == \'__main__\': chrome_path = "C:\\\\Program Files\\\\Google\\\\Chrome\\\\Application\\\\chrome.exe" keyword = "数据分析" query_quantity = 40 main = BOSS(ChromePath=chrome_path, Keyword=keyword, QueryQuantity=query_quantity) jobs_info = main.run()mysql = MysqlPipelines(Host=\'192.168.116.128\', Port=3306, DB=\'BOSS\', User=\'root\', Password=\'password123\', Charset=\'utf8\') mysql.write_to_mysql(values=jobs_info)



    推荐阅读