引言
记录并简单的教程一下,知乎上爬取问题和答案的过程。包括模拟登录。
Cookie和Session
首先讲解一下Cookie
和Session
的区别。
Cookie
Cookie
是保存在本地的一种信息,而且是一种类似于dick
的保存方式,是一个文本。
http
最开始是一种无状态的请求,请求什么,返回什么,前后之间没有联系。为了保存用户的浏览信息,更有效率的返回信息,就有了Cookie
,就有了有状态的请求。
但是这样,如果有人看到电脑上的Cookie
,就可以通过其看到一些个人的一些敏感信息。所以有了Session
。
Session
和Cookie
类似,但是Session
生成的一些东西,是根据用户上传的信息进行变换之后生成的id
,并不包含敏感信息本身,而这个Session
生成的id
也保存在服务器的数据库中。因为这个有过期时间,所以不用担心被盗用。
模拟登陆
教程里面写了知乎的模拟登陆,是因为以前知乎不登录无法获取问题和答案,但是现在的知乎好像没有这样的规定了。所以爬虫的时候应该可以去掉模拟登陆,但是还是讲一讲。
教程里面的模拟登陆已经过时了,而且网上能找到的模拟登陆也都不适合现在的知乎了。现在知乎登录post的是一段密文,懒懒的我并不想去解密(其实是不会XD)。于是推荐用selenium
,这个是一个python
的模拟浏览器的模块,可以手动登录然后将cookies
保存下来供scrapy
用。所以是一种通用方案。
这里需要用到selenium
库和chromedriver.exe
,直接上代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18zhihu_session = requests.session()
chromePath = r'G:\PythonWork\ArticleSpider\chromedriver.exe'
wd = webdriver.Chrome(executable_path=chromePath)
loginUrl = 'https://www.zhihu.com/signin'
wd.get(loginUrl)
wd.find_element_by_xpath('//*[@id="root"]/div/main/div/div/div/div[2]/div[1]/form/div[1]/div[2]/div[1]/input').send_keys('用户名')
wd.find_element_by_xpath('//*[@id="root"]/div/main/div/div/div/div[2]/div[1]/form/div[2]/div/div[1]/input').send_keys('密码')
# 通过url的变化判断是否登陆成功
while wd.current_url == loginUrl:
pass
cookies = wd.get_cookies()
for cookie in cookies:
zhihu_session.cookies.set(cookie['name'], cookie['value'])
wd.quit()
with open('zhihu_cookies.pkl', 'wb') as f:
# 用pickle这个库来把cookies保存到本地,就不用每次都模拟登陆了。
pickle.dump(zhihu_session.cookies, f, 0)
# zhihu_session.cookies.get_dict()这个方法可以获得cookies
这里面要注意的是,一个是user-agent
,这个是一个浏览器客户端的信息,会包含在http请求头里面,这个可以参考自己浏览器的更改。知乎会检测你这个是否合法,如果没有这个或者不合法,会返回500
的错误给你,不让你获取页面,后面爬虫的时候也要注意。然后里面有一个while
,用来判断是否登陆成功。这个是一个通常的用法,如何嵌入scrapy
呢?
嵌入scrapy
也很简单。先在spider
文件夹下打开powershell
或者命令行,输入scrapy genspider zhihu www.zhihu.com
,就可以看到spider
文件夹下多了一个zhihu.py
文件。
打开,在里面重载这个函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30# 重载的函数
def start_requests(self):
zhihu_session = requests.session()
chromePath = r'G:\PythonWork\ArticleSpider\chromedriver.exe'
wd = webdriver.Chrome(executable_path=chromePath)
loginUrl = 'https://www.zhihu.com/signin'
wd.get(loginUrl)
wd.find_element_by_xpath(
'//*[@id="root"]/div/main/div/div/div/div[2]/div[1]/form/div[1]/div[2]/div[1]/input').send_keys(
'用户名')
wd.find_element_by_xpath(
'//*[@id="root"]/div/main/div/div/div/div[2]/div[1]/form/div[2]/div/div[1]/input').send_keys('密码')
# 通过url的变化判断是否登陆成功
while wd.current_url == loginUrl:
pass
cookies = wd.get_cookies()
for cookie in cookies:
zhihu_session.cookies.set(cookie['name'], cookie['value'])
wd.quit()
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
zhihu_header = {
'Connection': 'keep - alive',
'User-Agent': user_agent
}
# yield scrapy.Request(url='https://www.zhihu.com/signin', headers=zhihu_header, cookies=cookies, callback=self.check_login)
return [scrapy.FormRequest('https://www.zhihu.com/signin', headers=zhihu_header, cookies=cookies, callback=self.check_login)]
# 自定义的函数
def check_login(self, response):
for url in self.start_urls:
yield scrapy.Request(url, dont_filter=True, headers=self.headers)
start_requests
这个方法是scrapy
的一个入口函数,如果没写,默认会从上面的start_urls
开始(这里没有,你们自己的文件里面有)。
重载做的事:首先是将cookies
尝试从本地读取,如果读的到,就带着cookies
访问登录页面,如果cookies
有效,就会自动跳转到首页,如果无效,链接不变,就需要重新模拟登陆获取cookies
。
start_requests
函数的回调函数一定是要的,因为要模仿原来start_requests
里面做的事情,而回调函数check_login
最后一定要把request
给yield
出去,交给下载器。
因为scrapy
是一个异步的框架,yield
出去的实例,在scrapy
里面会判断,如果是item
就路由给pipeline
,如果是链接,就交给下载器等等。
提取问答页面
上面的check_login
函数执行完了之后,就会去执行parse
,所以我们要在这个函数里面来提取问答页面,然后交给下载器并进一步分析里面的字段。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18def parse(self, response):
"""
提取页面中的所有url,并进一步爬取
如果提取的url是 /question/xxx 就下载之后进入解析函数
"""
all_urls = response.css('a::attr(href)').extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
all_urls1 = filter(lambda x: True if x.startswith('https') else False, all_urls)
for url in all_urls1:
match_obj = re.match('(.*zhihu.com/question/(\d+))(/|$).*', url)
if match_obj:
# 页面提取到question相关的页面,则下载后交由提取函数提取
request_url = match_obj.group(1)
# question_id = match_obj.group(2)
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_detail)
else:
# 如果不是question页面,就进一步跟踪
yield scrapy.Request(url, headers=self.headers, callback=self.parse)
首先是拼接url
,使其变成完整的url
,然后有些url
其实是js
文件,要去掉,所以用了startswith
判断。
最后,观察链接可以发现,知乎的问答页面链接是这样的:
https://www.zhihu.com/question/29173647/answer/437189494
但是点一下查看全部回答
,就会变成:
https://www.zhihu.com/question/29173647
其实就是把后面的一点去掉了,所以我们在访问的时候,就直接访问第二个就好,提取的时候,用正则表达式匹配出来。然后yield
出去,并回调parse_detail
函数来提取具体的字段。如果不是问答的页面,就继续交给parse
跟踪里面的链接。
网页内容提取
这里还是需要用scrapy shell
进行调试,但是有一点小技巧需要提醒。
之前说过访问知乎的时候需要带上heards
才不会返回500
的错误,所以在这里也需要加上heards
,但是不需要全部,只需要user-agent
,用-s USER_AGENT=""
这样的参数来添加user-agent
参数,完整如下:1
scrapy shell -s USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36" https://www.zhihu.com/question/285353546
创建Item
提取内容之前,还是先创建Item
,让内容能有地方保存,然后可以集中处理。
在items.py
文件里面创建两个类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26class ZhihuQuestionItem(scrapy.Item):
# 知乎的问题 item
zhihu_id = scrapy.Field()
topics = scrapy.Field()
url = scrapy.Field()
title = scrapy.Field()
content = scrapy.Field()
answer_num = scrapy.Field()
comments_num = scrapy.Field()
watch_user_num = scrapy.Field()
click_num = scrapy.Field()
crawl_time = scrapy.Field()
class ZhihuAnswerItem(scrapy.Item):
# 知乎的回答
zhihu_id = scrapy.Field()
url = scrapy.Field()
question_id = scrapy.Field()
author_id = scrapy.Field()
content = scrapy.Field()
parise_num = scrapy.Field()
comments_num = scrapy.Field()
create_time = scrapy.Field()
update_time = scrapy.Field()
crawl_time = scrapy.Field()
从名字都可以看得出来,提取出了一些关键的字段。
提取Item
然后就是spiders
里面的zhihu.py
,教程里面因为当时知乎在改版,新版老版都混在一起,所以有两套提取,现在不用了,统一可以用下面的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68start_answer_urls = 'https://www.zhihu.com/api/v4/questions/{0}/answers?include=data[*].is_normal,admin_closed_comment,reward_info,is_collapsed,annotation_action,annotation_detail,collapse_reason,is_sticky,collapsed_by,suggest_edit,comment_count,can_comment,content,editable_content,voteup_count,reshipment_settings,comment_permission,created_time,updated_time,review_info,relevant_info,question,excerpt,relationship.is_authorized,is_author,voting,is_thanked,is_nothelp;data[*].mark_infos[*].url;data[*].author.follower_count,badge[?(type=best_answerer)].topics&limit={1}&offset={2}&sort_by=default'
def parse(self, response):
"""
提取页面中的所有url,并进一步爬取
如果提取的url是 /question/xxx 就下载之后进入解析函数
"""
all_urls = response.css('a::attr(href)').extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
all_urls1 = filter(lambda x: True if x.startswith('https') else False, all_urls)
for url in all_urls1:
match_obj = re.match('(.*zhihu.com/question/(\d+))(/|$).*', url)
if match_obj:
# 页面提取到question相关的页面,则下载后交由提取函数提取
request_url = match_obj.group(1)
# question_id = match_obj.group(2)
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_detail)
else:
# 如果不是question页面,就进一步跟踪
yield scrapy.Request(url, headers=self.headers, callback=self.parse)
def parse_detail(self, response):
# 处理question页面,从页面中提取quesiont item
match_obj = re.match('(.*zhihu.com/question/(\d+))(/|$).*', response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
item_loader.add_css('title', 'h1.QuestionHeader-title::text')
item_loader.add_css('content', '.QuestionHeader-detail') # .QuestionHeader-detail span.RichText::text
item_loader.add_value('url', response.url)
item_loader.add_value('zhihu_id', question_id)
item_loader.add_css('answer_num', '.List-headerText span::text')
item_loader.add_css('comments_num', '.QuestionHeader-Comment button.Button--plain::text')
item_loader.add_css('watch_user_num', '.NumberBoard-itemInner strong.NumberBoard-itemValue::attr("title")')
item_loader.add_css('topics', '.QuestionHeader-topics .Popover div::text')
question_item = item_loader.load_item()
yield scrapy.Request(self.start_answer_urls.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
yield question_item
def parse_answer(self, response):
# 处理question的answer
answer_json = json.loads(response.text)
is_end = answer_json["paging"]['is_end']
totals_answer = answer_json["paging"]['totals']
next_url = answer_json["paging"]['next']
# 提取answer的具体字段
for answer in answer_json['data']:
answer_item = ZhihuAnswerItem()
answer_item['zhihu_id'] = answer['id']
answer_item['url'] = answer['url']
answer_item['question_id'] = answer['question']['id']
answer_item['author_id'] = answer['author']['id'] if 'id' in answer['author'] else None
answer_item['content'] = answer['content'] if 'content' in answer else None
answer_item['parise_num'] = answer['voteup_count']
answer_item['comments_num'] = answer['comment_count']
answer_item['update_time'] = answer['updated_time']
answer_item['create_time'] = answer['created_time']
answer_item['crawl_time'] = datetime.datetime.now()
yield answer_item
if not is_end:
yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
提取字段的css
选择还是老样子。大致讲一讲思路:先parse
函数提取出页面上面存在的url
,并分析是不是和问题相关的url
,是的话就交给下载器并且调用回调函数parse_detail
,进行字段解析,如果不是问题页面,就交给下载器下载,继续分析页面里面带的url
,看有没有别的问题页面。相当于一种深度优先算法。
处理字段的函数parse_detail
里面,第一个yield
里面的url
是知乎答案内容请求链接,因为知乎的回答并不是一次性显示在页面上的,而是随着你的下拉,一点点请求来的,所以需要访问这个链接,里面的三个参数分别是:问题的id
,请求答案的数量
,从第几个答案开始请求
。下载页面之后调用回调函数parse_answer
,去分析回答,提取问题相关的回答字段。
而parse_answer
方法里面的yield
的url
是可以从上一个request
请求返回值里面提取出来的。
保存Item到数据库
上面两个提取的步骤,最后都把Item
给yield
出去了,这些Item
就会被送到pipelines
里面,所以还是先去pipelines
里面看看。
看过我之前写的一个关于伯乐在线的爬虫教程笔记的话,就应该知道pipelines.py
文件里面有一个MysqlTwistedPipeline
类,用于异步操作插入数据库。其中的do_insert
函数来看看:1
2
3
4
5
6def do_insert(self, cursor, item):
insert_sql = """
insert into jobbole_article(title, create_date, url, url_object_id, front_image_url, front_image_path, comment_nums, fav_nums, praise_nums, tags, content)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_sql, (item['title'], item['create_date'], item['url'], item['url_object_id'], item['front_image_url'], item['front_image_path'], item['comment_nums'], item['fav_nums'], item['praise_nums'], item['tags'], item['content']))
所有的item
都会路由到这里,但是不同的item
要有不同的操作,但是这又是一个异步的机制,要是能知道这个item
是哪个就好了。
可以通过item.__class__.__name__
这个属性,来得到这个item
实例的名字,和之前定义的对比一下,再来写也是可以的,但是这样有些问题,名字变了之后就会出错;而且当数据量大的时候,这每一个操作都会打开一个数据库,后面数据库的链接会变得很多,浪费资源。
所以这里采用的是一种类似于django
的思想,把所有的底层都屏蔽掉,只调用一个方法就可以。代码如下:1
2
3
4
5def do_insert(self, cursor, item):
# 执行具体的插入语句
# if item.__class__.__name__ == 'JobBoleArticleItem': # 这种方法有点死,名字变了就会出错
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)
这里我们为每个item
定义一个get_inset_sql
方法,就在这里屏蔽掉了不同的item
插入的语句不一样的问题。
所以这里又回到items.py
文件里面,之前定义的两个item
变成了如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72# from ArticleSpider.settings import SQL_DATETIME_FORMAT, SQL_DATE_FORMAT
# 下面这两个量定义在settings里面,再引入过来
# SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# SQL_DATE_FORMAT = "%Y-%m-%d"
class ZhihuQuestionItem(scrapy.Item):
# 知乎的问题 item
zhihu_id = scrapy.Field()
topics = scrapy.Field()
url = scrapy.Field()
title = scrapy.Field()
content = scrapy.Field()
answer_num = scrapy.Field()
comments_num = scrapy.Field()
watch_user_num = scrapy.Field()
click_num = scrapy.Field()
crawl_time = scrapy.Field()
def get_insert_sql(self):
insert_sql = """
insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num), crawl_time=VALUES(crawl_time)
"""
zhihu_id = self['zhihu_id'][0]
topics = ','.join(self['topics'])
url = self['url'][0]
title = self['title'][0]
content = self['content'][0]
answer_num = int(self.get('answer_num', ['0'])[0].replace(',', ''))
comments_num = int(self['comments_num'][0].split(' ')[0]) if self['comments_num'][0] != '添加评论' else 0
watch_user_num = self['watch_user_num'][0]
click_num = self['watch_user_num'][1]
crawl_time = datetime.datetime.now().strftime(SQL_DATE_FORMAT)
params = (zhihu_id, topics, url, title, content, answer_num, comments_num, watch_user_num, click_num, crawl_time)
return insert_sql, params
class ZhihuAnswerItem(scrapy.Item):
# 知乎的回答
zhihu_id = scrapy.Field()
url = scrapy.Field()
question_id = scrapy.Field()
author_id = scrapy.Field()
content = scrapy.Field()
parise_num = scrapy.Field()
comments_num = scrapy.Field()
create_time = scrapy.Field()
update_time = scrapy.Field()
crawl_time = scrapy.Field()
def get_insert_sql(self):
insert_sql = """
insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, praise_num, comments_num,
create_time, update_time, crawl_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num),
praise_num=VALUES(praise_num), update_time=VALUES(update_time), crawl_time=VALUES(crawl_time)
"""
zhihu_id = self['zhihu_id']
url = self['url']
question_id = self['question_id']
author_id = self['author_id']
content = self['content']
parise_num = self['parise_num']
comments_num = self['comments_num']
create_time = datetime.datetime.fromtimestamp(self['create_time']).strftime(SQL_DATETIME_FORMAT)
update_time = datetime.datetime.fromtimestamp(self['update_time']).strftime(SQL_DATETIME_FORMAT)
crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)
params = (zhihu_id, url, question_id, author_id, content, parise_num, comments_num, create_time, update_time, crawl_time)
return insert_sql, params
把item
的最终处理也放到了这里。和之前伯乐在线的处理方式不一样,其实item
的处理方式有很多种,可以自己选。
其中需要注意的第一个地方是有两个量是定义在settings
里面的,需要自己添加进去,再导入进来,也可以不这么麻烦。
第二个需要注意的地方是ZhihuQuestionItem
类里面的回答数answer_num
,实际运行的时候,有可能没有回答数,所以这个字段不存在,于是用的是self.get
方法来获取,这个方法比较保险,如果没获取到,还可以填写一个默认值,就不会报错。类似的还有一个评论数comments_num
。
最后需要注意的地方时,sql
语句里面有一个ON DUPLICATE KEY UPDATE content=VALUES(content)···
(···表示省略),这个的意思是,如果主键存在的话,就换成更新里面的某些值。因为前面执行的是插入insert
,但是如果反复运行,一些问题肯定还会爬到,再次爬取到的时候,主键就会重复,就会插入失败,所以要换成更新update
的方式。
回顾一下
梳理一下整个流程:
- 首先重写了
start_requests
,用于模拟登陆。- 用
selenium
模拟登陆。 - 用
pickle
保存cookies
,可以反复用。 - 调用回调函数
check_login
,执行原本start_requests
该执行的。
- 用
- 在
parse
里面提取页面中问答页面,并交给下载器,如果无关,就继续追踪。 - 在
items.py
文件里面创建item
类,保存提取信息。 - 在
parse_detail
里面处理下载的问答页面,创建itemloader
,统一提取字段并保存到item
。 - 在
parse_answer
里面处理下载的回答json
,同上保存到item
里面。 - 在
pipelines.py
文件中创建MysqlTwistedPipeline
类,用于异步将数据插入数据库。- 通过在
item
里面创建统一的方法,解决了不同item
需要不同插入语句的问题。
- 通过在
全部代码
老规矩还是放在Github
上面比较好,会混有伯乐在线的代码,请注意。
全部代码点这里