Best way to store the data and re-access it in Python
I am building a scraper that will run continuously against an auction website. The scraper first collects the car links, then visits each link and checks whether the car has been sold. If the car has been sold, the scraper writes its data to a CSV file. If it has not been sold, it moves on to the next link and finishes the pass.
Once the pass is complete, the process starts again from the beginning: scrape the car links, append them to a list, and then scrape each car from those links.
The downside of this setup is that if the script stops for any reason, the data stored in the list is lost as well.
So, what is the best way to store the data so that it is not lost even if the script is interrupted for any reason, and can be re-accessed when the script is run again?
I tried storing the links in a text file, but when I read the file after writing to it, it does not show any of the stored links.
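(For reference, one common cause of that symptom is reading back through the same handle opened in append mode without seeking back to the start, or reading before the writes have been flushed. A minimal sketch of the text-file approach, using an assumed links.txt file name:)

    # Minimal sketch of persisting links to a text file between runs.
    # 'links.txt' is an assumed file name, not part of the original code.
    def save_links(links, path='links.txt'):
        with open(path, 'a', encoding='utf-8') as f:
            for link in links:
                f.write(link + '\n')
        # closing the file flushes it, so the links are on disk before any later read

    def load_links(path='links.txt'):
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            return []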
Below is my code.
print('***Please enter the years range***')
year_from = 2000  # you can change this value.
year_to = 202  # you can change this value.
pause = 8  # will run again after 24 hours.

import requests
from scrapy.selector import Selector
import csv
import re
from time import sleep
import datetime
from random import randint

headers = {
    'authority': 'www.pickles.com.au',
    'cache-control': 'max-age=0',
    'sec-ch-ua': '^\^Chromium^\^;v=^\^92^\^, ^\^',
    'sec-ch-ua-mobile': '?0',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'sec-fetch-site': 'none',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-user': '?1',
    'sec-fetch-dest': 'document',
    'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8',
    'if-modified-since': 'Sun, 29 Aug 2021 20:36:16 GMT',
}

while True:
    pickles_links_list = []
    live_auctions_api = 'https://www.pickles.com.au/PWR-Web/services/api/sales/future'
    api_request = requests.get(url=live_auctions_api, headers=headers)
    for auctions in api_request.json():
        auction_link = auctions.get('viewSaleListingLink')
        if 'cars/item/search/-/listing/listSaleItems/' in auction_link:
            auction_request = requests.get(url=auction_link, headers=headers)
            response = Selector(text=auction_request.text)
            sales_id_re = response.xpath('//script[contains(text(), "Product_Type_Sequence")]/text() | //script[contains(text(), "lot_number_suffix_sequence")]/text()').get()
            sales_id = re.findall(r'"Product_Type_Sequence";var n="(.*?)"', sales_id_re) or re.findall(r'"lot_number_suffix_sequence";var n="(.*?)"', sales_id_re)
            if sales_id == []:
                continue

            auction_sale_link = f'https://www.pickles.com.au/v4/caradvert/saleid-{sales_id[0]}-public?count=true&inav=Car%7Cbc%7Cha%7Cu&q=(And.ProductType.Vehicles._.Year.range({year_from}..{year_to}).)&sr=%7Clot_number_suffix_sequence%7C0%7C30'
            auction_sale_link_requests = requests.get(url=auction_sale_link, headers=headers)
            auctions_data = auction_sale_link_requests.json().get('SearchResults')
            if auctions_data == []:
                print({"No results for": auction_sale_link_requests.url})

            for auction_data in auctions_data:
                ids = auction_data.get('TargetId')
                main_title = auction_data.get('Title')
                link_path = main_title.replace(' ', '-').replace('/', '-').replace(',', '-') + '/' + str(ids)
                each_auction_link = f'https://www.pickles.com.au/cars/item/-/details/{link_path}'
                pickles_links_list.append(each_auction_link)
                print({'Link': each_auction_link})

    # going through each link in the text file and checking the results
    with open('pickles.csv', 'a+', newline='', encoding='utf-8') as csv_file:
        csv_writer = csv.writer(csv_file)
        csv_header = [
            'Title', 'Make', 'Model', 'Variant',
            'Transmission', 'Odometer', 'State',
            'Sale Price', 'Link', 'Sold Date & Time',
            'Sold To', 'Condition Report', 'Description',
        ]
        # csv_writer.writerow(csv_header)

        unique_links_list = list(set(pickles_links_list))
        print('''
        ###################################
        #                                 #
        #                                 #
        #     Now scraping sold items     #
        #                                 #
        #                                 #
        ###################################
        ''')
        sleep(1)
        print({'Total links': f'*** {len(unique_links_list)} ***'})
        sleep(3)
        for each_link in unique_links_list:
            print({'Scraping': each_link})
            random_delay = randint(1, 7)
            print(f'*** Sleeping for [{random_delay}] seconds ***')
            sleep(random_delay)

            each_auction_request = requests.get(each_link, headers=headers)
            response = Selector(text=each_auction_request.text)
            current_status = response.xpath('//h6[@class="mt-2"]/text()[2]').get()
            sold_auctions_list = []
            if current_status == 'This item has been sold. ' and each_link not in sold_auctions_list:
                ids = each_link.split('/')[-1]
                title = response.xpath('//div[@class="row"]//h1/text()').get()
                description = response.xpath('//td[@itemprop="description"]/text()').get()
                condition_report = response.xpath('//a[contains(text(), "Condition Report")]/@href').get()
                make = description.split(', ')[1]
                model = description.split(', ')[2]
                variant = description.split(', ')[3]
                transmission = response.xpath('//i[contains(@class, "transmission")]/following-sibling::span/text()').get()
                odometer = response.xpath('//i[contains(@class, "mileage")]/following-sibling::span/text()').get()
                state = response.xpath('//td[contains(text(), "Location")]/following-sibling::td/text()').get().split(', ')[-1]

                # bid history api
                bid_history = f'https://www.pickles.com.au/PWR-Web/services/api/bidHistoryService/bidHistory?item={ids}'
                sold_item_request = requests.get(url=bid_history, headers=headers)
                sold_item_resp = sold_item_request.json()[0]
                winning_price = sold_item_resp.get('actualBid')
                sold_time_in_ms = sold_item_resp.get('bidTimeInMilliSeconds')
                sold_date_time = datetime.datetime.fromtimestamp(sold_time_in_ms / 1000.0, tz=datetime.timezone.utc).isoformat()
                sold_to = sold_item_resp.get('bidderAnonName')

                auction_values = [
                    title, make, model, variant, transmission, odometer,
                    state, "${:,.2f}".format(winning_price).strip(),
                    each_auction_request.url, sold_date_time, sold_to,
                    f'https://www.pickles.com.au{condition_report}', description,
                ]
                csv_writer.writerow(auction_values)
                print('*** Sold item found and added to the CSV file ***')
                sold_auctions_list.append(each_link)
            else:
                print('*** This item is not sold yet ***')
                continue
You can use a dataframe to keep track of the extracted links, and save the dataframe inside a try/except so that it is preserved if the script is interrupted. Here is sample code.
import pandas as pd
import os


class Scraping_data():
    def __init__(self):
        self.data_directory = 'your_data_directory'

    def load_links(self):
        df_links = pd.read_csv('./links_file.csv')
        if 'extracted_links.csv' in os.listdir(self.data_directory):
            df_extracted = pd.read_csv(os.path.join(self.data_directory, 'extracted_links.csv'))
            # Keep only the links that have not been extracted yet.
            df_links = df_links[~df_links['links'].isin(df_extracted['links'])]
            df_links.reset_index(drop=True, inplace=True)
        else:
            df_extracted = pd.DataFrame(columns=['links', 'status'])
        return df_extracted, df_links

    def scrap_data(self):
        df_extracted, df_links = self.load_links()
        extracted_users = []
        try:
            for index, row in df_links.iterrows():
                # Your scraping logic here.
                # row['links'] gives you the current link.
                # Upon successful extraction of a link:
                data_row = {'links': row['links'], 'status': 'extracted'}
                extracted_users.append(data_row)
            df_extracted = pd.concat([df_extracted, pd.DataFrame(data=extracted_users)], ignore_index=True)
            df_extracted.to_csv(os.path.join(self.data_directory, 'extracted_links.csv'), index=False)
        except Exception:
            # If the script is interrupted, persist whatever has been extracted so far.
            df_extracted = pd.concat([df_extracted, pd.DataFrame(data=extracted_users)], ignore_index=True)
            df_extracted.to_csv(os.path.join(self.data_directory, 'extracted_links.csv'), index=False)
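For example, the class above might be used like this (a minimal usage sketch; it assumes links_file.csv with a 'links' column exists and that your_data_directory has been created):

    # Minimal usage sketch for the class above.
    scraper = Scraping_data()
    df_extracted, df_links = scraper.load_links()
    print(f'{len(df_links)} links still to scrape, {len(df_extracted)} already extracted')
    scraper.scrap_data()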
Python sqlitedb approach:
Reference: https://www.tutorialspoint.com/sqlite/sqlite_python.htm
- Create a SQLite database.
- Create a table for the URLs to scrape, with a schema similar to:
  CREATE TABLE COMPANY
  (url NOT NULL UNIQUE,
   Status NOT NULL DEFAULT "Not started")
- Now read only the rows whose Status is "Not started".
- Once a URL has been scraped, you can change its Status column to success.
- That way, no matter when the script is restarted, it will only run once for the URLs that have not been started yet.
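A minimal sketch of that approach using the standard-library sqlite3 module (the links.db file name and the scrape_one helper are illustrative assumptions, not part of the original code):

    # Minimal sketch of the SQLite approach described above.
    import sqlite3

    conn = sqlite3.connect('links.db')  # assumed database file name
    conn.execute('''CREATE TABLE IF NOT EXISTS COMPANY
                    (url TEXT NOT NULL UNIQUE,
                     Status TEXT NOT NULL DEFAULT "Not started")''')
    conn.commit()

    def save_link(url):
        # INSERT OR IGNORE keeps the UNIQUE constraint from raising on re-runs.
        conn.execute('INSERT OR IGNORE INTO COMPANY (url) VALUES (?)', (url,))
        conn.commit()

    def pending_links():
        rows = conn.execute("SELECT url FROM COMPANY WHERE Status = 'Not started'").fetchall()
        return [row[0] for row in rows]

    def mark_done(url):
        conn.execute("UPDATE COMPANY SET Status = 'Success' WHERE url = ?", (url,))
        conn.commit()

    # e.g. after collecting links:
    #     for link in unique_links_list:
    #         save_link(link)
    # and when scraping:
    #     for link in pending_links():
    #         scrape_one(link)   # your existing per-link scraping logic (hypothetical helper)
    #         mark_done(link)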