百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

干货分享,程序员自建代理ip池,轻松爬取数据不封ip没有反爬虫。

cac55 2025-01-05 16:01 31 浏览 0 评论

代理池主要分为4个模块:存储模块、获取模块、检测模块、接口模块无私分享全套Python爬虫干货,如果你也想学习Python,@ 私信小编获取

存储模块

这里我们使用Redis的有序集合,集合的每一个元素都是不重复的。另外,有序集合的每一个元素都有一个分数字段。

具体代码实现如下(ippool_save.py)

MAX_SCORE = 100 #最高分
MIN_SCORE = 0 #最低分
INITIAL_SCORE = 10  #初始分数
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies' #键名

import redis
from random import choice

class PoolEmptyError():
    def __str__(self):
        return PoolEmptyError

class RedisClient(object):
    def __init__(self,host=REDIS_HOST,port=REDIS_PORT,password=REDIS_PASSWORD):
        '''
        初始化
        :param host:地址
        :param port: 端口号
        :param password: 密码
        '''
        self.db = redis.StrictRedis(host=host,port=port,password=password,decode_responses=True)

    def add(self,proxy,score=INITIAL_SCORE):
        '''
        添加代理,设置初始分数
        :param proxy: 代理
        :param score: 分数
        :return: 添加结果
        '''
        if not self.db.zscore(REDIS_KEY,proxy):
            return self.db.zadd(REDIS_KEY,{proxy:score})

    def random(self):
        '''
        随即获取有效代理,首先尝试获取最高分数代理,如果最高分数不存在,则按照排名获取
        :return:
        '''
        result = self.db.zrangebyscore(REDIS_KEY,MAX_SCORE,MAX_SCORE)
        if len(result):
            return choice(result)
        else:
            result = self.db.zrevrange(REDIS_KEY,0,100)
            if len(result):
                return choice(result)
            else:
                raise PoolEmptyError

    def decrease(self, proxy):
        '''
        代理值减一分,分数小于最小值,则代理删除
        :param proxy: 代理
        :return: 修改后的代理分数
        '''
        score = self.db.zscore(REDIS_KEY,proxy)
        if score and score>MIN_SCORE:
            print("代理",proxy,"当前分数",score,"减1")
            return self.db.zincrby(REDIS_KEY,-1,proxy)
        else:
            print("代理",proxy,"当前分数",score,"移除")
            return self.db.zrem(REDIS_KEY,proxy)

    def exists(self,proxy):
        '''
        判断是否存在
        :param proxy: 代理
        :return: 是否存在
        '''
        return not self.db.zscore(REDIS_KEY,proxy) == None

    def max(self,proxy):
        '''
        将代理设置为MAX_SCORE
        :param proxy: 代理
        :return: 设置结果
        '''
        print("代理",proxy,"可用,设置为",MAX_SCORE)
        return self.db.zadd(REDIS_KEY,{proxy:MAX_SCORE})

    def count(self):
        '''
        获取数量
        :return:数量
        '''
        return self.db.zcard(REDIS_KEY)

    def all(self):
        '''
        获取全部代理
        :return: 全部代理列表
        '''
        return self.db.zrangebyscore(REDIS_KEY,MIN_SCORE,MAX_SCORE)


获取模块

获取模块的逻辑相对简单,首先要定义一个ippool_crawler.py来从各大网站抓取,具体代码如下:

import json
import requests
from lxml import etree
from ippool_save import RedisClient

class ProxyMetaclass(type):
    #参数依次是当前准备创建的类的对象;类的名字;类继承的父类集合;类的方法集合。
    def __new__(cls, name,bases,attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k,v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count+=1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls,name,bases,attrs)

class Crawler(object,metaclass=ProxyMetaclass):
    def __init__(self):
        self.proxy = RedisClient().random()
        self.proxies = {
            'http': 'http://' + self.proxy,
            'https': 'https://' + self.proxy
        }
    def get_proxies(self,callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('成功获取代理',proxy)
            proxies.append(proxy)
        return proxies
        
	

我们还需要定义一个Getter类,用来动态地调用所有以crawl开头的方法,然后获取抓取到的代理,将其加入到数据库存储起来,具体代码如下(ippool_getter.py)

from ippool_save import RedisClient
from ippool_crawler import Crawler

POOL_UPPER_THRESHOLD = 1000

class Getter():
    def __init__(self):
        self.redis = RedisClient()
        self.crawler = Crawler()
    def is_over_threshold(self):
        if self.redis.count() >= POOL_UPPER_THRESHOLD:
            return True
        else:
            return False

    def run(self):
        print("获取器开始执行")
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                proxies = self.crawler.get_proxies(callback)
                for proxy in proxies:
                    self.redis.add(proxy)

检测模块

我们已经将各个网站的代理都抓取下来了现在就需要一个检测模块来对所有代理进行多轮检测。

VALID_STATUS_CODES = [200]
TEST_URL = "http://www.baidu.com"
BATCH_TEST_SIZE = 100

from ippool_save import RedisClient
import aiohttp
import asyncio
import time

class Tester(object):
    def __init__(self):
        self.redis = RedisClient()

    async def test_single_proxy(self,proxy):
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(proxy,bytes):
                    proxy = proxy.decode('utf-8')
                real_proxy = 'http://'+ proxy
                print("正在测试",proxy)
                async with session.get(TEST_URL,proxy=real_proxy,timeout=15) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.redis.max(proxy)
                        print('代理可用',proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('请求响应码不合法',proxy)
            except (TimeoutError,ArithmeticError):
                self.redis.decrease(proxy)
                print('代理请求失败',proxy)

    def run(self):
        print('测试开始运行')
        try:
            proxies = self.redis.all()
            loop = asyncio.get_event_loop()
            for i in range(0,len(proxies),BATCH_TEST_SIZE):
                test_proxies = proxies[i:i+BATCH_TEST_SIZE]
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                time.sleep(5)
        except Exception as e:
            print('测试器发生错误', e.args)

接口模块

为了更方便地获取可用代理,我们增加了一个接口模块。

使用Flask来实现这个接口模块,实现代码如下(ippool_api.py)

from flask import Flask,g
from ippool_save import RedisClient

__all__ = ['app']
app = Flask(__name__)

def get_conn():
    if not hasattr(g,'redis'):
        g.redis = RedisClient()
    return  g.redis

@app.route('/')
def index():
    return '<h2>Welcome to Proxy Pool System</h2>'

@app.route('/random')
def get_proxy():
    conn = get_conn()
    return conn.random()

@app.route('/count')
def get_counts():
    conn = get_conn()
    return  str(conn.count())

if __name__ == '__main__':
    app.run()

调度模块

调度模块就是调用以上定义的3个模块,将这3个模块通过多进程的形式运行起来。

最后,只需要调用Scheduler的run()方法即可启动整个代码池。

TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True

from multiprocessing import Process
from ippool_api import app
from ippool_getter import Getter
from ippool_check import Tester
import time

class Scheduler():
    def schedule_tester(self,cycle=TESTER_CYCLE):
     tester = Tester()
     while True:
         print('测试器开始运行')
         tester.run()
         time.sleep(cycle)

    def schedule_getter(self,cycle=GETTER_CYCLE):
        getter = Getter()
        while True:
            print('开始抓取代理')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        app.run()

    def run(self):
        print('代理池开始运行')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()

        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()

        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()

if __name__ == '__main__':
    Scheduler().run()



为了帮助大家更轻松的学好Python,我给大家分享一套Python学习资料,希望对正在学习的你有所帮助!

获取方式:关注并私信小编 “ 学习 ”,即可免费获取!


相关推荐

iphone6自定义铃声设置流程(iphone6怎么自定义来电铃声)

苹果iphone6自定义铃声如何设置,iphone6自定义铃声怎么设置,iphone6自定义铃声设置教程,下面小编给大家分享一下。设置自定义铃声首先要自己制作或者找到一个铃声音频。1、在电脑上面把已经...

iphone手机三分钟更换铃声,无需电脑直接手机操作

iPhone怎么在手机上换铃声?无需电脑!1分钟教你给苹果手机换铃声众所周知,苹果手机的ios系统是比较封闭的,封闭系统就会给我们带来一些不便,这里要说的就是苹果手机更换个性化铃声就比较麻烦,因为io...

iPhone手机个性铃声设置详细教程(iphone个性铃声怎么设置)

iPhone现在已成街机了。朋友聚在一起的时候,是不是总有种以为是自己手机响了的赶脚。那么,小编今天跟大家分享一下iPhone怎么换铃声?,让你轻松设置属于你的个性铃声。1:电脑端安装iTools,安...

iPhone也能用自己喜欢的铃声了,2分钟包搞定!

听到超好听的铃声,怎样才能放进iPhone里?这貌似是一道千古难题。90%的iPhone小白:听到这么爽脆带感的iPhone铃声,我要用!我要用!我要用!视频然而打开iTunes发现,脑子一片空白……...

苹果iOS 26隐藏新铃声曝光:强调“玻璃质感”

IT之家6月20日消息,苹果在iOS26中隐藏了一个新的铃声,这是现有默认铃声“Reflection”的改版。“Reflection”自2017年iPhoneX问世以来一直是系...

苹果ios14充电提示音怎么设置 iPhone手机修改充电提示音教程

ios14充电提示音最近很火爆,大家都想要知道苹果设置充电提示音的方法,个性化的设置非常吸引大家,小编也会在这里教大家ios14苹果充电提示音的设置,操作的流程会直接分享在下方,各位玩家们都能一起来看...

苹果iphone手机直接设置铃声教程(ios怎么直接设置铃声)

iPhone苹果16自定义来电铃声攻略(ios自定义铃声怎么设置)

在智能手机的个性化设置中,更改来电铃声是展现自我风格的一种方式。对于使用iPhone苹果16的用户来说,设置一个独特的来电铃声,不仅能提升接听体验,还能让日常生活更加多彩。以下是如何为iPhone苹果...

未越狱iPhone用户自定义来电铃声和短信铃声的教程

其实自定义来电铃声和短信铃声的过程没有什么区别,但要注意,来电铃声的播放时间不能超过40秒,短信铃声的播放时间不能超过30秒,这也就是说长度为30-40秒的仅在iPhone的来电铃声中出现,少于30秒...

苹果手机怎么设置闹钟铃声?更改为歌曲铃声,亲测有效

很不是有很多小伙伴每天早上都被苹果手机刺耳的“雷达”闹钟铃声给吵醒呢?想要更换一个舒缓的闹钟铃声,却发现自己鼓捣半天却无法更换喜欢的歌曲闹钟铃声。苹果手机怎么设置闹钟铃声?下面小编就来分享如何将苹果手...

独家教程:iPhone手机铃声制作与更换,一般人我不告诉他

今天刚好自己作铃声。。想到吧里好多人都会问怎么制作铃声。于是顺便截图发吧里。虽然百度一下铃声制作已经泛滥。但是还是会一直有小白问。所以这里会详细图文说明。先介绍小白式铃声制作。http://www....

苹果用户iTunes自制自定义铃声教程

怎么制作iphone6铃声,如何使用iTunes剪切音乐自制铃声,新版iTunes怎么自制铃声,相信很多苹果用户都想使用一些个性化铃声。那么就学习一下这个方法吧。1、在电脑上面打开iTunes,选择编...

轻松学会!苹果手机怎么设置铃声来电铃声(3个方法)

在日常生活中,手机的铃声扮演着非常重要的角色,不仅是接收来电的提示,更是展示个人品位和风格的方式之一。而对于苹果手机用户来说,定制来电铃声可以让手机更具个性化,并且让您在繁忙的环境中更容易识别重要的来...

iPhone13怎么设置来电铃声?苹果13自定义铃声操作教程

我们知道,苹果iOS系统是相对封闭的,在很多功能设置上与安卓机大有不同。就如来电铃声,很多苹果用户就不喜欢使用默认的那些来电铃声,想要自定义铃声,却不知如何操作。最近,新购入iPhone13系列机型的...

iPhone 免费铃声:iOS 26 终于让设置变得更简单

苹果终于开窍了——省去了最烦人的繁琐步骤。iOS26终结了多年来困扰iPhone用户的麻烦操作:现在无需通过GarageBand应用,仅需轻点几下就能设置自定义铃声。完全免费、直接操作且极...

取消回复欢迎 发表评论: