Concurrent Crawler In Go

Posted by BY on August 19, 2018

Preface

Building something with Go, following along with experts online, and taking notes as I go.

Main Text

The fastest way to learn a language is to keep using it. Continuing from the previous chapter, we keep working on the Go crawler. Because Go has excellent support for concurrency, not much of the single-task crawler needs to change: the original parser and fetcher are merged into a new abstraction, the worker. Since many workers now cooperate on top of the single-task design, we add one more abstraction, the scheduler, which assigns work to the workers. The crawler engine hands requests to the scheduler, the scheduler distributes them to the workers, and the workers send the resulting requests and items back to the engine. The engine, scheduler, and workers no longer pass data through direct function calls; instead everything flows through channels. There are two designs for the concurrent crawler; in the first one, all workers share a single input channel, i.e. every worker competes for the requests coming from the scheduler.
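
To make the shared-channel idea concrete before walking through the real code, here is a minimal, self-contained sketch; the job/result names are made up for illustration and are not part of the crawler:

// Worker-pool sketch: several workers compete for jobs on one shared channel
package main

import "fmt"

func main() {
	in := make(chan int)     // one input channel shared by all workers
	out := make(chan string) // one output channel collecting every result

	// three workers compete for whatever arrives on in
	for i := 0; i < 3; i++ {
		go func(id int) {
			for job := range in {
				out <- fmt.Sprintf("worker %d handled job %d", id, job)
			}
		}(i)
	}

	// feed five jobs, then print the five results
	go func() {
		for job := 0; job < 5; job++ {
			in <- job
		}
	}()
	for i := 0; i < 5; i++ {
		fmt.Println(<-out)
	}
}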

Step 1

Adjust the original engine.go file. Rename engine.go to simple.go, extract the page-fetching and page-parsing code into a worker function, define an empty struct SimpleEngine, and turn the original Run function into a method on a *SimpleEngine pointer.

// Simple engine; the original engine.go is renamed to simple.go
type SimpleEngine struct{}

func (e *SimpleEngine) Run(seeds ...Request) {
	var requests []Request
	for _, r := range seeds {
		requests = append(requests, r)
	}

	for len(requests) > 0 {
		r := requests[0]
		requests = requests[1:]
		// call the worker here
		parseResult, err := worker(r)
		if nil != err {
			continue
		}
		requests = append(requests, parseResult.Requests...)
		for _, item := range parseResult.Items {
			log.Printf("Got item %v", item)
		}
	}
}
// The page-fetching and page-parsing logic extracted from the original Run
func worker(r Request) (ParseResult, error) {
	log.Printf("Fetching %s", r.Url)
	body, err := fetcher.Fetch(r.Url)
	if err != nil {
		log.Printf("Fetcher:error fetching url %s %v", r.Url, err)
		return ParseResult{}, err
	}
	return r.ParserFunc(body), nil
}
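
Running the single-task version then only requires constructing a SimpleEngine. A usage sketch (url and parser.ParseCityList are the same seed and parser used in the main function shown later in this post; the package layout is assumed):

e := engine.SimpleEngine{}
e.Run(engine.Request{
	Url:        url,
	ParserFunc: parser.ParseCityList,
})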

Step 2

Add a new concurrent.go file. Define a ConcurrentEngine struct containing a Scheduler (an interface) and a configurable WorkerCount. Define the Scheduler interface with two methods: Submit, which submits a request, and ConfigureMasterWorkerChan, which configures the master worker channel.

// Concurrent engine ./engine/concurrent.go
type ConcurrentEngine struct {
	Scheduler   Scheduler
	WorkerCount int
}
type Scheduler interface {
	Submit(Request)
	ConfigureMasterWorkerChan(chan Request)
}

func (e *ConcurrentEngine) Run(seeds ...Request) {
	in := make(chan Request)
	out := make(chan ParseResult)
	e.Scheduler.ConfigureMasterWorkerChan(in)
	for i := 0; i < e.WorkerCount; i++ {
		createWorker(in, out)
	}
	for _, r := range seeds {
		e.Scheduler.Submit(r)
	}
	for {
		result := <-out
		for _, item := range result.Items {
			fmt.Printf("Got item: %v\n", item)
		}
		for _, request := range result.Requests {
			e.Scheduler.Submit(request)
		}
	}
}

func createWorker(in chan Request, out chan ParseResult) {
	go func() {
		for {
			request := <-in
			// call the worker function defined in simple.go
			result, err := worker(request)
			if nil != err {
				continue
			}
			out <- result
		}
	}()
}
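
Note that Run never returns: its final for loop keeps draining out and resubmitting result.Requests, so the crawler runs until the process is stopped. Also, both in and out are unbuffered, which is exactly why the scheduler introduced in the next step must take care not to block the engine when requests are submitted.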

Step 3

Add a simple scheduler in a new simple.go file.

// Simple scheduler ./scheduler/simple.go
type SimpleScheduler struct {
	workerChan chan engine.Request
}
// A pointer receiver is needed so that changes to s are visible to callers
func (s *SimpleScheduler) Submit(r engine.Request) {
	go func() { s.workerChan <- r }()
}
func (s *SimpleScheduler) ConfigureMasterWorkerChan(c chan engine.Request) {
	s.workerChan = c
}
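
Wrapping the send in its own goroutine is what keeps this design from deadlocking: if Submit wrote to workerChan directly, the engine's Run loop could block inside Submit while every worker is itself blocked sending a result on out, and nothing would ever drain out again. Spawning one goroutine per pending request effectively gives the channel unlimited buffering, at the cost of one goroutine per queued request.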

Step 4

Update the configuration in the main function.

func main() {
	e := engine.ConcurrentEngine{
		Scheduler: &scheduler.SimpleScheduler{},
		WorkerCount: 10,
	}
	e.Run(engine.Request{
		Url: url,
		ParserFunc: parser.ParseCityList,
	})
}
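
With WorkerCount set to 10, ten worker goroutines all read from the same in channel, which is exactly the first of the two designs described at the beginning: every worker competes for the requests handed out by the scheduler.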

Overall Architecture of the Single-Task Crawler

Think of the crawl as manually clicking through the site. First, we click the city list and see many cities. Second, we open a city and see the brief profiles of many users, displayed across several pages. Third, we open a brief profile and see that user's details. These pages (the city list, the user list under a city, and the user detail page) are laid out differently, so the site can be modeled as a tree: the city list has many city nodes, and each city node has many user nodes. Mirroring the manual clicks, we need a city-list parser, a city parser, and a user parser, one parser per level. Every crawl request can therefore be abstracted as a URL plus its corresponding parser. Each parser takes UTF-8 text as input and outputs child requests, i.e. URLs paired with child parsers (for example, the output of the city page's parser is user URLs paired with the user parser).

The single-machine crawler is split into seeds (the initial requests), an engine, a fetcher, and a task queue. The seeds are first added to the queue; the engine takes a request off the queue, uses the fetcher to download the page at the request's Url, and the fetcher returns the text; the engine then calls the request's parser, appends the returned requests to the queue, and repeats until every request has been processed.

Engine

// Engine ./engine/engine.go
func Run(seeds ...Request) {
	var requests []Request
	for _, r := range seeds {
		requests = append(requests, r)
	}

	for len(requests) > 0 {
		r := requests[0]
		requests = requests[1:]
		log.Printf("Fetching %s", r.Url)
		body, err := fetcher.Fetch(r.Url)
		if err != nil {
			log.Printf("Fetcher:error fetching url %s %v", r.Url, err)
			continue
		}
		parseResult := r.ParserFunc(body)
		requests = append(requests, parseResult.Requests...)
		for _, item := range parseResult.Items {
			log.Printf("Got item %v", item)
		}
	}
}

Types

// Types ./engine/type.go
type Request struct {
	Url string
	ParserFunc func([]byte) ParseResult
}

type ParseResult struct {
	Requests []Request
	Items []interface{}
}
// Users are the leaves of the parse tree; once the user parser has finished,
// we return a parser that does nothing.
func NilParser([]byte) ParseResult {
	return ParseResult{}
}

Page Fetcher

// Fetcher ./fetcher/fetch.go
// Fetch downloads a page and returns its body as UTF-8 text
func Fetch(url string) ([]byte, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		//fmt.Println("Error: status code", resp.StatusCode)
		return nil, fmt.Errorf("wrong status code:%d", resp.StatusCode)
	}
	bodyReader := bufio.NewReader(resp.Body)
	e := determineEncoding(bodyReader)
	utf8Reader := transform.NewReader(bodyReader, e.NewDecoder())
	return ioutil.ReadAll(utf8Reader)
}
// Detect the page's character encoding
func determineEncoding(r *bufio.Reader) encoding.Encoding {
	bytes, err := r.Peek(1024)
	if err != nil {
		log.Printf("fetcher error: %v",err)
		return unicode.UTF8
	}
	e, _, _ := charset.DetermineEncoding(bytes, "")
	return e
}
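
A minimal usage sketch of the fetcher; the import path is an assumption about the module layout, and the URL is simply one of the city pages referenced in the parser comments below:

package main

import (
	"fmt"
	"log"

	"crawler/fetcher" // assumed import path; adjust to your own module
)

func main() {
	body, err := fetcher.Fetch("http://city.zhenai.com/dongguan")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("downloaded %d bytes of UTF-8 text\n", len(body))
}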

Parsers: the City List Parser

// City list parser ./zhenai/parser/citylist.go (specific to the crawled site)
func ParseCityList(content []byte) engine.ParseResult {
	//<a target="_blank" href="http://city.zhenai.com/dongguan">东莞</a>
	//<a href="http://city.zhenai.com/weinan"
	//								class="">渭南</a>
	re := regexp.MustCompile(`<a href="(http://city.zhenai.com/[a-zA-Z0-9]+)"[^>]*>([^<]*)</a>`)
	matches := re.FindAllSubmatch(content, -1)
	result := engine.ParseResult{}
	for _, match := range matches {
		result.Items = append(result.Items, string(match[2]))
		result.Requests = append(result.Requests, engine.Request{
			Url:        string(match[1]),
			ParserFunc: engine.NilParser,
		})
	}
	return result
}
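
A parser like this is easy to sanity-check with a hand-written fragment. The test below is only a sketch: the fragment and the expected values are made up to match the regular expression, not taken from the real page.

// ./zhenai/parser/citylist_test.go (sketch)
package parser

import "testing"

func TestParseCityList(t *testing.T) {
	fragment := []byte(`<a href="http://city.zhenai.com/shanghai" class="">上海</a>`)
	result := ParseCityList(fragment)
	if len(result.Requests) != 1 || result.Requests[0].Url != "http://city.zhenai.com/shanghai" {
		t.Errorf("unexpected requests: %v", result.Requests)
	}
	if len(result.Items) != 1 || result.Items[0] != "上海" {
		t.Errorf("unexpected items: %v", result.Items)
	}
}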

City Parser

// City parser ./zhenai/parser/city.go (specific to the crawled site)
const cityRe = `<a href="(http://album.zhenai.com/u/[0-9]+)" target="_blank">([^<]*)</a>`
const curCity = `地区:</dt>
			
				<dd><a  href="http://city.zhenai.com/[a-zA-Z0-9]+" class="cur" >([^<]*)</a></dd>`
const nextPage = `<a class="next-page"
				href="(http://city.zhenai.com/[a-zA-Z0-9]+/[0-9]+)">下一页</a>`
// add the next page's URL to the queue
func ParseCity(contents []byte) engine.ParseResult {
	re := regexp.MustCompile(cityRe)
	matches := re.FindAllSubmatch(contents, -1)

	result := engine.ParseResult{}
	reNextPage := regexp.MustCompile(nextPage)
	mnp := reNextPage.FindAllSubmatch(contents, 1)
	if nil != mnp {
		reCurCity := regexp.MustCompile(curCity)
		mcc := reCurCity.FindAllSubmatch(contents, 1)
		if nil != mcc {
			result.Items = append(result.Items, "City "+string(mcc[0][1]))
			result.Requests = append(result.Requests, engine.Request{
				Url:        string(mnp[0][1]),
				ParserFunc: ParseCity,
			})
		}

	}
	for _, m := range matches {
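		// Copy the name into a per-iteration variable so the closure below
		// captures this user's name rather than the shared loop variable.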
		name := string(m[2])
		result.Items = append(result.Items, "User "+name)
		result.Requests = append(result.Requests, engine.Request{
			Url: string(m[1]),
			ParserFunc: func(c []byte) engine.ParseResult {
				return ParseProfile(c, name)
			},
		})
	}
	return result
}

User Profile Parser

// User profile parser ./zhenai/parser/profile.go (specific to the crawled site)
const ageReStr = `<span class="label">年龄:</span>([\d]+)岁</td>`
const heightReStr = `<span class="label">身高:</span>([\d]+)CM</td>`
const incomeReStr = `<td><span class="label">月收入:</span>([^<]+)</td>`
const weightReStr = `<span class="label">体重:</span><span field="">([\d]+)KG</span>`
const genderReStr = `<span class="label">性别:</span><span field="">([^<]+)</span>`
const xinzuoReStr = `<span class="label">星座:</span><span field="">([^<]+)</span>`
const marriageReStr = `<td><span class="label">婚况:</span>([^<]+)</td>`
const educationReStr = `<td><span class="label">学历:</span>([^<]+)</td>`
const occupationReStr = `<td><span class="label">职业:</span><span field="">([^<]+)</span></td>`
const hokouReStr = `<td><span class="label">籍贯:</span>([^<]+)</td>`
const houseReStr = `<span class="label">住房条件:</span><span field="">([^<]+)</span>`
const carReStr = `<span class="label">是否购车:</span><span field="">([^<]+)</span>`

var ageRe = regexp.MustCompile(ageReStr)
var heightRe = regexp.MustCompile(heightReStr)
var incomeRe = regexp.MustCompile(incomeReStr)
var weightRe = regexp.MustCompile(weightReStr)
var genderRe = regexp.MustCompile(genderReStr)
var xinzuoRe = regexp.MustCompile(xinzuoReStr)
var marriageRe = regexp.MustCompile(marriageReStr)
var educationRe = regexp.MustCompile(educationReStr)
var occupationRe = regexp.MustCompile(occupationReStr)
var hokouRe = regexp.MustCompile(hokouReStr)
var houseRe = regexp.MustCompile(houseReStr)
var carRe = regexp.MustCompile(carReStr)
func ParseProfile(contents []byte, name string) engine.ParseResult {
	profile := model.Profile{}
	age, err := strconv.Atoi(extraString(contents, ageRe))
	if nil == err {
		profile.Age = age
	}
	height, err := strconv.Atoi(extraString(contents, heightRe))
	if nil == err {
		profile.Height = height
	}
	weight, err := strconv.Atoi(extraString(contents, weightRe))
	if nil == err {
		profile.Weight = weight
	}
	profile.Income = extraString(contents, incomeRe)
	profile.Gender = extraString(contents, genderRe)
	profile.Xinzuo = extraString(contents, xinzuoRe)
	profile.Marriage = extraString(contents, marriageRe)
	profile.Education = extraString(contents, educationRe)
	profile.Occupation = extraString(contents, occupationRe)
	profile.Hokou = extraString(contents, hokouRe)
	profile.House = extraString(contents, houseRe)
	profile.Car = extraString(contents, carRe)
	profile.Name = name
	result := engine.ParseResult{
		Items: []interface{}{profile},
	}
	return result
}
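// extraString returns the first capture group of re in contents, or an empty
// string when there is no match.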
func extraString(contents []byte, re *regexp.Regexp) string {
	match := re.FindSubmatch(contents)
	if len(match) >= 2 {
		return string(match[1])
	} else {
		return ""
	}
}

The main Function

// main ./main.go
func main() {
	engine.Run(engine.Request{
		Url: url,
		ParserFunc: parser.ParseCityList,
	})
}

Summary

The single-machine version of the crawler is now complete.

Closing Remarks

Later chapters will cover the concurrent version of the crawler.