如何使用 joblib 并行化 Selenium 抓取任务? (一个不工作的例子)

How do I use joblib to parallelize a Selenium scraping task? (A non-working example)

我有一项任务需要从印度 2011 年人口普查中提取数据。我正在使用 Selenium 并有一个工作脚本(如下所示),但我正在尝试使用 joblib 库和 Parallel 来并行化任务。当我 运行 这个脚本时,我没有收到错误,我确实观察到我的处理器在我的任务管理器 (Windows 10) 中处于活动状态,但我没有看到从 运行 保存的任何文件宁这个程序,它继续 运行 在非并行版本完成后很久。任何帮助将非常感激。非常感谢。顺便说一句,here 是该程序输入数据集的 link。

的前四条记录
import time
import re
import string
import urllib.parse
import pandas
import numpy
import os
import csv
import joblib

from selenium import webdriver
from joblib import Parallel, delayed
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup

path = 'C:/Users/d.wm.mclaughlin/Dropbox/research/india'
os.chdir(path)

input_df = pandas.read_excel("file_path/villages_3109_UTTAR PRADESH_12_003.xlsx", "Sheet1")

def downloadFunction(x):
    driver = webdriver.PhantomJS('C:/phantomjs/bin/phantomjs.exe')
    url = "url"
    driver.get(url);

    selected_state = str(input_df['state_no'][x])
    selected_district = str(input_df['dist_no'][x])
    selected_block = str(input_df['block_no'][x]).zfill(3)
    selected_pan = str(input_df['pan'][x]).zfill(4)

    selected_state_name = input_df['state'][x]
    selected_dist_name = input_df['district'][x]
    selected_block_name = input_df['block'][x]
    selected_pan_name = input_df['village'][x]

    select = Select(driver.find_element_by_css_selector("#ddl_state"))
    select.select_by_value(selected_state)

    distSelect = Select(driver.find_element_by_css_selector("#ddl_dist"))
    distSelect.select_by_value(selected_district)

    blkSelect = Select(driver.find_element_by_css_selector("#ddl_blk"))
    blkSelect.select_by_value(selected_block)

    panSelect = Select(driver.find_element_by_css_selector("#ddl_pan"))
    panSelect.select_by_value(selected_pan)                                                                                                                                                                    
    button_list = ['#RadioButtonList1_0', '#RadioButtonList1_1', '#RadioButtonList1_2']
    button_names = ['auto_inclusion', 'auto_exclusion', 'other']
    for b in range(0,1):
        selected_button = button_list[b]
        selected_button_name = button_names[b]
        driver.find_element_by_css_selector(selected_button).click()
        driver.find_element_by_css_selector('#Button1').click()

        if('No Record Found !!!' in driver.page_source):
            print('No Record Found !!!')
        else:
            ae = driver.find_element_by_css_selector('#form1 > div:nth-child(4) > center:nth-child(2) > table > tbody > tr:nth-child(3) > td:nth-child(1)').text
            if(ae == ''): ae = 0
            ai = driver.find_element_by_css_selector('#form1 > div:nth-child(4) > center:nth-child(2) > table > tbody > tr:nth-child(3) > td:nth-child(2)').text
            if(ai == ''): ai = 0
            oth = driver.find_element_by_css_selector('#form1 > div:nth-child(4) > center:nth-child(2) > table > tbody > tr:nth-child(3) > td:nth-child(3)').text
            if(oth == ''): oth = 0
            dep = driver.find_element_by_css_selector('#form1 > div:nth-child(4) > center:nth-child(2) > table > tbody > tr:nth-child(3) > td:nth-child(4)').text
            if(dep == ''): dep = 0
            ae = int(ae)
            ai = int(ai)
            oth = int(oth)
            dep = int(dep)
            ai_dep = ai + dep

            records = [ai_dep, ae, oth]
            selected_record = records[b]

            table_number = round(selected_record/45)
            table_numbers = list(range(1, (1+(table_number)*3), 3))
            data = []
            for data_tab in table_numbers:
                table_address = '#Div1 > table:nth-child(' + str(data_tab) + ')'
                #print(table_address)
                for tr in driver.find_elements_by_css_selector(table_address):
                    # CONTINUE FROM HERE!!!
                    #print(tr == driver.find_element_by_css_selector("#Div1 > table:nth-child(" + str(data_tab) + ") > tbody > tr:nth-child(1)"))
                    #"#Div1 > table:nth-child(" + str(data_tab) + ") > tbody > tr:nth-child(2)"
                    #"#Div1 > table:nth-child(" + str(data_tab) + ") > tbody > tr:nth-child(3)"

                    tds = tr.find_elements_by_tag_name('td')
                    if tds:
                        data.append([td.text for td in tds])

            #newArray = numpy.array(data)
            for listItem in range(0,len(data)):
                if(listItem > 0):
                    data[listItem] = data[listItem][18:len(data[listItem])]
                    #print(len(data[listItem]))

            flat_data = [item for sublist in data for item in sublist]
            newArray = numpy.array(flat_data)
            dataRows = int(numpy.array(flat_data).size / 9)
            rowsTimesColumns = (dataRows * 9)
            test = pandas.DataFrame(newArray.reshape(dataRows,9), columns=['no',    'hh_name', 'gender', 'age', 'sc', 'fm_name', 'depriv_count', 'ai_d_code', 'total_mem'])
            file_path = 'C:/Users/d.wm.mclaughlin/Dropbox/research/lpg_india/data/secc/secc' + '_' + selected_state + '_' + '_' + selected_district + '_' + '_' + selected_block + '_' + '_' + selected_pan + '_' + '_' + selected_button_name + '.xlsx'
            test.to_excel(file_path, 'Sheet1')
    return print(x);

tester = Parallel(n_jobs=3)(delayed(downloadFunction)(in_val) for in_val in range(1, 10))

假设您有足够的内存来 运行 这而不使用交换,您应该查看文档。来自 https://pythonhosted.org/joblib/parallel.html。特别注意最后一行。

Warning

Under Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses when using joblib.Parallel. In other words, you should be writing code like this:

import ....

def function1(...):
    ...

def function2(...):
    ...

... if __name__ == '__main__':
    # do stuff with imports and functions defined about
    ...

No code should run outside of the “if name == ‘main’” blocks, only imports and definitions.

如果是内存问题,请阅读页面的其余部分。你可以从

开始
from joblib.pool import has_shareable_memory

并将最后一行更改为:

if __name__ == '__main__':
    tester = Parallel(n_jobs=3, max_nbytes=1e2)(delayed(downloadFunction, has_shareable_memory)(in_val) for in_val in range(1, 10))

但我猜你的内存消耗不会有多少可以共享。

您还可以添加一些垃圾收集以节省内存:

import gc

在您的 return 语句之前删除所有不必要的变量并添加

del driver
del test
del newArray
del data
# and all the rest
_ = gc.collect()

但请注意,这不会对底层可执行文件内存进行垃圾回收,例如PhantomJS