当包裹被送到一个地点后,需要分配到不同的地方才算整个过程的结束。那么把这个过程放到分布式爬虫中,Master节点负责了前期搬运和分类包裹的工作,剩下的任务小伙伴们都猜到了,spider_Worker节点就承担了最后的配送任务。我们也可以换种理解的方法,Worker是一个搬运工人的意思。那么我们今天就spider_Worker节点在python分布式爬虫中的使用给大家带来分享。
在将多线程版本改写成分布式的爬虫,主要用的可跨平台的multiprocessing.managers的BaseManager模块,这个模块的主要功能就是将task_queue和result_queue两个队列注册成函数暴露到网上去,Master节点监听端口,让Worker子节点去连接,不同主机之间就可以通过注册的函数来共享同步资源,Master节点主要负责发送任务和获取结果,Worker就获取任务队列的任务开始跑,并将获取的结果存储到数据库获取返回回来。
spider_Worker 节点主要调用spider()函数对任务进行处理,方法都类似,子节点每获取一个链接就传回Master, 另外需要注意的是Master文件只能运行一个,但Worker节点可以同时运行多个并行同步处理task任务队列。
spider_Master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#coding:utf-8
from multiprocessing.managers import BaseManager
from Queue import Queue
import time
import argparse
import MySQLdb
import sys
page = 2
word = 'inurl:login.action'
output = 'test.txt'
page = (page+1) * 10
host = '127.0.0.1'
port = 500
urls = []
class Master():
def __init__(self):
self.task_queue = Queue() #server需要先创建两个共享队列,worker端不需要
self.result_queue = Queue()
def start(self):
BaseManager.register('get_task_queue',callable=lambda:self.task_queue) #在网络上注册一个get_task_queue函数,即把两个队列暴露到网上,worker端不需要callable参数
BaseManager.register('get_result_queue',callable=lambda:self.result_queue)
manager = BaseManager(address=(host,port),authkey='sir')
manager.start() #master端为start,即开始监听端口,worker端为connect
task = manager.get_task_queue() #master和worker都是从网络上获取task队列和result队列,不能在创建的两个队列
result = manager.get_result_queue()
print 'put task'
for i in range(0,page,10):
target = 'https://www.baidu.com/s?wd=%s&pn=%s'%(word,i)
print 'put task %s'%target
task.put(target)
print 'try get result'
while True:
try:
url = result.get(True,5) #获取数据时需要超时长一些
print url
urls.append(url)
except:
break
manager.shutdown()
if __name__ == '__main__':
start = time.time()
server = Master()
server.start()
print '共爬取数据%s条'%len(urls)
print time.time()-start
with open(output,'a') as f:
for url in urls:
f.write(url[1]+'\n')
conn = MySQLdb.connect('localhost','root','root','Struct',charset='utf8')
cursor = conn.cursor()
for record in urls:
sql = "insert into s045 values('%s','%s','%s')"%(record[0],record[1],str(record[2]))
cursor.execute(sql)
conn.commit()
conn.close()
spider_Worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#coding:utf-8
import re
import Queue
import time
import requests
from multiprocessing.managers import BaseManager
from bs4 import BeautifulSoup as bs
host = '127.0.0.1'
port = 500
class Worder():
def __init__(self):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64; rv:50.0) Gecko/20100101 Firefox/50.0'}
def spider(self,target,result):
urls = []
pn = int(target.split('=')[-1])/10 +1
# print pn
# print target
html = requests.get(target,headers=self.headers)
soup = bs(html.text,"lxml")
res = soup.find_all(name="a", attrs={'class':'c-showurl'})
for r in res:
try:
h = requests.get(r['href'],headers=self.headers,timeout=3)
if h.status_code == 200:
url = h.url
# print url
time.sleep(1)
title = re.findall(r'',h.content)[0]
# print url,title
title = title.decode('utf-8')
print 'send spider url:',url
result.put((pn,url,title))
else:
continue
except:
continue
# return urls
def start(self):
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
print 'Connect to server %s'%host
m = BaseManager(address=(host,port),authkey='sir')
m.connect()
task = m.get_task_queue()
result = m.get_result_queue()
print 'try get queue'
while True:
try:
target = task.get(True,1)
print 'run pages %s'%target
res = self.spider(target,result)
# print res
except:
break
if __name__ == '__main__':
w = Worder()
w.start()
看完本篇文章,相信大家对spider_Worker节点这个辛勤的搬运工有了新的认识,毕竟它承担了爬虫任务最后的配送环节,大家对最终的结果都是比较期待的。
上一篇:scrapy可以独立在python分布式爬虫内使用吗?
下一篇: 没有了
一级建造师二级建造师消防工程师造价工程师土建职称房地产经纪人公路检测工程师建筑八大员注册建筑师二级造价师监理工程师咨询工程师房地产估价师 城乡规划师结构工程师岩土工程师安全工程师设备监理师环境影响评价土地登记代理公路造价师公路监理师化工工程师暖通工程师给排水工程师计量工程师
执业药师执业医师卫生资格考试卫生高级职称护士资格证初级护师主管护师住院医师临床执业医师临床助理医师中医执业医师中医助理医师中西医医师中西医助理口腔执业医师口腔助理医师公共卫生医师公卫助理医师实践技能内科主治医师外科主治医师中医内科主治儿科主治医师妇产科医师西药士/师中药士/师临床检验技师临床医学理论中医理论