Python如何使用队列方式实现多线程爬虫
说明:糗事百科段子的爬取,采用了队列和多线程的方式,其中关键点是Queue.task_done()、Queue.join(),保证了线程的有序进行。
代码如下
importrequests
fromlxmlimportetree
importjson
fromqueueimportQueue
importthreading
classQsbk(object):
def__init__(self):
self.headers={
"User-Agent":"Mozilla/5.0(X11;Linuxx86_64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/81.0.4044.138Safari/537.36",
"Referer":"https://www.qiushibaike.com/"
}
#实例化三个队列,用来存放内容
self.url_queue=Queue()
self.html_queue=Queue()
self.content_queue=Queue()
defget_total_url(self):
"""
获取了所有的页面url,并且返回url_list
return:url_list
现在放入url_queue队列中保存
"""
url_temp="https://www.qiushibaike.com/text/page/{}/"
url_list=list()
foriinrange(1,13):
#url_list.append(url_temp.format(i))
#将生成的url放入url_queue队列
self.url_queue.put(url_temp.format(i))
defparse_url(self):
"""
发送请求,获取响应,同时etree处理html
"""
whileself.url_queue.not_empty:
#判断非空,为空时结束循环
#从队列中取出一个url
url=self.url_queue.get()
print("parsingurl:",url)
#发送请求
response=requests.get(url,headers=self.headers,timeout=10)
#获取html字符串
html=response.content.decode()
#获取element类型的html
html=etree.HTML(html)
#将生成的element对象放入html_queue队列
self.html_queue.put(html)
#Queue.task_done()在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
self.url_queue.task_done()
defget_content(self):
"""
解析网页内容,获取想要的信息
"""
whileself.html_queue.not_empty:
items=list()
html=self.html_queue.get()
total_div=html.xpath("//div[@class='col1old-style-col1']/div")
foriintotal_div:
author_img=i.xpath(".//a[@rel='nofollow']/img/@src")
author_img="https"+author_img[0]iflen(author_img)>0elseNone
author_name=i.xpath(".//a[@rel='nofollow']/img/@alt")
author_name=author_name[0]iflen(author_name)>0elseNone
author_href=i.xpath("./a/@href")
author_href="https://www.qiushibaike.com/"+author_href[0]iflen(author_href)>0elseNone
author_gender=i.xpath("./div[1]/div/@class")
author_gender=author_gender[0].split("")[-1].replace("Icon","").strip()iflen(author_gender)>0elseNone
author_age=i.xpath("./div[1]/div/text()")
author_age=author_age[0]iflen(author_age)>0elseNone
content=i.xpath("./a/div/span/text()")
content=content[0].strip()iflen(content)>0elseNone
content_vote=i.xpath("./div[@class='stats']/span[@class='stats-vote']/i/text()")
content_vote=content_vote[0]iflen(content_vote)>0elseNone
content_comment_numbers=i.xpath("./div[@class='stats']/span[@class='stats-comments']/a/i/text()")
content_comment_numbers=content_comment_numbers[0]iflen(content_comment_numbers)>0elseNone
item={
"author_name":author_name,
"author_age":author_age,
"author_gender":author_gender,
"author_img":author_img,
"author_href":author_href,
"content":content,
"content_vote":content_vote,
"content_comment_numbers":content_comment_numbers,
}
items.append(item)
self.content_queue.put(items)
#task_done的时候,队列计数减一
self.html_queue.task_done()
defsave_items(self):
"""
保存items
"""
whileself.content_queue.not_empty:
items=self.content_queue.get()
withopen("quishibaike.txt",'a',encoding='utf-8')asf:
foriinitems:
json.dump(i,f,ensure_ascii=False,indent=2)
self.content_queue.task_done()
defrun(self):
#获取urllist
thread_list=list()
thread_url=threading.Thread(target=self.get_total_url)
thread_list.append(thread_url)
#发送网络请求
foriinrange(10):
thread_parse=threading.Thread(target=self.parse_url)
thread_list.append(thread_parse)
#提取数据
thread_get_content=threading.Thread(target=self.get_content)
thread_list.append(thread_get_content)
#保存
thread_save=threading.Thread(target=self.save_items)
thread_list.append(thread_save)
fortinthread_list:
#为每个进程设置为后台进程,效果是主进程退出子进程也会退出
t.setDaemon(True)
t.start()
#让主线程等待,所有的队列为空的时候才能退出
self.url_queue.join()
self.html_queue.join()
self.content_queue.join()
if__name__=="__main__":
obj=Qsbk()
obj.run()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。