The Complete Guide to Web Scraping with Proxies (2025)
Web scraping has become an essential skill for developers, data scientists, and businesses looking to extract valuable information from the web. However, as websites implement increasingly sophisticated anti-bot measures, using proxies has shifted from optional to absolutely critical for successful scraping operations.
In this comprehensive guide, we'll explore everything you need to know about web scraping with proxies - from basic concepts to advanced techniques that will help you build robust, scalable scraping systems.
🎯 What is Web Scraping and Why Use Proxies?
Web scraping is the process of automatically extracting data from websites using code. While the concept is straightforward, the execution becomes complex when dealing with modern web applications that actively prevent automated access.
Common Web Scraping Challenges
Rate Limiting: Most websites implement rate limits to prevent server overload. Exceeding these limits results in temporary or permanent IP bans.
IP Blocking: Websites track IP addresses and block those exhibiting suspicious behavior patterns.
Geo-Restrictions: Many sites serve different content based on geographic location or block access from certain regions entirely.
Anti-Bot Detection: Modern websites use sophisticated fingerprinting techniques to identify and block automated traffic.
Session Management: Maintaining consistent sessions across multiple requests while avoiding detection.
Why Proxies Are Essential
Proxies act as intermediaries between your scraping application and target websites, providing several critical benefits (a short example follows this list):
- IP Rotation: Distribute requests across multiple IP addresses to avoid rate limits
- Geographic Diversity: Access geo-restricted content from different locations
- Anonymity: Hide your real IP address and location
- Scalability: Handle high-volume scraping operations efficiently
- Reliability: Maintain consistent access even if some IPs get blocked
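To make the intermediary role concrete, here is a minimal sketch (endpoint and credentials are placeholders in the same format used throughout this guide) that compares the IP address a site sees with and without a proxy:

import requests

# Placeholder proxy endpoint; substitute your own host, port and credentials
proxy = {
    'http': 'http://username:[email protected]:12933',
    'https': 'http://username:[email protected]:12933',
}

direct_ip = requests.get('https://httpbin.org/ip', timeout=10).json()['origin']
proxied_ip = requests.get('https://httpbin.org/ip', proxies=proxy, timeout=10).json()['origin']
print(f"Direct IP: {direct_ip} | IP seen through the proxy: {proxied_ip}")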
🔧 Types of Proxies for Web Scraping
Understanding different proxy types is crucial for choosing the right solution for your scraping needs.
Datacenter Proxies
What they are: IP addresses hosted in data centers, not associated with internet service providers.
Advantages:
- High speed and reliability (up to 1 Gbit/s)
- Cost-effective for large-scale operations
- Excellent uptime (99.99%+)
- Large IP pools available
Best for: High-volume scraping, API interactions, general web scraping where residential IPs aren't required.
# Example datacenter proxy configuration
# The proxy URL itself normally uses the http:// scheme, even when scraping HTTPS targets
datacenter_proxy = {
    'http': 'http://username:[email protected]:12933',
    'https': 'http://username:[email protected]:12933'
}
Residential Proxies
What they are: IP addresses assigned to real residential devices by ISPs.
Advantages:
- Appear as regular users to websites
- Lower detection rates
- Better for social media and e-commerce scraping
Disadvantages:
- More expensive than datacenter proxies
- Generally slower speeds
- Less predictable availability
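On the client side, a residential setup usually looks identical to a datacenter one; only the gateway endpoint changes. A minimal sketch, assuming a hypothetical rotating residential gateway (host, port and credentials are placeholders):

import requests

residential_proxy = {
    'http': 'http://username:[email protected]:10000',
    'https': 'http://username:[email protected]:10000',
}

# Each request through a rotating gateway typically exits from a different residential IP
response = requests.get('https://example.com', proxies=residential_proxy, timeout=15)
print(response.status_code)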
Mobile Proxies
What they are: IP addresses from mobile carrier networks.
Advantages:
- Highest success rates for mobile-first websites
- Excellent for social media scraping
- Very low detection rates
Disadvantages:
- Most expensive option
- Limited availability
- Slower speeds
🌐 HTTP vs SOCKS5 Protocols
HTTP Proxies
HTTP proxies work at the application layer and are designed specifically for web traffic.
Advantages:
- Optimized for web scraping
- Support for HTTP headers manipulation
- Better performance for web requests
Example Implementation:
import requests

http_proxy = {
    'http': 'http://username:[email protected]:12933',
    'https': 'http://username:[email protected]:12933'
}

response = requests.get('https://example.com', proxies=http_proxy)
SOCKS5 Proxies
SOCKS5 proxies operate at a lower level than HTTP, relaying raw TCP (and UDP) traffic, so they can handle any type of application traffic.
Advantages:
- Protocol agnostic (works with any application)
- Better for complex scraping scenarios
- Support for UDP traffic
Example Implementation:
import requests
import socks  # provided by the PySocks package
import socket

# Route all new socket connections through the SOCKS5 proxy
socks.set_default_proxy(socks.SOCKS5, "proxy.pinguproxy.com", 12533, username="user", password="pass")
socket.socket = socks.socksocket

response = requests.get('https://example.com')

# Alternative: with requests[socks] installed, pass the proxy per request instead of patching sockets
# response = requests.get('https://example.com', proxies={'https': 'socks5://user:[email protected]:12533'})
🔄 Proxy Rotation Strategies
Effective proxy rotation is crucial for avoiding detection and maintaining consistent scraping performance.
Time-Based Rotation
Rotate proxies based on time intervals:
import time
from itertools import cycle

class TimeBasedRotator:
    def __init__(self, proxies, rotation_interval=60):
        self.proxies = cycle(proxies)
        self.current_proxy = next(self.proxies)
        self.rotation_interval = rotation_interval
        self.last_rotation = time.time()

    def get_proxy(self):
        if time.time() - self.last_rotation > self.rotation_interval:
            self.current_proxy = next(self.proxies)
            self.last_rotation = time.time()
        return self.current_proxy

# Usage (replace the placeholder endpoints with your own distinct proxies)
proxies = [
    {'http': 'http://proxy.pinguproxy.com:12933'},
    {'http': 'http://proxy.pinguproxy.com:12933'},
    {'http': 'http://proxy.pinguproxy.com:12933'}
]
rotator = TimeBasedRotator(proxies, rotation_interval=30)
Request-Based Rotation
Rotate proxies after a specific number of requests:
class RequestBasedRotator:
    def __init__(self, proxies, requests_per_proxy=10):
        self.proxies = cycle(proxies)
        self.current_proxy = next(self.proxies)
        self.requests_per_proxy = requests_per_proxy
        self.request_count = 0

    def get_proxy(self):
        if self.request_count >= self.requests_per_proxy:
            self.current_proxy = next(self.proxies)
            self.request_count = 0
        self.request_count += 1
        return self.current_proxy
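A short usage sketch, reusing the requests library and the placeholder proxies list defined above: the rotator hands out the same proxy for ten consecutive requests, then moves on to the next one.

rotator = RequestBasedRotator(proxies, requests_per_proxy=10)

for url in ['https://example.com/page1', 'https://example.com/page2']:
    proxy = rotator.get_proxy()
    response = requests.get(url, proxies=proxy, timeout=10)
    print(url, response.status_code)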
Intelligent Rotation
Rotate based on response status and performance:
class IntelligentRotator:
    def __init__(self, proxies):
        self.proxies = proxies
        self.proxy_stats = {proxy: {'success': 0, 'failure': 0, 'avg_response_time': 0}
                            for proxy in proxies}
        self.current_proxy = self.get_best_proxy()

    def get_best_proxy(self):
        # Select the proxy with the lowest failure rate, then the lowest response time
        best_proxy = min(self.proxies,
                         key=lambda p: (self.proxy_stats[p]['failure'] /
                                        max(self.proxy_stats[p]['success'] + self.proxy_stats[p]['failure'], 1),
                                        self.proxy_stats[p]['avg_response_time']))
        return best_proxy

    def update_stats(self, proxy, success, response_time):
        stats = self.proxy_stats[proxy]
        if success:
            stats['success'] += 1
        else:
            stats['failure'] += 1
        # Update running average response time
        total_requests = stats['success'] + stats['failure']
        stats['avg_response_time'] = ((stats['avg_response_time'] * (total_requests - 1)) + response_time) / total_requests
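A sketch of how the intelligent rotator might be driven: time each request and feed the outcome back through update_stats so that failing or slow proxies are naturally avoided. The proxy URLs below are placeholders; note that this class expects hashable proxy values such as URL strings.

import time
import requests

rotator = IntelligentRotator([
    'http://user:[email protected]:12933',
    'http://user:[email protected]:12934',
])

proxy = rotator.get_best_proxy()
start = time.time()
try:
    response = requests.get('https://example.com', proxies={'http': proxy, 'https': proxy}, timeout=10)
    rotator.update_stats(proxy, response.ok, time.time() - start)
except requests.RequestException:
    rotator.update_stats(proxy, False, time.time() - start)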
🐍 Python Implementation Examples
Basic Proxy Rotation with Requests
import requests
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class ProxyRotator:
    def __init__(self, proxies):
        self.proxies = proxies
        self.session = requests.Session()
        # Configure retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def get_random_proxy(self):
        return random.choice(self.proxies)

    def scrape_url(self, url, headers=None):
        proxy = self.get_random_proxy()
        try:
            response = self.session.get(
                url,
                proxies=proxy,
                headers=headers or self.get_random_headers(),
                timeout=10
            )
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            print(f"Error with proxy {proxy}: {e}")
            return None

    def get_random_headers(self):
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]
        return {
            'User-Agent': random.choice(user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

# Usage example
proxies = [
    {'http': 'http://user:[email protected]:12933', 'https': 'http://user:[email protected]:12933'},
    {'http': 'http://user:[email protected]:12933', 'https': 'http://user:[email protected]:12933'},
]
scraper = ProxyRotator(proxies)
response = scraper.scrape_url('https://example.com')
Advanced Scrapy Integration
# middlewares.py
import random

class RotatingProxyMiddleware:
    def __init__(self, proxy_list):
        self.proxy_list = proxy_list

    @classmethod
    def from_crawler(cls, crawler):
        proxy_list = crawler.settings.getlist("ROTATING_PROXY_LIST")
        return cls(proxy_list)

    def process_request(self, request, spider):
        proxy = random.choice(self.proxy_list)
        request.meta['proxy'] = proxy

    def process_response(self, request, response, spider):
        if response.status in [403, 429, 503]:
            # Retry with a different proxy; fall back to the original response
            # if the retry budget is exhausted
            retry_req = self._retry(request, "Blocked by proxy", spider)
            return retry_req if retry_req is not None else response
        return response

    def _retry(self, request, reason, spider):
        retries = request.meta.get('retry_times', 0) + 1
        if retries <= 3:
            retry_req = request.copy()
            retry_req.meta['retry_times'] = retries
            retry_req.dont_filter = True
            return retry_req
        spider.logger.error(f"Gave up retrying {request.url} after {retries} attempts")
        return None

# settings.py
ROTATING_PROXY_LIST = [
    'http://user:[email protected]:12933',
    'http://user:[email protected]:12933',
    'http://user:[email protected]:12933',
]
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.RotatingProxyMiddleware': 350,
}
⚡ IPv4 vs IPv6 Considerations
IPv6 Advantages for Web Scraping
IPv6 offers significant advantages for large-scale scraping operations:
Massive Address Space: IPv6 offers roughly 7.9 x 10^28 times as many addresses as IPv4 (2^128 versus 2^32), allowing for extensive IP rotation.
Lower Costs: IPv6 addresses are typically more cost-effective due to abundant availability.
Better Performance: Modern infrastructure often provides better IPv6 performance.
Implementation Considerations
import socket
from urllib.parse import urlparse

def test_ipv6_support(url):
    """Check whether the target host publishes an IPv6 (AAAA) record"""
    hostname = urlparse(url).hostname
    try:
        socket.getaddrinfo(hostname, None, socket.AF_INET6)
        return True
    except socket.gaierror:
        return False

def configure_ipv6_proxy():
    """Configure IPv6 proxy settings"""
    ipv6_proxy = {
        'http': 'http://user:[email protected]:12933',
        'https': 'http://user:[email protected]:12933'
    }
    return ipv6_proxy

# Mixed IPv4/IPv6 proxy pool (placeholders; mix distinct IPv4 and IPv6 endpoints here)
mixed_proxies = [
    {'http': 'http://user:[email protected]:12933'},  # IPv4 endpoint
    {'http': 'http://user:[email protected]:12933'},  # IPv4 endpoint
    {'http': 'http://user:[email protected]:12933'},  # swap in an IPv6-capable endpoint
]
🛡️ Bypassing Anti-Bot Detection
Common Detection Methods
IP-based Detection: Monitoring request patterns from specific IP addresses.
Behavioral Analysis: Analyzing request timing, patterns, and sequences.
Browser Fingerprinting: Checking for browser-specific headers and capabilities.
JavaScript Challenges: Requiring JavaScript execution to access content.
Evasion Techniques
import time
import random
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

class StealthScraper:
    def __init__(self, proxies):
        self.proxies = proxies
        self.session = requests.Session()
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]

    def get_random_user_agent(self):
        return random.choice(self.user_agents)

    def add_stealth_headers(self):
        """Add realistic browser headers"""
        headers = {
            'User-Agent': self.get_random_user_agent(),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
        }
        return headers

    def human_like_delay(self, min_delay=1, max_delay=3):
        """Add human-like delays between requests"""
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)

    def scrape_with_selenium(self, url, proxy):
        """Use Selenium for JavaScript-heavy sites"""
        chrome_options = Options()
        chrome_options.add_argument(f'--proxy-server={proxy}')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        driver = webdriver.Chrome(options=chrome_options)
        driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
        try:
            driver.get(url)
            time.sleep(random.uniform(2, 5))
            return driver.page_source
        finally:
            driver.quit()
🔧 Advanced Proxy Pool Management
Dynamic Proxy Health Monitoring
import asyncio
import aiohttp
import time
from dataclasses import dataclass
from typing import List, Dict, Optional

@dataclass
class ProxyHealth:
    proxy: str
    success_rate: float
    avg_response_time: float
    last_check: float
    consecutive_failures: int
    is_active: bool = True

class ProxyPoolManager:
    def __init__(self, proxies: List[str], health_check_interval: int = 300):
        self.proxies = {proxy: ProxyHealth(
            proxy=proxy,
            success_rate=1.0,
            avg_response_time=0.0,
            last_check=time.time(),
            consecutive_failures=0
        ) for proxy in proxies}
        self.health_check_interval = health_check_interval
        self.test_url = "http://httpbin.org/ip"

    async def check_proxy_health(self, session: aiohttp.ClientSession, proxy: str) -> bool:
        """Check if a proxy is working"""
        try:
            proxy_url = f"http://{proxy}"
            async with session.get(
                self.test_url,
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                if response.status == 200:
                    return True
        except Exception:
            pass
        return False

    async def update_proxy_health(self):
        """Update health status for all proxies"""
        async with aiohttp.ClientSession() as session:
            tasks = []
            for proxy in self.proxies.keys():
                task = self.check_proxy_health(session, proxy)
                tasks.append(task)
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for proxy, is_healthy in zip(self.proxies.keys(), results):
                health = self.proxies[proxy]
                if is_healthy is True:  # gather may also return exception objects
                    health.consecutive_failures = 0
                    health.is_active = True
                else:
                    health.consecutive_failures += 1
                    if health.consecutive_failures >= 3:
                        health.is_active = False
                health.last_check = time.time()

    def get_healthy_proxies(self) -> List[str]:
        """Get list of currently healthy proxies"""
        return [proxy for proxy, health in self.proxies.items() if health.is_active]

    def get_best_proxy(self) -> Optional[str]:
        """Get the best performing proxy"""
        healthy_proxies = [(proxy, health) for proxy, health in self.proxies.items()
                           if health.is_active]
        if not healthy_proxies:
            return None
        # Sort by success rate and response time
        best_proxy = min(healthy_proxies,
                         key=lambda x: (1 - x[1].success_rate, x[1].avg_response_time))
        return best_proxy[0]

# Usage
proxy_list = [
    "user:[email protected]:12933",
    "user:[email protected]:12933",
    "user:[email protected]:12933"
]
pool_manager = ProxyPoolManager(proxy_list)
Concurrent Scraping with Proxy Pools
import asyncio
import aiohttp
import time
from typing import List, Dict, Any

class ConcurrentScraper:
    def __init__(self, proxies: List[str], max_concurrent: int = 10):
        self.proxies = proxies
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def scrape_url(self, session: aiohttp.ClientSession, url: str, proxy: str) -> Dict[str, Any]:
        """Scrape a single URL with proxy"""
        async with self.semaphore:
            try:
                proxy_url = f"http://{proxy}"
                start_time = time.time()
                async with session.get(
                    url,
                    proxy=proxy_url,
                    timeout=aiohttp.ClientTimeout(total=15)
                ) as response:
                    content = await response.text()
                    response_time = time.time() - start_time
                    return {
                        'url': url,
                        'proxy': proxy,
                        'status': response.status,
                        'content': content,
                        'response_time': response_time,
                        'success': True
                    }
            except Exception as e:
                return {
                    'url': url,
                    'proxy': proxy,
                    'error': str(e),
                    'success': False
                }

    async def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Scrape multiple URLs concurrently"""
        async with aiohttp.ClientSession() as session:
            tasks = []
            for i, url in enumerate(urls):
                proxy = self.proxies[i % len(self.proxies)]
                task = self.scrape_url(session, url, proxy)
                tasks.append(task)
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return [r for r in results if not isinstance(r, Exception)]

# Usage
urls_to_scrape = [
    "https://example1.com",
    "https://example2.com",
    "https://example3.com"
]
scraper = ConcurrentScraper(proxy_list, max_concurrent=5)
results = asyncio.run(scraper.scrape_urls(urls_to_scrape))
📊 Performance Optimization Strategies
Bandwidth Optimization
import asyncio
import aiohttp

class BandwidthOptimizer:
    def __init__(self, max_bandwidth_mbps: float = 100):
        self.max_bandwidth_mbps = max_bandwidth_mbps
        self.request_queue = asyncio.Queue()
        self.bandwidth_tracker = {}

    def calculate_optimal_concurrency(self, avg_response_size_kb: float) -> int:
        """Calculate optimal concurrent requests based on bandwidth"""
        # Convert to bits per second
        max_bandwidth_bps = self.max_bandwidth_mbps * 1_000_000
        avg_response_size_bits = avg_response_size_kb * 8 * 1024
        # Estimate optimal concurrency (factor of 2 as a safety margin)
        optimal_concurrency = max_bandwidth_bps / (avg_response_size_bits * 2)
        return max(1, int(optimal_concurrency))

    async def rate_limited_request(self, session: aiohttp.ClientSession, url: str, proxy: str):
        """Make a rate-limited request"""
        # Simple queue-based throttle; a full token-bucket implementation could replace this
        await self.request_queue.put(None)
        try:
            # Assumes a scrape_url coroutine like the one defined in ConcurrentScraper above
            result = await self.scrape_url(session, url, proxy)
            return result
        finally:
            await asyncio.sleep(0.1)  # Minimum delay between requests
            self.request_queue.task_done()
Memory-Efficient Data Processing
import re
import time
from typing import Generator, Any, List, Dict, Optional

class MemoryEfficientProcessor:
    def __init__(self, batch_size: int = 1000):
        self.batch_size = batch_size

    def process_large_dataset(self, data_generator: Generator[Any, None, None]):
        """Process large datasets in batches"""
        batch = []
        for item in data_generator:
            batch.append(item)
            if len(batch) >= self.batch_size:
                yield self.process_batch(batch)
                batch = []
        # Process remaining items
        if batch:
            yield self.process_batch(batch)

    def process_batch(self, batch: List[Any]) -> Dict[str, Any]:
        """Process a batch of scraped data"""
        processed_data = {
            'total_items': len(batch),
            'processed_at': time.time(),
            'data': []
        }
        for item in batch:
            # Process individual item
            processed_item = self.clean_and_validate(item)
            if processed_item:
                processed_data['data'].append(processed_item)
        return processed_data

    def clean_and_validate(self, item: Any) -> Optional[Dict[str, Any]]:
        """Clean and validate scraped data"""
        # Implement your data cleaning logic here
        if not item or not isinstance(item, dict):
            return None
        # Example cleaning
        cleaned_item = {
            'title': item.get('title', '').strip(),
            'price': self.parse_price(item.get('price', '')),
            'description': item.get('description', '').strip()[:500]  # Limit description length
        }
        # Validate required fields
        if not cleaned_item['title']:
            return None
        return cleaned_item

    def parse_price(self, price_str: str) -> Optional[float]:
        """Parse price string to float"""
        # Remove currency symbols and extract numbers
        price_match = re.search(r'[\d,]+\.?\d*', price_str.replace(',', ''))
        if price_match:
            try:
                return float(price_match.group())
            except ValueError:
                pass
        return None
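A short usage sketch, assuming scraped records arrive as a generator of dicts (the generator below is a stand-in for a real scraping pipeline):

def scraped_items():
    # Stand-in for a stream of scraped records
    for i in range(2500):
        yield {'title': f'Item {i}', 'price': f'${i}.99', 'description': 'Example product'}

processor = MemoryEfficientProcessor(batch_size=1000)
for batch_result in processor.process_large_dataset(scraped_items()):
    print(batch_result['total_items'], 'items in batch,', len(batch_result['data']), 'kept')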
🚨 Error Handling and Retry Logic
Robust Error Handling
import asyncio
import logging
from enum import Enum
from typing import Optional, Callable, Any

import aiohttp

class ErrorType(Enum):
    NETWORK_ERROR = "network_error"
    PROXY_ERROR = "proxy_error"
    RATE_LIMIT = "rate_limit"
    BLOCKED = "blocked"
    TIMEOUT = "timeout"
    UNKNOWN = "unknown"

class RetryStrategy:
    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.logger = logging.getLogger(__name__)

    def classify_error(self, exception: Exception, response_status: Optional[int] = None) -> ErrorType:
        """Classify error type for appropriate handling"""
        if response_status:
            if response_status == 429:
                return ErrorType.RATE_LIMIT
            elif response_status in [403, 406]:
                return ErrorType.BLOCKED
            elif response_status >= 500:
                return ErrorType.NETWORK_ERROR
        if isinstance(exception, (aiohttp.ClientProxyConnectionError, aiohttp.ClientConnectorError)):
            return ErrorType.PROXY_ERROR
        elif isinstance(exception, asyncio.TimeoutError):
            return ErrorType.TIMEOUT
        return ErrorType.UNKNOWN

    async def retry_with_backoff(self,
                                 func: Callable,
                                 *args,
                                 error_handler: Optional[Callable] = None,
                                 **kwargs) -> Any:
        """Retry function with exponential backoff"""
        last_exception = None
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                error_type = self.classify_error(e)
                if attempt == self.max_retries:
                    self.logger.error(f"Max retries exceeded for {func.__name__}: {e}")
                    if error_handler:
                        return await error_handler(e, error_type)
                    raise
                # Calculate delay based on error type
                delay = self.calculate_delay(error_type, attempt)
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s")
                await asyncio.sleep(delay)
        raise last_exception

    def calculate_delay(self, error_type: ErrorType, attempt: int) -> float:
        """Calculate delay based on error type and attempt number"""
        base_delay = self.backoff_factor ** attempt
        if error_type == ErrorType.RATE_LIMIT:
            return base_delay * 2  # Longer delay for rate limits
        elif error_type == ErrorType.BLOCKED:
            return base_delay * 3  # Even longer for blocks
        return base_delay
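A usage sketch: wrap any awaitable fetch function in retry_with_backoff and the strategy handles error classification and delays (the URL is a placeholder).

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
        response.raise_for_status()
        return await response.text()

async def main():
    retry = RetryStrategy(max_retries=3, backoff_factor=2.0)
    async with aiohttp.ClientSession() as session:
        html = await retry.retry_with_backoff(fetch, session, 'https://example.com')
        print(len(html))

asyncio.run(main())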
⚖️ Legal and Ethical Considerations
Respecting robots.txt
import logging
import urllib.robotparser
from urllib.parse import urljoin, urlparse
from typing import Optional

class RobotsTxtChecker:
    def __init__(self):
        self.robot_parsers = {}

    def can_fetch(self, url: str, user_agent: str = "*") -> bool:
        """Check if URL can be fetched according to robots.txt"""
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        if base_url not in self.robot_parsers:
            self.robot_parsers[base_url] = self.load_robots_txt(base_url)
        rp = self.robot_parsers[base_url]
        if rp:
            return rp.can_fetch(user_agent, url)
        return True  # If robots.txt can't be loaded, assume allowed

    def load_robots_txt(self, base_url: str) -> Optional[urllib.robotparser.RobotFileParser]:
        """Load and parse robots.txt"""
        try:
            robots_url = urljoin(base_url, '/robots.txt')
            rp = urllib.robotparser.RobotFileParser()
            rp.set_url(robots_url)
            rp.read()
            return rp
        except Exception as e:
            logging.warning(f"Could not load robots.txt for {base_url}: {e}")
            return None

    def get_crawl_delay(self, url: str, user_agent: str = "*") -> Optional[float]:
        """Get crawl delay from robots.txt"""
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        if base_url in self.robot_parsers:
            rp = self.robot_parsers[base_url]
            if rp:
                return rp.crawl_delay(user_agent)
        return None
Rate Limiting Best Practices
import asyncio
import time
from collections import defaultdict, deque

class RateLimiter:
    def __init__(self, requests_per_second: float = 1.0):
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        self.last_request_time = defaultdict(float)
        self.request_history = defaultdict(deque)

    async def wait_if_needed(self, domain: str):
        """Wait if necessary to respect rate limits"""
        current_time = time.time()
        last_request = self.last_request_time[domain]
        time_since_last = current_time - last_request
        if time_since_last < self.min_interval:
            wait_time = self.min_interval - time_since_last
            await asyncio.sleep(wait_time)
        self.last_request_time[domain] = time.time()

    def is_rate_limited(self, domain: str, window_seconds: int = 60) -> bool:
        """Check if domain is currently rate limited"""
        current_time = time.time()
        history = self.request_history[domain]
        # Remove old requests outside the window
        while history and history[0] < current_time - window_seconds:
            history.popleft()
        # Check if we're at the limit
        max_requests = int(self.requests_per_second * window_seconds)
        return len(history) >= max_requests

    def record_request(self, domain: str):
        """Record a request for rate limiting purposes"""
        self.request_history[domain].append(time.time())
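A usage sketch (reusing aiohttp and urlparse from earlier examples): wait before each request to the same domain, then record it afterwards.

async def polite_fetch(urls):
    limiter = RateLimiter(requests_per_second=1.0)
    async with aiohttp.ClientSession() as session:
        for url in urls:
            domain = urlparse(url).netloc
            await limiter.wait_if_needed(domain)
            async with session.get(url) as response:
                limiter.record_request(domain)
                print(url, response.status)

asyncio.run(polite_fetch(['https://example.com/a', 'https://example.com/b']))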
🔍 Monitoring and Debugging
Comprehensive Logging System
import logging
import json
from datetime import datetime
from typing import Dict, Any

class ScrapingLogger:
    def __init__(self, log_file: str = "scraping.log"):
        self.logger = logging.getLogger("scraping")
        self.logger.setLevel(logging.INFO)
        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.WARNING)
        # Formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    def log_request(self, url: str, proxy: str, status_code: int, response_time: float):
        """Log individual request details"""
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'url': url,
            'proxy': proxy,
            'status_code': status_code,
            'response_time': response_time,
            'type': 'request'
        }
        self.logger.info(json.dumps(log_data))

    def log_error(self, url: str, proxy: str, error: str, error_type: str):
        """Log error details"""
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'url': url,
            'proxy': proxy,
            'error': error,
            'error_type': error_type,
            'type': 'error'
        }
        self.logger.error(json.dumps(log_data))

    def log_proxy_performance(self, proxy_stats: Dict[str, Any]):
        """Log proxy performance metrics"""
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'proxy_stats': proxy_stats,
            'type': 'performance'
        }
        self.logger.info(json.dumps(log_data))
Performance Monitoring
import time
from dataclasses import dataclass
from typing import List, Dict, Any

import psutil

@dataclass
class PerformanceMetrics:
    timestamp: float
    cpu_percent: float
    memory_percent: float
    network_io: Dict[str, int]
    active_connections: int
    requests_per_second: float

class PerformanceMonitor:
    def __init__(self, monitoring_interval: int = 60):
        self.monitoring_interval = monitoring_interval
        self.metrics_history: List[PerformanceMetrics] = []
        self.request_count = 0
        self.last_request_count = 0
        self.last_check_time = time.time()

    def record_request(self):
        """Record a completed request"""
        self.request_count += 1

    def collect_metrics(self) -> PerformanceMetrics:
        """Collect current performance metrics"""
        current_time = time.time()
        # Calculate requests per second
        time_diff = current_time - self.last_check_time
        requests_diff = self.request_count - self.last_request_count
        rps = requests_diff / time_diff if time_diff > 0 else 0
        # Get system metrics
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent
        network_io = psutil.net_io_counters()._asdict()
        active_connections = len(psutil.net_connections())
        metrics = PerformanceMetrics(
            timestamp=current_time,
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            network_io=network_io,
            active_connections=active_connections,
            requests_per_second=rps
        )
        self.metrics_history.append(metrics)
        self.last_request_count = self.request_count
        self.last_check_time = current_time
        return metrics

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary"""
        if not self.metrics_history:
            return {}
        recent_metrics = self.metrics_history[-10:]  # Last 10 measurements
        return {
            'avg_cpu_percent': sum(m.cpu_percent for m in recent_metrics) / len(recent_metrics),
            'avg_memory_percent': sum(m.memory_percent for m in recent_metrics) / len(recent_metrics),
            'avg_rps': sum(m.requests_per_second for m in recent_metrics) / len(recent_metrics),
            'total_requests': self.request_count,
            'monitoring_duration': time.time() - self.metrics_history[0].timestamp
        }
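A brief usage sketch: record each completed request, then sample metrics periodically (note that psutil.net_connections() may require elevated privileges on some systems):

monitor = PerformanceMonitor(monitoring_interval=60)

for _ in range(25):
    monitor.record_request()  # call this wherever a request completes

snapshot = monitor.collect_metrics()
print(f"{snapshot.requests_per_second:.1f} req/s, CPU {snapshot.cpu_percent}%")
print(monitor.get_performance_summary())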
🎯 Real-World Use Cases and Examples
E-commerce Price Monitoring
import asyncio
import logging
import random
import re
from datetime import datetime
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse

import aiohttp
from bs4 import BeautifulSoup

class EcommerceScraper:
    """Builds on the RateLimiter and RobotsTxtChecker classes defined earlier"""

    def __init__(self, proxies: List[str]):
        self.proxies = proxies
        self.rate_limiter = RateLimiter(requests_per_second=0.5)  # Conservative rate
        self.robots_checker = RobotsTxtChecker()

    async def scrape_product_prices(self, product_urls: List[str]) -> List[Dict[str, Any]]:
        """Scrape product prices from e-commerce sites"""
        results = []
        async with aiohttp.ClientSession() as session:
            for url in product_urls:
                domain = urlparse(url).netloc
                # Check robots.txt
                if not self.robots_checker.can_fetch(url):
                    logging.warning(f"Robots.txt disallows scraping {url}")
                    continue
                # Respect rate limits
                await self.rate_limiter.wait_if_needed(domain)
                # Get crawl delay from robots.txt
                crawl_delay = self.robots_checker.get_crawl_delay(url)
                if crawl_delay:
                    await asyncio.sleep(crawl_delay)
                # Scrape with proxy rotation
                proxy = random.choice(self.proxies)
                result = await self.scrape_product_page(session, url, proxy)
                if result:
                    results.append(result)
        return results

    async def scrape_product_page(self, session: aiohttp.ClientSession, url: str, proxy: str) -> Optional[Dict[str, Any]]:
        """Scrape individual product page"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive',
            }
            async with session.get(url, proxy=f"http://{proxy}", headers=headers,
                                   timeout=aiohttp.ClientTimeout(total=15)) as response:
                if response.status == 200:
                    html = await response.text()
                    return self.parse_product_data(html, url)
                else:
                    logging.warning(f"Failed to scrape {url}: Status {response.status}")
        except Exception as e:
            logging.error(f"Error scraping {url} with proxy {proxy}: {e}")
        return None

    def parse_product_data(self, html: str, url: str) -> Dict[str, Any]:
        """Parse product data from HTML"""
        soup = BeautifulSoup(html, 'html.parser')
        # Generic selectors - customize for specific sites
        title_selectors = ['h1', '.product-title', '[data-testid="product-title"]']
        price_selectors = ['.price', '.product-price', '[data-testid="price"]']
        title = self.extract_text_by_selectors(soup, title_selectors)
        price = self.extract_text_by_selectors(soup, price_selectors)
        return {
            'url': url,
            'title': title,
            'price': self.clean_price(price),
            'scraped_at': datetime.now().isoformat(),
            'domain': urlparse(url).netloc
        }

    def extract_text_by_selectors(self, soup: BeautifulSoup, selectors: List[str]) -> str:
        """Extract text using multiple selectors"""
        for selector in selectors:
            element = soup.select_one(selector)
            if element:
                return element.get_text(strip=True)
        return ""

    def clean_price(self, price_text: str) -> Optional[float]:
        """Clean and convert price text to float"""
        # Remove currency symbols and extract numbers
        price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
        if price_match:
            try:
                return float(price_match.group())
            except ValueError:
                pass
        return None
Social Media Monitoring
import random
from typing import List, Dict, Any

class SocialMediaScraper:
    def __init__(self, proxies: List[str]):
        self.proxies = proxies
        # Placeholder for whatever cookie/session handling your project uses
        # self.session_manager = SessionManager()

    async def scrape_social_mentions(self, keywords: List[str], platforms: List[str]) -> List[Dict[str, Any]]:
        """Scrape social media mentions"""
        results = []
        for platform in platforms:
            platform_results = await self.scrape_platform(platform, keywords)
            results.extend(platform_results)
        return results

    async def scrape_platform(self, platform: str, keywords: List[str]) -> List[Dict[str, Any]]:
        """Scrape specific social media platform"""
        if platform == 'twitter':
            return await self.scrape_twitter_mentions(keywords)
        elif platform == 'reddit':
            return await self.scrape_reddit_mentions(keywords)
        # Add more platforms as needed
        return []

    async def scrape_twitter_mentions(self, keywords: List[str]) -> List[Dict[str, Any]]:
        """Scrape Twitter mentions (example implementation)"""
        # Note: this is a simplified example; a real implementation would need
        # to use Twitter's API or advanced scraping
        results = []
        for keyword in keywords:
            search_url = f"https://twitter.com/search?q={keyword}&src=typed_query"
            # Use residential proxies for social media
            proxy = self.get_residential_proxy()
            # Implement Twitter-specific scraping logic here
            # (requires handling JavaScript, authentication, etc.)
        return results

    async def scrape_reddit_mentions(self, keywords: List[str]) -> List[Dict[str, Any]]:
        """Scrape Reddit mentions (stub; left as an exercise)"""
        return []

    def get_residential_proxy(self) -> str:
        """Get residential proxy for social media scraping"""
        # Filter for residential proxies if available
        residential_proxies = [p for p in self.proxies if 'residential' in p]
        return random.choice(residential_proxies) if residential_proxies else random.choice(self.proxies)
📈 Scaling Your Scraping Operation
Distributed Scraping Architecture
import asyncio
import json
import logging
import time
from typing import List, Dict, Any, Optional

import aiohttp
import redis

class DistributedScraper:
    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.task_queue = 'scraping_tasks'
        self.result_queue = 'scraping_results'

    def add_scraping_task(self, url: str, proxy: str, metadata: Dict[str, Any] = None):
        """Add scraping task to distributed queue"""
        task = {
            'url': url,
            'proxy': proxy,
            'metadata': metadata or {},
            'created_at': time.time(),
            'attempts': 0
        }
        self.redis_client.lpush(self.task_queue, json.dumps(task))

    def get_scraping_task(self) -> Optional[Dict[str, Any]]:
        """Get next scraping task from queue"""
        task_data = self.redis_client.brpop(self.task_queue, timeout=30)
        if task_data:
            return json.loads(task_data[1])
        return None

    def save_result(self, task: Dict[str, Any], result: Dict[str, Any]):
        """Save scraping result"""
        result_data = {
            'task': task,
            'result': result,
            'completed_at': time.time()
        }
        self.redis_client.lpush(self.result_queue, json.dumps(result_data))

    def handle_failed_task(self, task: Dict[str, Any], error: str):
        """Handle failed scraping task"""
        task['attempts'] += 1
        task['last_error'] = error
        if task['attempts'] < 3:  # Retry up to 3 times
            self.redis_client.lpush(self.task_queue, json.dumps(task))
        else:
            # Move to failed queue
            self.redis_client.lpush('failed_tasks', json.dumps(task))

class ScrapingWorker:
    def __init__(self, worker_id: str, proxies: List[str]):
        self.worker_id = worker_id
        self.proxies = proxies
        self.scraper = DistributedScraper()
        self.running = False

    async def start_worker(self):
        """Start the scraping worker"""
        self.running = True
        logging.info(f"Worker {self.worker_id} started")
        while self.running:
            task = self.scraper.get_scraping_task()
            if task:
                await self.process_task(task)
            else:
                await asyncio.sleep(1)  # No tasks available

    async def process_task(self, task: Dict[str, Any]):
        """Process individual scraping task"""
        try:
            url = task['url']
            proxy = task['proxy']
            # Perform scraping
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url,
                    proxy=f"http://{proxy}",
                    timeout=aiohttp.ClientTimeout(total=15)
                ) as response:
                    content = await response.text()
                    result = {
                        'url': url,
                        'status_code': response.status,
                        'content': content,
                        'worker_id': self.worker_id,
                        'proxy_used': proxy
                    }
                    self.scraper.save_result(task, result)
                    logging.info(f"Worker {self.worker_id} completed task for {url}")
        except Exception as e:
            self.scraper.handle_failed_task(task, str(e))
            logging.error(f"Worker {self.worker_id} failed task for {task['url']}: {e}")

    def stop_worker(self):
        """Stop the scraping worker"""
        self.running = False
        logging.info(f"Worker {self.worker_id} stopped")
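A minimal sketch of how a producer and a worker process might use these classes (proxy strings are placeholders; the worker loop runs until stop_worker() is called):

# Producer: enqueue work
scraper = DistributedScraper(redis_host='localhost')
scraper.add_scraping_task('https://example.com', 'user:[email protected]:12933')

# Worker process: consume and execute tasks
worker = ScrapingWorker(worker_id='worker-1', proxies=['user:[email protected]:12933'])
asyncio.run(worker.start_worker())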
Cloud Deployment Strategies
import boto3
from kubernetes import client, config

class CloudScrapingManager:
    def __init__(self, cloud_provider: str = 'aws'):
        self.cloud_provider = cloud_provider
        if cloud_provider == 'aws':
            self.setup_aws()
        elif cloud_provider == 'kubernetes':
            self.setup_kubernetes()

    def setup_aws(self):
        """Setup AWS resources for distributed scraping"""
        self.ec2 = boto3.client('ec2')
        self.ecs = boto3.client('ecs')
        self.sqs = boto3.client('sqs')

    def setup_kubernetes(self):
        """Setup Kubernetes for container orchestration"""
        config.load_incluster_config()  # or config.load_kube_config() for local use
        self.k8s_apps = client.AppsV1Api()
        self.k8s_core = client.CoreV1Api()

    def scale_workers(self, desired_count: int):
        """Scale scraping workers based on demand"""
        if self.cloud_provider == 'aws':
            self.scale_ecs_service(desired_count)
        elif self.cloud_provider == 'kubernetes':
            self.scale_k8s_deployment(desired_count)

    def scale_ecs_service(self, desired_count: int):
        """Scale ECS service"""
        self.ecs.update_service(
            cluster='scraping-cluster',
            service='scraping-workers',
            desiredCount=desired_count
        )

    def scale_k8s_deployment(self, desired_count: int):
        """Scale Kubernetes deployment"""
        self.k8s_apps.patch_namespaced_deployment_scale(
            name='scraping-workers',
            namespace='default',
            body={'spec': {'replicas': desired_count}}
        )
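Scaling then becomes a single call, for example to add workers before a large crawl (the cluster and deployment names match the placeholders used in the class above):

manager = CloudScrapingManager(cloud_provider='kubernetes')
manager.scale_workers(desired_count=10)  # scale the 'scraping-workers' deployment to 10 replicas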
🔧 Troubleshooting Common Issues
Proxy Connection Problems
import time
from typing import List, Dict, Any

import aiohttp

class ProxyTroubleshooter:
    def __init__(self):
        self.test_urls = [
            'http://httpbin.org/ip',
            'https://httpbin.org/headers',
            'http://icanhazip.com'
        ]

    async def diagnose_proxy(self, proxy: str) -> Dict[str, Any]:
        """Comprehensive proxy diagnosis"""
        diagnosis = {
            'proxy': proxy,
            'connectivity': False,
            'response_time': None,
            'ip_address': None,
            'supports_https': False,
            'errors': []
        }
        # Test basic connectivity
        try:
            start_time = time.time()
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.test_urls[0],
                    proxy=f"http://{proxy}",
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    if response.status == 200:
                        diagnosis['connectivity'] = True
                        diagnosis['response_time'] = time.time() - start_time
                        data = await response.json()
                        diagnosis['ip_address'] = data.get('origin')
        except Exception as e:
            diagnosis['errors'].append(f"Connectivity test failed: {e}")
        # Test HTTPS support
        if diagnosis['connectivity']:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(
                        'https://httpbin.org/ip',
                        proxy=f"http://{proxy}",
                        timeout=aiohttp.ClientTimeout(total=10)
                    ) as response:
                        if response.status == 200:
                            diagnosis['supports_https'] = True
            except Exception as e:
                diagnosis['errors'].append(f"HTTPS test failed: {e}")
        return diagnosis

    def generate_troubleshooting_report(self, proxy_results: List[Dict[str, Any]]) -> str:
        """Generate human-readable troubleshooting report"""
        report = "🔍 Proxy Troubleshooting Report\n"
        report += "=" * 50 + "\n\n"
        working_proxies = [p for p in proxy_results if p['connectivity']]
        failed_proxies = [p for p in proxy_results if not p['connectivity']]
        report += f"✅ Working Proxies: {len(working_proxies)}\n"
        report += f"❌ Failed Proxies: {len(failed_proxies)}\n\n"
        if working_proxies:
            report += "Working Proxies:\n"
            for proxy in working_proxies:
                report += f"  • {proxy['proxy']} - {proxy['response_time']:.2f}s - IP: {proxy['ip_address']}\n"
        if failed_proxies:
            report += "\nFailed Proxies:\n"
            for proxy in failed_proxies:
                report += f"  • {proxy['proxy']}\n"
                for error in proxy['errors']:
                    report += f"    - {error}\n"
        return report
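Running diagnostics over a pool and printing the report might look like this (placeholder credentials):

async def run_diagnostics(proxies):
    troubleshooter = ProxyTroubleshooter()
    results = [await troubleshooter.diagnose_proxy(p) for p in proxies]
    print(troubleshooter.generate_troubleshooting_report(results))

asyncio.run(run_diagnostics(['user:[email protected]:12933']))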
Anti-Bot Detection Solutions
import asyncio
import logging
import random
import time
from typing import Dict

import aiohttp

class AntiDetectionSuite:
    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        ]
        self.accept_languages = [
            'en-US,en;q=0.9',
            'en-GB,en;q=0.9',
            'en-US,en;q=0.8,es;q=0.7'
        ]

    def generate_realistic_headers(self) -> Dict[str, str]:
        """Generate realistic browser headers"""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': random.choice(self.accept_languages),
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
        }

    def simulate_human_behavior(self):
        """Simulate human-like browsing behavior"""
        # Random delays between actions
        delay = random.uniform(1, 5)
        time.sleep(delay)
        # Occasionally simulate longer pauses (reading content)
        if random.random() < 0.1:  # 10% chance
            time.sleep(random.uniform(10, 30))

    async def handle_cloudflare_challenge(self, session: aiohttp.ClientSession, url: str):
        """Handle Cloudflare challenges"""
        # This is a simplified example; real challenges usually require a
        # headless browser or a specialized unblocking service
        try:
            async with session.get(url) as response:
                if 'cloudflare' in response.headers.get('server', '').lower():
                    # Wait briefly and retry the request once
                    await asyncio.sleep(5)
                    return await session.get(url)
                return response
        except Exception as e:
            logging.error(f"Cloudflare challenge handling failed: {e}")
            return None
📊 Performance Benchmarking
Comprehensive Benchmarking Suite
import asyncio
import random
import statistics
import time
from dataclasses import dataclass
from typing import List, Dict, Any

import aiohttp

@dataclass
class BenchmarkResult:
    proxy_type: str
    avg_response_time: float
    success_rate: float
    throughput_rps: float
    total_requests: int
    failed_requests: int
    bandwidth_usage_mbps: float

class PerformanceBenchmark:
    def __init__(self, proxies: Dict[str, List[str]]):
        self.proxies = proxies  # e.g. {'datacenter': [...], 'residential': [...]}
        self.test_urls = [
            'http://httpbin.org/delay/1',
            'https://httpbin.org/json',
            'http://httpbin.org/html'
        ]

    async def run_benchmark(self, duration_seconds: int = 300) -> Dict[str, BenchmarkResult]:
        """Run comprehensive benchmark test"""
        results = {}
        for proxy_type, proxy_list in self.proxies.items():
            print(f"🚀 Benchmarking {proxy_type} proxies...")
            result = await self.benchmark_proxy_type(proxy_type, proxy_list, duration_seconds)
            results[proxy_type] = result
        return results

    async def benchmark_proxy_type(self, proxy_type: str, proxy_list: List[str], duration: int) -> BenchmarkResult:
        """Benchmark specific proxy type"""
        start_time = time.time()
        end_time = start_time + duration
        response_times = []
        successful_requests = 0
        failed_requests = 0
        total_bytes = 0
        tasks = []
        while time.time() < end_time:
            proxy = random.choice(proxy_list)
            url = random.choice(self.test_urls)
            task = self.benchmark_single_request(proxy, url)
            tasks.append(task)
            # Limit concurrent requests
            if len(tasks) >= 50:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, dict) and result.get('success'):
                        successful_requests += 1
                        response_times.append(result['response_time'])
                        total_bytes += result['content_length']
                    else:
                        failed_requests += 1
                tasks = []
        # Process remaining tasks
        if tasks:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, dict) and result.get('success'):
                    successful_requests += 1
                    response_times.append(result['response_time'])
                    total_bytes += result['content_length']
                else:
                    failed_requests += 1
        # Calculate metrics
        total_requests = successful_requests + failed_requests
        actual_duration = time.time() - start_time
        return BenchmarkResult(
            proxy_type=proxy_type,
            avg_response_time=statistics.mean(response_times) if response_times else 0,
            success_rate=successful_requests / total_requests if total_requests > 0 else 0,
            throughput_rps=successful_requests / actual_duration,
            total_requests=total_requests,
            failed_requests=failed_requests,
            bandwidth_usage_mbps=(total_bytes * 8) / (actual_duration * 1_000_000)
        )

    async def benchmark_single_request(self, proxy: str, url: str) -> Dict[str, Any]:
        """Benchmark single request"""
        start_time = time.time()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url,
                    proxy=f"http://{proxy}",
                    timeout=aiohttp.ClientTimeout(total=15)
                ) as response:
                    content = await response.read()
                    response_time = time.time() - start_time
                    return {
                        'success': True,
                        'response_time': response_time,
                        'status_code': response.status,
                        'content_length': len(content)
                    }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'response_time': time.time() - start_time
            }

    def generate_benchmark_report(self, results: Dict[str, BenchmarkResult]) -> str:
        """Generate detailed benchmark report"""
        report = "📊 Proxy Performance Benchmark Report\n"
        report += "=" * 60 + "\n\n"
        for proxy_type, result in results.items():
            report += f"🔹 {proxy_type.upper()} PROXIES\n"
            report += f"   Average Response Time: {result.avg_response_time:.3f}s\n"
            report += f"   Success Rate: {result.success_rate:.1%}\n"
            report += f"   Throughput: {result.throughput_rps:.2f} requests/second\n"
            report += f"   Total Requests: {result.total_requests:,}\n"
            report += f"   Failed Requests: {result.failed_requests:,}\n"
            report += f"   Bandwidth Usage: {result.bandwidth_usage_mbps:.2f} Mbps\n\n"
        # Performance comparison
        if len(results) > 1:
            report += "🏆 PERFORMANCE COMPARISON\n"
            fastest = min(results.values(), key=lambda x: x.avg_response_time)
            most_reliable = max(results.values(), key=lambda x: x.success_rate)
            highest_throughput = max(results.values(), key=lambda x: x.throughput_rps)
            report += f"   Fastest: {fastest.proxy_type} ({fastest.avg_response_time:.3f}s)\n"
            report += f"   Most Reliable: {most_reliable.proxy_type} ({most_reliable.success_rate:.1%})\n"
            report += f"   Highest Throughput: {highest_throughput.proxy_type} ({highest_throughput.throughput_rps:.2f} RPS)\n"
        return report
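To run the suite against your own pools (placeholder credentials below), something like:

benchmark = PerformanceBenchmark({
    'datacenter': ['user:[email protected]:12933'],
})
results = asyncio.run(benchmark.run_benchmark(duration_seconds=60))
print(benchmark.generate_benchmark_report(results))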
🎓 Best Practices Summary
Essential Guidelines for Production Scraping
- Always Respect robots.txt: Check and follow robots.txt directives
- Implement Rate Limiting: Never overwhelm target servers
- Use Appropriate Proxy Types: Match proxy type to use case
- Monitor Proxy Health: Continuously check proxy performance
- Handle Errors Gracefully: Implement comprehensive error handling and retry logic
- Rotate User Agents: Use realistic, rotating user agents
- Monitor Performance: Track metrics and optimize continuously
- Scale Responsibly: Increase load gradually and monitor impact
- Stay Legal: Comply with terms of service and applicable laws
- Document Everything: Maintain logs for debugging and compliance
Production-Ready Scraping Template
import asyncio
import logging
import random
import time
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse

import aiohttp

@dataclass
class ScrapingConfig:
    proxies: List[str]
    max_concurrent: int = 10
    requests_per_second: float = 1.0
    retry_attempts: int = 3
    timeout_seconds: int = 15
    respect_robots_txt: bool = True
    user_agents: List[str] = None

class ProductionScraper:
    """Ties together the RateLimiter, RobotsTxtChecker, ProxyPoolManager,
    PerformanceMonitor and ScrapingLogger classes defined earlier"""

    def __init__(self, config: ScrapingConfig):
        self.config = config
        self.rate_limiter = RateLimiter(config.requests_per_second)
        self.robots_checker = RobotsTxtChecker() if config.respect_robots_txt else None
        self.proxy_manager = ProxyPoolManager(config.proxies)
        self.performance_monitor = PerformanceMonitor()
        self.logger = ScrapingLogger()
        # Setup default user agents if not provided
        if not config.user_agents:
            self.config.user_agents = [
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
                'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
            ]

    async def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Main scraping method"""
        semaphore = asyncio.Semaphore(self.config.max_concurrent)
        tasks = []
        for url in urls:
            task = self.scrape_single_url(semaphore, url)
            tasks.append(task)
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if isinstance(r, dict)]

    async def scrape_single_url(self, semaphore: asyncio.Semaphore, url: str) -> Optional[Dict[str, Any]]:
        """Scrape single URL with all best practices"""
        async with semaphore:
            # Check robots.txt
            if self.robots_checker and not self.robots_checker.can_fetch(url):
                self.logger.log_error(url, "N/A", "Robots.txt disallows", "robots_blocked")
                return None
            # Rate limiting
            domain = urlparse(url).netloc
            await self.rate_limiter.wait_if_needed(domain)
            # Get best proxy
            proxy = self.proxy_manager.get_best_proxy()
            if not proxy:
                self.logger.log_error(url, "N/A", "No healthy proxies available", "no_proxy")
                return None
            # Retry logic
            for attempt in range(self.config.retry_attempts):
                try:
                    result = await self.make_request(url, proxy)
                    if result:
                        self.performance_monitor.record_request()
                        return result
                except Exception as e:
                    if attempt == self.config.retry_attempts - 1:
                        self.logger.log_error(url, proxy, str(e), "max_retries_exceeded")
                    else:
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
            return None

    async def make_request(self, url: str, proxy: str) -> Optional[Dict[str, Any]]:
        """Make HTTP request with proxy"""
        headers = {
            'User-Agent': random.choice(self.config.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }
        start_time = time.time()
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url,
                proxy=f"http://{proxy}",
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=self.config.timeout_seconds)
            ) as response:
                content = await response.text()
                response_time = time.time() - start_time
                self.logger.log_request(url, proxy, response.status, response_time)
                return {
                    'url': url,
                    'status_code': response.status,
                    'content': content,
                    'response_time': response_time,
                    'proxy_used': proxy,
                    'scraped_at': time.time()
                }

# Usage example
async def main():
    config = ScrapingConfig(
        proxies=[
            "user:[email protected]:12933",
            "user:[email protected]:12933",
            "user:[email protected]:12933"
        ],
        max_concurrent=5,
        requests_per_second=2.0,
        retry_attempts=3
    )
    scraper = ProductionScraper(config)
    urls_to_scrape = [
        "https://example1.com",
        "https://example2.com",
        "https://example3.com"
    ]
    results = await scraper.scrape_urls(urls_to_scrape)
    print(f"Successfully scraped {len(results)} URLs")
    for result in results:
        print(f"  {result['url']} - {result['status_code']} - {result['response_time']:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())
🚀 Conclusion
Web scraping with proxies is both an art and a science. Success requires understanding the technical aspects, respecting legal boundaries, and implementing robust, scalable solutions.
Key Takeaways
Choose the Right Proxy Type: Datacenter proxies like those from PinguProxy offer excellent performance and cost-effectiveness for most scraping operations, with speeds up to 1 Gbit/s and 99.99% uptime.
Implement Smart Rotation: Use intelligent proxy rotation strategies that consider performance metrics, not just random selection.
Respect the Web: Always follow robots.txt, implement appropriate rate limiting, and respect website terms of service.
Monitor and Optimize: Continuously monitor your scraping operations and optimize based on performance metrics.
Scale Responsibly: Start small and scale gradually, always monitoring the impact on target websites.
Stay Compliant: Ensure your scraping activities comply with applicable laws and regulations.
Next Steps
- Start Small: Begin with a simple scraping project using the code examples provided
- Choose Quality Proxies: Invest in reliable datacenter proxies for consistent performance
- Implement Monitoring: Set up comprehensive logging and monitoring from day one
- Test Thoroughly: Always test your scraping setup before deploying to production
- Stay Updated: Keep up with changes in anti-bot detection and proxy technologies
🐧 Ready to Get Started?
Ready to implement professional web scraping with high-performance proxies? PinguProxy offers lightning-fast datacenter proxies with IPv4 and IPv6 support, perfect for your scraping projects.
Get started today:
- 🚀 High Performance: Up to 1 Gbit/s speeds with 99.99% uptime
- 🌐 Massive IP Pool: 4.3 billion unique IP addresses
- 💰 Cost-Effective: Starting at just $9.99/month
- 🔧 Developer-Friendly: Complete API and easy integration
Start Your Journey | Contact Our Team
📚 Useful Resources
Essential Links:
- Contact Support - Get help with your scraping projects
- PinguProxy Dashboard - Manage your proxies and monitor usage
- API Documentation - Complete API reference
- Status Page - Real-time service status
Thank you for choosing PinguProxy. Your success is our mission! 💙