接口模块需要用 API 来提供对外服务的接口,当然也可以直接连数据库来取,但是这样就需要知道数据库的连接信息,不太安全,而且需要配置连接,所以一个比较安全和方便的方式就是提供一个 Web API 接口,通过访问接口即可拿到可用代理
代码地址:https://github.com/Stevengz/Proxy_pool
另外三篇:
Python搭建代理IP池(一)- 获取 IP
Python搭建代理IP池(二)- 存储 IP
Python搭建代理IP池(三)- 检测 IP
添加设置
添加模块开关变量
setting.py
# 数据库地址 HOST = '127.0.0.1' # MySql端口 MYSQL_PORT = 3306 # MySQl用户名、密码 MYSQL_USERNAME = '***' MYSQL_PASSWORD = '***' # 数据库名 SQL_NAME = 'test' # 代理等级 MAX_SCORE = 30 MIN_SCORE = 0 INITIAL_SCORE = 10 VALID_STATUS_CODES = [200, 302] # 代理池数量界限 POOL_UPPER_THRESHOLD = 1000 # 检查周期 TESTER_CYCLE = 20 # 获取周期 GETTER_CYCLE = 300 # 测试API,建议抓哪个网站测哪个 TEST_URL = 'http://www.baidu.com' # API配置 API_HOST = '0.0.0.0' API_PORT = 5555 # 开关 TESTER_ENABLED = True GETTER_ENABLED = True API_ENABLED = True # 最大批测试量 BATCH_TEST_SIZE = 10
TESTER_ENABLED、GETTER_ENABLED、API_ENABLED 都是布尔类型,True 或者 False。标明了测试模块、获取模块、接口模块的开关,如果为 True,则代表模块开启
定义接口
使用框架:Flask
api.py
from flask import Flask, g from db import MySqlClient __all__ = ['app'] app = Flask(__name__) def get_conn(): if not hasattr(g, 'mysql'): g.mysql = MySqlClient() return g.mysql @app.route('/') def index(): return '<h2>Welcome to Proxy Pool System</h2>' # 随机代理 @app.route('/random') def get_proxy(): conn = get_conn() return conn.random() # 代理池总量 @app.route('/count') def get_counts(): conn = get_conn() return str(conn.count()) if __name__ == '__main__': app.run()
声明了一个 Flask 对象,定义了三个接口,分别是首页、随机代理页、获取数量页。
只需要访问对应的接口即可获取到可用代理:
调度模块
调用定义的获取、存储、检测三个模块,将这三个模块通过多进程的形式运行起来
scheduler.py
import time from multiprocessing import Process from api import app from getter import Getter from tester import Tester from db import MySqlClient from setting import * class Scheduler(): # 定时测试代理 def schedule_tester(self, cycle=TESTER_CYCLE): tester = Tester() while True: print('测试器开始运行') tester.run() time.sleep(cycle) # 定时获取代理 def schedule_getter(self, cycle=GETTER_CYCLE): getter = Getter() while True: print('开始抓取代理') getter.run() time.sleep(cycle) # 开启API def schedule_api(self): app.run(API_HOST, API_PORT) def run(self): print('代理池开始运行') if TESTER_ENABLED: tester_process = Process(target=self.schedule_tester) tester_process.start() if GETTER_ENABLED: getter_process = Process(target=self.schedule_getter) getter_process.start() if API_ENABLED: api_process = Process(target=self.schedule_api) api_process.start() if __name__ == "__main__": scheduler = Scheduler() scheduler.run()
启动入口是 run() 方法,分别判断了三个模块的开关,如果开启的话,就新建一个 Process 进程,设置好启动目标,然后调用 start() 方法运行,这样三个进程就可以并行执行,互不干扰
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。