Python爬虫学习笔记兼教程(三)——实战知乎

引言

记录并简单的教程一下,知乎上爬取问题和答案的过程。包括模拟登录。


Cookie和Session

首先讲解一下CookieSession的区别。

Cookie是保存在本地的一种信息,而且是一种类似于dick的保存方式,是一个文本。

http最开始是一种无状态的请求,请求什么,返回什么,前后之间没有联系。为了保存用户的浏览信息,更有效率的返回信息,就有了Cookie,就有了有状态的请求。

但是这样,如果有人看到电脑上的Cookie,就可以通过其看到一些个人的一些敏感信息。所以有了Session

Session

Cookie类似,但是Session生成的一些东西,是根据用户上传的信息进行变换之后生成的id,并不包含敏感信息本身,而这个Session生成的id也保存在服务器的数据库中。因为这个有过期时间,所以不用担心被盗用。


模拟登陆

教程里面写了知乎的模拟登陆,是因为以前知乎不登录无法获取问题和答案,但是现在的知乎好像没有这样的规定了。所以爬虫的时候应该可以去掉模拟登陆,但是还是讲一讲。

教程里面的模拟登陆已经过时了,而且网上能找到的模拟登陆也都不适合现在的知乎了。现在知乎登录post的是一段密文,懒懒的我并不想去解密(其实是不会XD)。于是推荐用selenium,这个是一个python的模拟浏览器的模块,可以手动登录然后将cookies保存下来供scrapy用。所以是一种通用方案。

这里需要用到selenium库和chromedriver.exe,直接上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
zhihu_session = requests.session()
chromePath = r'G:\PythonWork\ArticleSpider\chromedriver.exe'
wd = webdriver.Chrome(executable_path=chromePath)
loginUrl = 'https://www.zhihu.com/signin'
wd.get(loginUrl)
wd.find_element_by_xpath('//*[@id="root"]/div/main/div/div/div/div[2]/div[1]/form/div[1]/div[2]/div[1]/input').send_keys('用户名')
wd.find_element_by_xpath('//*[@id="root"]/div/main/div/div/div/div[2]/div[1]/form/div[2]/div/div[1]/input').send_keys('密码')
# 通过url的变化判断是否登陆成功
while wd.current_url == loginUrl:
pass
cookies = wd.get_cookies()
for cookie in cookies:
zhihu_session.cookies.set(cookie['name'], cookie['value'])
wd.quit()
with open('zhihu_cookies.pkl', 'wb') as f:
# 用pickle这个库来把cookies保存到本地,就不用每次都模拟登陆了。
pickle.dump(zhihu_session.cookies, f, 0)
# zhihu_session.cookies.get_dict()这个方法可以获得cookies

这里面要注意的是,一个是user-agent,这个是一个浏览器客户端的信息,会包含在http请求头里面,这个可以参考自己浏览器的更改。知乎会检测你这个是否合法,如果没有这个或者不合法,会返回500的错误给你,不让你获取页面,后面爬虫的时候也要注意。然后里面有一个while,用来判断是否登陆成功。这个是一个通常的用法,如何嵌入scrapy呢?

嵌入scrapy也很简单。先在spider文件夹下打开powershell或者命令行,输入scrapy genspider zhihu www.zhihu.com,就可以看到spider文件夹下多了一个zhihu.py文件。

打开,在里面重载这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 重载的函数
def start_requests(self):
zhihu_session = requests.session()
chromePath = r'G:\PythonWork\ArticleSpider\chromedriver.exe'
wd = webdriver.Chrome(executable_path=chromePath)
loginUrl = 'https://www.zhihu.com/signin'
wd.get(loginUrl)
wd.find_element_by_xpath(
'//*[@id="root"]/div/main/div/div/div/div[2]/div[1]/form/div[1]/div[2]/div[1]/input').send_keys(
'用户名')
wd.find_element_by_xpath(
'//*[@id="root"]/div/main/div/div/div/div[2]/div[1]/form/div[2]/div/div[1]/input').send_keys('密码')
# 通过url的变化判断是否登陆成功
while wd.current_url == loginUrl:
pass
cookies = wd.get_cookies()
for cookie in cookies:
zhihu_session.cookies.set(cookie['name'], cookie['value'])
wd.quit()
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
zhihu_header = {
'Connection': 'keep - alive',
'User-Agent': user_agent
}
# yield scrapy.Request(url='https://www.zhihu.com/signin', headers=zhihu_header, cookies=cookies, callback=self.check_login)
return [scrapy.FormRequest('https://www.zhihu.com/signin', headers=zhihu_header, cookies=cookies, callback=self.check_login)]
# 自定义的函数
def check_login(self, response):
for url in self.start_urls:
yield scrapy.Request(url, dont_filter=True, headers=self.headers)

start_requests这个方法是scrapy的一个入口函数,如果没写,默认会从上面的start_urls开始(这里没有,你们自己的文件里面有)。

重载做的事:首先是将cookies尝试从本地读取,如果读的到,就带着cookies访问登录页面,如果cookies有效,就会自动跳转到首页,如果无效,链接不变,就需要重新模拟登陆获取cookies

start_requests函数的回调函数一定是要的,因为要模仿原来start_requests里面做的事情,而回调函数check_login最后一定要把requestyield出去,交给下载器。

因为scrapy是一个异步的框架,yield出去的实例,在scrapy里面会判断,如果是item就路由给pipeline,如果是链接,就交给下载器等等。


提取问答页面

上面的check_login函数执行完了之后,就会去执行parse,所以我们要在这个函数里面来提取问答页面,然后交给下载器并进一步分析里面的字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def parse(self, response):
"""
提取页面中的所有url,并进一步爬取
如果提取的url是 /question/xxx 就下载之后进入解析函数
"""
all_urls = response.css('a::attr(href)').extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
all_urls1 = filter(lambda x: True if x.startswith('https') else False, all_urls)
for url in all_urls1:
match_obj = re.match('(.*zhihu.com/question/(\d+))(/|$).*', url)
if match_obj:
# 页面提取到question相关的页面,则下载后交由提取函数提取
request_url = match_obj.group(1)
# question_id = match_obj.group(2)
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_detail)
else:
# 如果不是question页面,就进一步跟踪
yield scrapy.Request(url, headers=self.headers, callback=self.parse)

首先是拼接url,使其变成完整的url,然后有些url其实是js文件,要去掉,所以用了startswith判断。

最后,观察链接可以发现,知乎的问答页面链接是这样的:

https://www.zhihu.com/question/29173647/answer/437189494

但是点一下查看全部回答,就会变成:

https://www.zhihu.com/question/29173647

其实就是把后面的一点去掉了,所以我们在访问的时候,就直接访问第二个就好,提取的时候,用正则表达式匹配出来。然后yield出去,并回调parse_detail函数来提取具体的字段。如果不是问答的页面,就继续交给parse跟踪里面的链接。


网页内容提取

这里还是需要用scrapy shell进行调试,但是有一点小技巧需要提醒。

之前说过访问知乎的时候需要带上heards才不会返回500的错误,所以在这里也需要加上heards,但是不需要全部,只需要user-agent,用-s USER_AGENT=""这样的参数来添加user-agent参数,完整如下:

1
scrapy shell -s USER_AGENT="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36" https://www.zhihu.com/question/285353546

创建Item

提取内容之前,还是先创建Item,让内容能有地方保存,然后可以集中处理。

items.py文件里面创建两个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class ZhihuQuestionItem(scrapy.Item):
# 知乎的问题 item
zhihu_id = scrapy.Field()
topics = scrapy.Field()
url = scrapy.Field()
title = scrapy.Field()
content = scrapy.Field()
answer_num = scrapy.Field()
comments_num = scrapy.Field()
watch_user_num = scrapy.Field()
click_num = scrapy.Field()
crawl_time = scrapy.Field()


class ZhihuAnswerItem(scrapy.Item):
# 知乎的回答
zhihu_id = scrapy.Field()
url = scrapy.Field()
question_id = scrapy.Field()
author_id = scrapy.Field()
content = scrapy.Field()
parise_num = scrapy.Field()
comments_num = scrapy.Field()
create_time = scrapy.Field()
update_time = scrapy.Field()
crawl_time = scrapy.Field()

从名字都可以看得出来,提取出了一些关键的字段。

提取Item

然后就是spiders里面的zhihu.py,教程里面因为当时知乎在改版,新版老版都混在一起,所以有两套提取,现在不用了,统一可以用下面的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
start_answer_urls = 'https://www.zhihu.com/api/v4/questions/{0}/answers?include=data[*].is_normal,admin_closed_comment,reward_info,is_collapsed,annotation_action,annotation_detail,collapse_reason,is_sticky,collapsed_by,suggest_edit,comment_count,can_comment,content,editable_content,voteup_count,reshipment_settings,comment_permission,created_time,updated_time,review_info,relevant_info,question,excerpt,relationship.is_authorized,is_author,voting,is_thanked,is_nothelp;data[*].mark_infos[*].url;data[*].author.follower_count,badge[?(type=best_answerer)].topics&limit={1}&offset={2}&sort_by=default'


def parse(self, response):
"""
提取页面中的所有url,并进一步爬取
如果提取的url是 /question/xxx 就下载之后进入解析函数
"""
all_urls = response.css('a::attr(href)').extract()
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
all_urls1 = filter(lambda x: True if x.startswith('https') else False, all_urls)
for url in all_urls1:
match_obj = re.match('(.*zhihu.com/question/(\d+))(/|$).*', url)
if match_obj:
# 页面提取到question相关的页面,则下载后交由提取函数提取
request_url = match_obj.group(1)
# question_id = match_obj.group(2)
yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_detail)
else:
# 如果不是question页面,就进一步跟踪
yield scrapy.Request(url, headers=self.headers, callback=self.parse)

def parse_detail(self, response):
# 处理question页面,从页面中提取quesiont item

match_obj = re.match('(.*zhihu.com/question/(\d+))(/|$).*', response.url)
if match_obj:
question_id = int(match_obj.group(2))

item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
item_loader.add_css('title', 'h1.QuestionHeader-title::text')
item_loader.add_css('content', '.QuestionHeader-detail') # .QuestionHeader-detail span.RichText::text
item_loader.add_value('url', response.url)
item_loader.add_value('zhihu_id', question_id)
item_loader.add_css('answer_num', '.List-headerText span::text')
item_loader.add_css('comments_num', '.QuestionHeader-Comment button.Button--plain::text')
item_loader.add_css('watch_user_num', '.NumberBoard-itemInner strong.NumberBoard-itemValue::attr("title")')
item_loader.add_css('topics', '.QuestionHeader-topics .Popover div::text')

question_item = item_loader.load_item()

yield scrapy.Request(self.start_answer_urls.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
yield question_item

def parse_answer(self, response):
# 处理question的answer
answer_json = json.loads(response.text)
is_end = answer_json["paging"]['is_end']
totals_answer = answer_json["paging"]['totals']
next_url = answer_json["paging"]['next']

# 提取answer的具体字段
for answer in answer_json['data']:
answer_item = ZhihuAnswerItem()
answer_item['zhihu_id'] = answer['id']
answer_item['url'] = answer['url']
answer_item['question_id'] = answer['question']['id']
answer_item['author_id'] = answer['author']['id'] if 'id' in answer['author'] else None
answer_item['content'] = answer['content'] if 'content' in answer else None
answer_item['parise_num'] = answer['voteup_count']
answer_item['comments_num'] = answer['comment_count']
answer_item['update_time'] = answer['updated_time']
answer_item['create_time'] = answer['created_time']
answer_item['crawl_time'] = datetime.datetime.now()
yield answer_item

if not is_end:
yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)

提取字段的css选择还是老样子。大致讲一讲思路:先parse函数提取出页面上面存在的url,并分析是不是和问题相关的url,是的话就交给下载器并且调用回调函数parse_detail,进行字段解析,如果不是问题页面,就交给下载器下载,继续分析页面里面带的url,看有没有别的问题页面。相当于一种深度优先算法。

处理字段的函数parse_detail里面,第一个yield里面的url是知乎答案内容请求链接,因为知乎的回答并不是一次性显示在页面上的,而是随着你的下拉,一点点请求来的,所以需要访问这个链接,里面的三个参数分别是:问题的id请求答案的数量从第几个答案开始请求。下载页面之后调用回调函数parse_answer,去分析回答,提取问题相关的回答字段。

parse_answer方法里面的yieldurl是可以从上一个request请求返回值里面提取出来的。


保存Item到数据库

上面两个提取的步骤,最后都把Itemyield出去了,这些Item就会被送到pipelines里面,所以还是先去pipelines里面看看。

看过我之前写的一个关于伯乐在线的爬虫教程笔记的话,就应该知道pipelines.py文件里面有一个MysqlTwistedPipeline类,用于异步操作插入数据库。其中的do_insert函数来看看:

1
2
3
4
5
6
def do_insert(self, cursor, item):
insert_sql = """
insert into jobbole_article(title, create_date, url, url_object_id, front_image_url, front_image_path, comment_nums, fav_nums, praise_nums, tags, content)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_sql, (item['title'], item['create_date'], item['url'], item['url_object_id'], item['front_image_url'], item['front_image_path'], item['comment_nums'], item['fav_nums'], item['praise_nums'], item['tags'], item['content']))

所有的item都会路由到这里,但是不同的item要有不同的操作,但是这又是一个异步的机制,要是能知道这个item是哪个就好了。

可以通过item.__class__.__name__这个属性,来得到这个item实例的名字,和之前定义的对比一下,再来写也是可以的,但是这样有些问题,名字变了之后就会出错;而且当数据量大的时候,这每一个操作都会打开一个数据库,后面数据库的链接会变得很多,浪费资源。

所以这里采用的是一种类似于django的思想,把所有的底层都屏蔽掉,只调用一个方法就可以。代码如下:

1
2
3
4
5
def do_insert(self, cursor, item):
# 执行具体的插入语句
# if item.__class__.__name__ == 'JobBoleArticleItem': # 这种方法有点死,名字变了就会出错
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)

这里我们为每个item定义一个get_inset_sql方法,就在这里屏蔽掉了不同的item插入的语句不一样的问题。

所以这里又回到items.py文件里面,之前定义的两个item变成了如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# from ArticleSpider.settings import SQL_DATETIME_FORMAT, SQL_DATE_FORMAT
# 下面这两个量定义在settings里面,再引入过来
# SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# SQL_DATE_FORMAT = "%Y-%m-%d"
class ZhihuQuestionItem(scrapy.Item):
# 知乎的问题 item
zhihu_id = scrapy.Field()
topics = scrapy.Field()
url = scrapy.Field()
title = scrapy.Field()
content = scrapy.Field()
answer_num = scrapy.Field()
comments_num = scrapy.Field()
watch_user_num = scrapy.Field()
click_num = scrapy.Field()
crawl_time = scrapy.Field()

def get_insert_sql(self):
insert_sql = """
insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num), crawl_time=VALUES(crawl_time)
"""
zhihu_id = self['zhihu_id'][0]
topics = ','.join(self['topics'])
url = self['url'][0]
title = self['title'][0]
content = self['content'][0]
answer_num = int(self.get('answer_num', ['0'])[0].replace(',', ''))
comments_num = int(self['comments_num'][0].split(' ')[0]) if self['comments_num'][0] != '添加评论' else 0
watch_user_num = self['watch_user_num'][0]
click_num = self['watch_user_num'][1]
crawl_time = datetime.datetime.now().strftime(SQL_DATE_FORMAT)
params = (zhihu_id, topics, url, title, content, answer_num, comments_num, watch_user_num, click_num, crawl_time)
return insert_sql, params


class ZhihuAnswerItem(scrapy.Item):
# 知乎的回答
zhihu_id = scrapy.Field()
url = scrapy.Field()
question_id = scrapy.Field()
author_id = scrapy.Field()
content = scrapy.Field()
parise_num = scrapy.Field()
comments_num = scrapy.Field()
create_time = scrapy.Field()
update_time = scrapy.Field()
crawl_time = scrapy.Field()

def get_insert_sql(self):
insert_sql = """
insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, praise_num, comments_num,
create_time, update_time, crawl_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num),
praise_num=VALUES(praise_num), update_time=VALUES(update_time), crawl_time=VALUES(crawl_time)
"""

zhihu_id = self['zhihu_id']
url = self['url']
question_id = self['question_id']
author_id = self['author_id']
content = self['content']
parise_num = self['parise_num']
comments_num = self['comments_num']
create_time = datetime.datetime.fromtimestamp(self['create_time']).strftime(SQL_DATETIME_FORMAT)
update_time = datetime.datetime.fromtimestamp(self['update_time']).strftime(SQL_DATETIME_FORMAT)
crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

params = (zhihu_id, url, question_id, author_id, content, parise_num, comments_num, create_time, update_time, crawl_time)
return insert_sql, params

item的最终处理也放到了这里。和之前伯乐在线的处理方式不一样,其实item的处理方式有很多种,可以自己选。

其中需要注意的第一个地方是有两个量是定义在settings里面的,需要自己添加进去,再导入进来,也可以不这么麻烦。

第二个需要注意的地方是ZhihuQuestionItem类里面的回答数answer_num,实际运行的时候,有可能没有回答数,所以这个字段不存在,于是用的是self.get方法来获取,这个方法比较保险,如果没获取到,还可以填写一个默认值,就不会报错。类似的还有一个评论数comments_num

最后需要注意的地方时,sql语句里面有一个ON DUPLICATE KEY UPDATE content=VALUES(content)···(···表示省略),这个的意思是,如果主键存在的话,就换成更新里面的某些值。因为前面执行的是插入insert,但是如果反复运行,一些问题肯定还会爬到,再次爬取到的时候,主键就会重复,就会插入失败,所以要换成更新update的方式。


回顾一下

梳理一下整个流程:

  • 首先重写了start_requests,用于模拟登陆。
    • selenium模拟登陆。
    • pickle保存cookies,可以反复用。
    • 调用回调函数check_login,执行原本start_requests该执行的。
  • parse里面提取页面中问答页面,并交给下载器,如果无关,就继续追踪。
  • items.py文件里面创建item类,保存提取信息。
  • parse_detail里面处理下载的问答页面,创建itemloader,统一提取字段并保存到item
  • parse_answer里面处理下载的回答json,同上保存到item里面。
  • pipelines.py文件中创建MysqlTwistedPipeline类,用于异步将数据插入数据库。
    • 通过在item里面创建统一的方法,解决了不同item需要不同插入语句的问题。

全部代码

老规矩还是放在Github上面比较好,会混有伯乐在线的代码,请注意。

全部代码点这里