Selenium 操作实例:爬取本地保存的一本小说、爬取淘宝搜索结果(关键词“笔记本电脑”),以及进程池的用法

标签: python  selenium  爬虫

本地小说:





#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from multiprocessing import Pool




class XS(object):
    """Scrape a locally saved novel chapter by chapter with headless Firefox.

    The constructor opens the saved index page and clicks the first chapter
    link. :meth:`get_list` then follows the "next" link of every chapter
    page and appends each chapter to ``wanmeishijie.txt``.

    NOTE: ``firefox_options=`` and ``find_element(s)_by_xpath`` are the
    Selenium 3 spellings used by this script; Selenium 4 expects
    ``options=`` and ``find_element(By.XPATH, ...)``.
    """

    # Absolute XPaths into a saved chapter page.
    TITLE_XPATH = '/html/body/div[6]/h1'
    CONTENT_XPATH = '/html/body/div[6]/div[2]'
    NEXT_XPATH = '/html/body/div[6]/div[2]/a[1]'
    # Text file (UTF-8, append mode) that collects every chapter.
    OUTPUT_PATH = 'wanmeishijie.txt'

    def __init__(self):
        self.options = webdriver.FirefoxOptions()
        self.options.headless = True
        self.driver = webdriver.Firefox(firefox_options=self.options)
        self.driver.get("file:///C:/Users/Administrator/Desktop/wanmeishijie/wanmeishijiexiaoshuo/index.htm")
        # Jump from the index page to the first chapter.
        self.driver.find_element_by_xpath('/html/body/div[8]/div/ul/li[1]/span/a').click()

    def get_list(self):
        """Save the current chapter and every following one.

        For each page the title, the content and the ``tppabs`` attribute of
        the "next" link are printed, and the title and content are appended
        to :attr:`OUTPUT_PATH`. The walk ends when a page has no title or
        content element, or when it has no "next" link.

        Returns:
            The number of chapters written during this call.
        """
        written = 0
        # A loop instead of self-recursion: one stack frame per chapter
        # overflows Python's recursion limit on a long novel.
        while True:
            # find_elements_* returns an empty list instead of raising, which
            # gives a clean stop once the browser leaves the chapter pages.
            titles = self.driver.find_elements_by_xpath(self.TITLE_XPATH)
            bodies = self.driver.find_elements_by_xpath(self.CONTENT_XPATH)
            if not titles or not bodies:
                break
            next_links = self.driver.find_elements_by_xpath(self.NEXT_XPATH)

            title_text = titles[0].text
            content_text = bodies[0].text
            next_href = next_links[0].get_attribute("tppabs") if next_links else None
            print(title_text)
            print(content_text)
            print(next_href)

            # Re-open per chapter so that everything scraped so far is
            # already on disk if a later page fails.
            with open(self.OUTPUT_PATH, 'a', encoding='utf-8') as f:
                f.write(title_text)
                f.write('\n')
                f.write(content_text)
                f.write('\n\n\n')
            written += 1

            if not next_links:
                break
            next_links[0].click()
        return written


if __name__ == '__main__':
    # Scrape the whole novel, then shut the browser down.
    a = XS()
    try:
        a.get_list()
    finally:
        # Without quit() the headless Firefox process outlives the script,
        # and it must also go away when scraping fails half-way.
        a.driver.quit()










淘宝(笔记本电脑):




import time

from multiprocessing import Pool
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from lxml.html import etree


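# When tasks are added with apply_async() / map() from inside a class, the
# submitted callable must not be an instance method of that class: the task is
# pickled to reach the worker process, a bound instance method drags `self`
# along, and `self.pool` (a Pool) cannot be pickled. Submit classmethods (or
# module-level functions) instead.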


class TaoBao(object):
    """Search Taobao for laptops and fetch product details through a pool.

    :meth:`start` drives the search in the calling process, and
    :meth:`parse_list_page` queues one :meth:`get_detail_page` task per
    product on ``self.pool``. Everything submitted to the pool is a
    classmethod, so only the class (not the instance and its pool) has to be
    pickled.
    """

    # The browser lives on the class so that the classmethods executed by
    # pool workers can reach it as ``cls.driver``.
    # NOTE: these statements run while the class body is evaluated, i.e. a
    # headless Firefox is launched as soon as the module is imported.
    options = webdriver.FirefoxOptions()
    options.headless = True
    driver = webdriver.Firefox(firefox_options=options)

    def __init__(self, pool):
        """Remember the process pool that receives the scraping tasks."""
        self.pool = pool
        self.start_url = 'https://www.taobao.com/'

    def start(self):
        """Run the search, scroll the result page and parse it."""
        self.driver.get(self.start_url)
        # Wait up to 15 s for the search box (id "q") to exist.
        search_input = WebDriverWait(self.driver, 15).until(
            lambda driver: driver.find_element_by_id('q'))
        search_input.send_keys('笔记本电脑')
        self.driver.find_element_by_class_name('btn-search').click()

        # On the result page, scroll down step by step (10%, 30%, ... 90% of
        # the page height).
        for x in range(1, 11, 2):
            height = float(x) / 10
            # document.documentElement.scrollTop: current offset from the top
            # document.documentElement.scrollHeight: full scrollable height
            js = "document.documentElement.scrollTop = document.documentElement.scrollHeight * %f" % height

            self.driver.execute_script(js)
            time.sleep(0.2)

        html = self.driver.page_source
        self.parse_list_page(html)

    @classmethod
    def get_list_page(cls, html):
        """Placeholder for fetching the next list page; returns ``None``."""
        print('获取下一页源码的方法')

    @classmethod
    def handle_error(cls, error):
        """Report an exception raised by a pool task.

        Without an ``error_callback`` a failing task is dropped silently.
        """
        print('task failed:', repr(error))

    def parse_list_page(self, html):
        """Queue the next list page and one detail task per product.

        Args:
            html: Page source of a result page. This method is also the
                completion callback of :meth:`get_list_page`, which is still
                a stub returning ``None``; an empty value is ignored.
        """
        # Guard first: etree.HTML(None) would raise inside the pool's
        # result-handling thread, where callbacks are executed.
        if not html:
            return

        list_html = etree.HTML(html, parser=etree.HTMLParser(encoding='utf-8'))
        if list_html is None:
            return

        # TODO: extract the real next-page link from list_html; until then
        # the placeholder below is handed to the stub.
        next_url = 'list_html中提取'
        self.pool.apply_async(self.get_list_page, args=(next_url,),
                              callback=self.parse_list_page,
                              error_callback=self.handle_error)

        divs = list_html.cssselect('.info-cont')
        for div in divs:
            links = div.cssselect('.product-title')
            href = links[0].attrib.get('href') if links else None
            if not href:
                # A card without a product link must not abort the others.
                continue
            detail_url = 'https:' + href
            self.pool.apply_async(self.get_detail_page, args=(detail_url,),
                                  callback=self.parse_detail_page,
                                  error_callback=self.handle_error)

    @classmethod
    def get_detail_page(cls, detail_url):
        """Load one product page and return its title and price.

        Returns:
            ``{'title': ..., 'content': ...}`` where ``content`` holds the
            price text.

        Raises:
            IndexError: the title or price node is missing from the page.
        """
        cls.driver.get(detail_url)
        time.sleep(1)
        detail_html = etree.HTML(cls.driver.page_source, parser=etree.HTMLParser(encoding='utf-8'))

        titles = detail_html.cssselect('.panel-head > .spu-title')
        prices = detail_html.cssselect('.price > strong')
        if not titles or not prices:
            # Same exception type as the bare [0] lookup, but naming the page.
            raise IndexError('title/price not found on %s' % detail_url)

        return {
            'title': titles[0].text,
            'content': prices[0].text
        }

    @classmethod
    def parse_detail_page(cls, data):
        """Completion callback of :meth:`get_detail_page`: print the record."""
        print(data)


if __name__ == '__main__':
    pool = Pool()
    try:
        taobao = TaoBao(pool)
        taobao.start()
    finally:
        # Stop accepting tasks and wait for the queued ones, even when
        # start() raised, so no worker is left running.
        pool.close()
        pool.join()
        # Shut down this process's browser; otherwise the headless Firefox
        # keeps running after the script has exited.
        TaoBao.driver.quit()

进程池:

 class MyTest(object):
     """Small demo: queue classmethod tasks on a process pool."""

     def __init__(self, pool):
         # Pool that three() hands its tasks to.
         self.pool = pool

     @classmethod
     def one(cls, result):
         """Completion callback: print the value returned by two()."""
         print('--', result)

     @classmethod
     def two(cls, x):
         """Task body: sleep for three seconds, print x, return x + 100."""
         time.sleep(3)
         print('====', x)
         answer = x + 100
         return answer

     def three(self):
         """Queue two() for 1..9 as asynchronous tasks with one() as callback."""
         for number in range(1, 10):
             self.pool.apply_async(
                 self.two,
                 args=(number,),
                 callback=self.one,
             )


 if __name__ == '__main__':
     # Four worker processes serve the queued tasks.
     workers = Pool(4)

     demo = MyTest(workers)
     demo.three()

     # Close the pool: no further tasks may be added.
     workers.close()
     # Wait for the worker processes to finish before continuing here.
     workers.join()

     print('程序执行结束了')

版权声明:本文为qq_33472765原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_33472765/article/details/80785292