Batch-Scraping Xinfadi Vegetable Prices

This is another crawler project I wrote a while back; it has since gone through several versions, so I'm writing it up here.

Technical points

  • Multithreading
  • asyncio
  • aiohttp
  • aiofiles (the last three power the async rewrite; a sketch appears at the end of this post)

Logic

The scraping logic for this project is fairly simple: hit the API, take the data it returns, and store it; there is no need to parse nodes out of the page source.
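
Concretely, a single POST to the endpoint with current (page number) and limit (rows per page) form fields returns JSON containing a total count plus a list of records. A quick probe, using the same URL and the field names that the full script below actually consumes:

import requests

# One-off probe of the price API used throughout this post.
resp = requests.post(
    "http://xinfadi.com.cn/getPriceData.html",
    data={"current": 1, "limit": 5},  # page 1, five rows
    timeout=10,
)
content = resp.json()
print(content["count"])  # total number of records on the server
for row in content["list"]:
    print(row["prodName"], row["avgPrice"], row["pubDate"])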

The multithreaded approach

Watch the thread count and the limit value, or you can easily knock the site over!

The code below implements two modes, full and incremental, selected with a flag:

  • Full: scrape everything; any pages that fail are retried at the end until the given list is empty.
  • Incremental: scrape today's data by default; if there is none for today, fall back to yesterday's, compare against the existing file, and store only when that date is not already present.
#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""
@time:2022/09/26
@file:xinfadi.py
@author:medivh
@IDE:PyCharm
"""
import os
import time
import datetime
import requests
from concurrent.futures import ThreadPoolExecutor
import random
from utils import random_useragent  # author's own helper: returns a random User-Agent string


def get_page_total():
    """Query the API once to learn how many records exist in total."""
    url = "http://xinfadi.com.cn/getPriceData.html"
    try:
        response = requests.post(
            url,
            headers={
                "User-Agent": random_useragent(),
                "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            },
            timeout=10,
        )
        content = response.json()
        return content['count']
    except requests.exceptions.RequestException:
        return False


def save_data(cols):
    """Append the given CSV lines to xfd.csv."""
    try:
        with open('xfd.csv', 'a+', encoding='utf-8') as f:
            f.write("".join(cols))
        return True
    except Exception as e:
        print(e)
        return False


def get_data(pages, current_num, limit_num, flag):
    """

    :param pages: shared list of page numbers that still need fetching
    :param current_num: current page
    :param limit_num: maximum number of rows returned per page
    :param flag: True = full scrape, False = incremental scrape
    :return:
    """
    if flag:
        data = download_one_page(current_num, limit_num)
        if data and save_data(data):
            print('Finished page {}'.format(current_num))
            pages.remove(current_num)
    else:
        print('Running the incremental logic')
        data = download_one_page(1, 300)
        if not data:
            return
        # Get the current time
        now = datetime.datetime.now()
        # Format it to match the pubDate column
        days = now.strftime("%Y-%m-%d")
        new_data = [col for col in data if days in col]
        if not new_data:
            print("No data for today; pulling yesterday's data instead")
            one_day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
            days = one_day_ago.strftime("%Y-%m-%d")
            new_data = [col for col in data if days in col]
        try:
            if not os.path.exists('xfd.csv') or not os.path.getsize('xfd.csv'):
                # If the file does not exist or is empty, store everything
                save_data(new_data)
            else:
                with open('xfd.csv', 'r', encoding='utf-8') as f:
                    # 1. if the first line is non-empty and does not contain the
                    #    target date, store;
                    # 2. if the first line is empty, store.
                    # A single readline() covers both cases.
                    if days not in f.readline():
                        save_data(new_data)
        except Exception as e:
            print(e)


def download_one_page(current_num, limit_num):
    """

    :param current_num: current page
    :param limit_num: number of rows per page
    :return: list of CSV lines, or False on a request error
    """
    url = "http://xinfadi.com.cn/getPriceData.html"
    try:
        response = requests.post(
            url,
            headers={
                "User-Agent": random_useragent(),
                "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            },
            data={
                "current": current_num,
                "limit": limit_num
            },
            timeout=10
        )
        content = response.json()
        cols = list()
        for i in content['list']:
            col = '{},{},{},{},{},{},{},{},{},{}\n'.format(
                i['id'], i['prodName'], i['prodCatid'], i['prodCat'],
                i['lowPrice'], i['highPrice'], i['avgPrice'],
                i['place'], i['unitInfo'], i['pubDate'])
            cols.append(col)
        response.close()
        return cols
    except requests.exceptions.RequestException:
        return False


def replace_price(nums, limit_num):
    """Retry the failed pages until the list is empty."""
    time.sleep(30)  # give the site a breather before retrying
    r_sleep = [0, 0.1, 0.2, 0.3]
    while nums:
        with ThreadPoolExecutor(10) as rp_t:
            for page_num in list(nums):  # iterate a copy: get_data removes finished pages
                rp_args = [nums, page_num, limit_num, True]
                rp_t.submit(lambda p: get_data(*p), rp_args)
                time.sleep(random.choice(r_sleep))
    rp_end = int(time.time())
    print(rp_end)
    print('Total elapsed: {}s'.format(rp_end - start))


def all_data():
    limit_num = 1000
    # Total number of pages: the first +1 handles a non-exact division,
    # the second +1 is because range() excludes its stop value
    page_nums = int(count / limit_num) + 1 + 1
    pages = list()
    with ThreadPoolExecutor(10) as t:
        for i in range(1, page_nums):
            args = [pages, i, limit_num, True]
            pages.append(i)  # pages is shared with the workers; get_data removes a page once saved
            t.submit(lambda p: get_data(*p), args)
    end = int(time.time())
    print(end)
    print('First pass took: {}s'.format(end - start))
    if pages:
        replace_price(pages, limit_num)


def last_day_data():
    # TODO move the incremental rows to the top of the file
    limit_num = 1000
    page_nums = 1 + 1  # a single page; range() needs stop = 2
    pages = list()
    print('Initialized pages: {}'.format(pages))
    with ThreadPoolExecutor(10) as t:
        for i in range(1, page_nums):
            args = [pages, i, limit_num, False]
            pages.append(i)
            t.submit(lambda p: get_data(*p), args)


if __name__ == '__main__':
    start = int(time.time())
    print(start)
    count = get_page_total()
    if count:
        # all_data()  # full scrape
        last_day_data()  # incremental scrape
    else:
        print('Could not fetch the total record count')
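
The technical-points list at the top also mentions asyncio, aiohttp, and aiofiles. That later rewrite isn't reproduced in this section, but a minimal sketch of the same download-and-append flow on that stack might look like the following; the CONCURRENCY cap, the abbreviated column set, and the page/limit numbers in the demo call are illustrative assumptions, not the original code:

import asyncio
import aiohttp
import aiofiles

URL = "http://xinfadi.com.cn/getPriceData.html"  # same endpoint as above
CONCURRENCY = 5  # assumed cap; keep it low so the site is not hammered


async def fetch_page(session, sem, current_num, limit_num):
    """POST one page of price data and return its rows as CSV lines."""
    async with sem:
        async with session.post(
            URL, data={"current": current_num, "limit": limit_num}
        ) as response:
            # content_type=None: the server may not label the body application/json
            content = await response.json(content_type=None)
    return ['{},{},{},{}\n'.format(i['id'], i['prodName'], i['avgPrice'], i['pubDate'])
            for i in content['list']]  # abbreviated column set for the sketch


async def main(page_nums, limit_num):
    sem = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, sem, n, limit_num)
                 for n in range(1, page_nums + 1)]
        pages = await asyncio.gather(*tasks)
    # Write sequentially so concurrent appends cannot interleave rows
    async with aiofiles.open('xfd.csv', 'a', encoding='utf-8') as f:
        for rows in pages:
            await f.write("".join(rows))


if __name__ == '__main__':
    asyncio.run(main(page_nums=3, limit_num=100))  # demo numbers, not the real page count

Here the semaphore plays the role that the small thread pool and random sleeps play in the threaded version: it bounds the number of in-flight requests so the site is not overwhelmed.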