分布式爬虫,可以从几个层面考虑,分别是代理层面、执行层面和存储层面。
通过设置代理池,我们可以将下载任务分配给不同节点执行,有助于提供爬虫的网页下载速度。同时,这样还能有效降低因爬取速度太快而导致IP 被禁的可能性。
colly 实现代理 IP 的代码如下:
package mainimport ( "github.com/gocolly/colly" "github.com/gocolly/colly/proxy")func main() { c := colly.NewCollector() if p, err := proxy.RoundRobinProxySwitcher( "socks5://127.0.0.1:1337", "socks5://127.0.0.1:1338", "http://127.0.0.1:8080", ); err == nil { c.SetProxyFunc(p) } // ...}
proxy.RoundRobinProxySwitcher 是 colly 内置的通过轮询方式实现代理切换的函数。当然,我们也可以完全自定义。
比如,一个代理随机切换的案例,如下:
var proxies []*url.URL = []*url.URL{ &url.URL{Host: "127.0.0.1:8080"}, &url.URL{Host: "127.0.0.1:8081"}, }func randomProxySwitcher(_ *http.Request) (*url.URL, error) { return proxies[random.Intn(len(proxies))], nil}// ...c.SetProxyFunc(randomProxySwitcher)
不过需要注意,此时的爬虫仍然是中心化的,任务只在一个节点上执行。
这种方式通过将任务分配给不同的节点执行,实现真正意义的分布式。
如果实现分布式执行,首先需要面对一个问题,如何将任务分配给不同的节点,实现不同任务节点之间的协同工作呢?
首先,我们选择合适的通信方案。常见的通信协议有 HTTP、TCP,一种无状态的文本协议、一个是面向连接的协议。除此之外,还可选择的有种类丰富的 RPC 协议,比如 Jsonrpc、facebook 的 thrift、google 的 grpc 等。
文档提供了一个 HTTP 服务示例代码,负责接收请求与任务执行。如下:
package main import ( "encoding/json" "log" "net/http" "github.com/gocolly/colly" ) type pageInfo struct { StatusCode int Links map[string]int } func handler(w http.ResponseWriter, r *http.Request) { URL := r.URL.Query().Get("url") if URL == "" { log.Println("missing URL argument") return } log.Println("visiting", URL) c := colly.NewCollector() p := &pageInfo{Links: make(map[string]int)} // count links c.OnHTML("a[href]", func(e *colly.HTMLElement) { link := e.Request.AbsoluteURL(e.Attr("href")) if link != "" { p.Links[link]++ } }) // extract status code c.OnResponse(func(r *colly.Response) { log.Println("response received", r.StatusCode) p.StatusCode = r.StatusCode }) c.OnError(func(r *colly.Response, err error) { log.Println("error:", r.StatusCode, err) p.StatusCode = r.StatusCode }) c.Visit(URL) // dump results b, err := json.Marshal(p) if err != nil { log.Println("failed to serialize response:", err) return } w.Header().Add("Content-Type", "application/json") w.Write(b) } func main() { // example usage: curl -s 'http://127.0.0.1:7171/?url=http://go-colly.org/' addr := ":7171" http.HandleFunc("/", handler) log.Println("listening on", addr) log.Fatal(http.ListenAndServe(addr, nil)) }
这里并没有提供调度器的代码,不过实现不算复杂。任务完成后,服务会将相应的链接返回给调度器,调度器负责将新的任务发送给工作节点继续执行。
如果需要根据节点负载情况决定任务执行节点,还需要服务提供监控 API 获取节点性能数据帮助调度器决策。
我们已经通过将任务分配到不同节点执行实现了分布式。但部分数据,比如 cookies、访问的 url 记录等,在节点之间需要共享。默认情况下,这些数据是保存内存中的,只能是每个 collector 独享一份数据。
我们可以通过将数据保存至 redis、mongo 等存储中,实现节点间的数据共享。colly 支持在任何存储间切换,只要相应存储实现 colly/storage.Storage 接口中的方法。
其实,colly 已经内置了部分 storage 的实现,查看 storage。下一节也会谈到这个话题。