4. Communication Channels & Protocols
📋 Contents
- Channel Overview
- Event Bus (Observer Pattern)
- Direct Method Calls
- Async Messaging (Celery)
- Message Formats
Channel Overview
COMMUNICATION CHANNELS

  ┌─────────────┐
  │   Spider    │
  │ (Orchestr.) │
  └──────┬──────┘
         │
         │ 1. Direct Method Calls (sync/async)
         │    Spider → Scheduler.add_url()
         │    Spider → Driver.fetch()
         │    Spider → Storage.save_graph()
         │
      ┌──┴──────────┬─────────────┬─────────────┐
      ▼             ▼             ▼             ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ Scheduler │ │  Driver   │ │  Storage  │ │  Plugins  │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
      │             │             │             │
      │ 2. Event Bus (Observer Pattern, async/sync)
      │    Components publish events → EventBus → Subscribers
      │             │             │             │
      └─────────────┴──────┬──────┴─────────────┘
                           ▼
                  ┌─────────────────┐
                  │    EventBus     │
                  │    (Pub/Sub)    │
                  └────────┬────────┘
                           │ notify()
             ┌─────────────┼─────────────┐
             ▼             ▼             ▼
        ┌─────────┐   ┌─────────┐   ┌─────────┐
        │Dashboard│   │ Loggers │   │Analytics│
        └─────────┘   └─────────┘   └─────────┘

  3. Async Messaging (Celery, for distributed mode)

  ┌─────────────┐   ┌────────────────┐   ┌─────────────┐
  │ Coordinator │──▶│ Redis/RabbitMQ │──▶│   Workers   │
  │  (Master)   │   │    (Broker)    │   │  (Celery)   │
  └─────────────┘   └────────────────┘   └──────┬──────┘
                                                │
                                                ▼
                                         ┌─────────────┐
                                         │  MongoDB/   │
                                         │ PostgreSQL  │
                                         └─────────────┘
Event Bus
Purpose
Loose coupling between components via the Observer Pattern.
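The pub/sub mechanics behind this can be sketched in a few lines. This is a hedged stand-in, not the project's actual EventBus (which, per the protocol below, also offers publish_async() for async handlers); the string event type and the _Evt stub are illustrative only.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List


class EventBus:
    """Observer pattern: producers publish, subscribers are notified."""

    def __init__(self) -> None:
        # event_type -> handlers; one event type can have many observers
        self._handlers: DefaultDict[Any, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: Any, handler: Callable) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: Any) -> None:
        # Producers know only the bus, never the subscribers: loose coupling
        for handler in self._handlers[event.event_type]:
            handler(event)


class _Evt:
    """Illustrative stub standing in for CrawlerEvent."""
    event_type = "NODE_SCANNED"


bus = EventBus()
received: list = []
bus.subscribe("NODE_SCANNED", received.append)
bus.publish(_Evt())
```

Neither the producer nor the subscriber imports the other; both depend only on the bus, which is what makes components swappable.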
Protocol
# Publishing (producer)
event = CrawlerEvent.create(
    event_type=EventType.NODE_SCANNED,
    data={'url': 'https://example.com', 'status': 200},
    metadata={'spider_id': 'spider-1'}
)
event_bus.publish(event)              # sync
await event_bus.publish_async(event)  # async

# Subscribing (consumer)
def handler(event: CrawlerEvent):
    print(f"Event: {event.event_type}, Data: {event.data}")

event_bus.subscribe(EventType.NODE_SCANNED, handler)
CrawlerEvent Format
@dataclass
class CrawlerEvent:
    event_type: EventType        # enum with 50+ types
    timestamp: datetime          # event time
    data: Dict[str, Any]         # payload (JSON-serializable)
    metadata: Dict[str, Any]     # additional metadata
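A runnable sketch of this dataclass, plus the create() factory used in the protocol example above. The field names come from this page; the factory body, the to_dict() helper, and the string event type (standing in for the EventType enum) are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict


@dataclass
class CrawlerEvent:
    event_type: str                  # stand-in for the EventType enum
    timestamp: datetime              # event time
    data: Dict[str, Any]             # payload (JSON-serializable)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(cls, event_type, data=None, metadata=None) -> "CrawlerEvent":
        # The timestamp is filled in here, so producers never set it by hand
        return cls(event_type, datetime.now(timezone.utc),
                   data or {}, metadata or {})

    def to_dict(self) -> Dict[str, Any]:
        # Flatten to JSON-friendly primitives for loggers and storage
        return {"event_type": self.event_type,
                "timestamp": self.timestamp.isoformat(),
                "data": self.data,
                "metadata": self.metadata}


event = CrawlerEvent.create(
    "NODE_SCANNED",
    data={"url": "https://example.com", "status": 200})
```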
Event Categories
| Category | Event Types | Description |
|---|---|---|
| Node | NODE_CREATED, NODE_SCAN_STARTED, NODE_SCANNED, NODE_FAILED | Node lifecycle |
| Crawler | CRAWL_STARTED, CRAWL_COMPLETED, CRAWL_PAUSED, CRAWL_RESUMED | Crawler state |
| Scheduler | URL_ADDED_TO_QUEUE, URL_EXCLUDED, URL_PRIORITIZED | URL queue |
| Storage | GRAPH_SAVED, GRAPH_LOADED, STORAGE_UPGRADED | Storage operations |
| Plugin | PLUGIN_STARTED, PLUGIN_COMPLETED, PLUGIN_FAILED | Plugin execution |
| Middleware | RATE_LIMIT_WAIT, PROXY_SELECTED, RETRY_STARTED | Middleware events |
| Fetch | FETCH_STARTED, FETCH_SUCCESS, FETCH_ERROR | HTTP requests |
| Progress | PROGRESS_UPDATE, PAGE_FETCH_TIME | Monitoring |
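The table above maps onto a single EventType enum. A trimmed sketch with one or two members per category; the member names come from the table, but the string values are assumptions (the full enum has 50+ members):

```python
from enum import Enum


class EventType(Enum):
    # Node lifecycle
    NODE_CREATED = "node_created"
    NODE_SCANNED = "node_scanned"
    # Crawler state
    CRAWL_STARTED = "crawl_started"
    # URL queue
    URL_ADDED_TO_QUEUE = "url_added_to_queue"
    # Storage operations
    GRAPH_SAVED = "graph_saved"
    # Plugin execution
    PLUGIN_FAILED = "plugin_failed"
    # Middleware
    PROXY_SELECTED = "proxy_selected"
    # HTTP requests
    FETCH_SUCCESS = "fetch_success"
    # Monitoring
    PROGRESS_UPDATE = "progress_update"
```

Because it is one flat enum, a subscriber can iterate over all members (as the logging example below does) or pick a single one.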
Usage Examples
# 1. Log all events
def log_handler(event):
    logger.info(f"{event.event_type.value}: {event.data}")

for event_type in EventType:
    event_bus.subscribe(event_type, log_handler)

# 2. Progress monitoring
def progress_handler(event):
    data = event.data
    print(f"Progress: {data['scanned']}/{data['total']} ({data['percent']:.1f}%)")

event_bus.subscribe(EventType.PROGRESS_UPDATE, progress_handler)

# 3. Error alerting
async def error_alert(event):
    if event.data.get('severity') == 'critical':
        await send_slack_alert(event.data)

event_bus.subscribe(EventType.ERROR_OCCURRED, error_alert)

# 4. Analytics collection
class AnalyticsCollector:
    def __init__(self):
        self.events = []

    def collect(self, event):
        self.events.append(event.to_dict())

collector = AnalyticsCollector()
event_bus.subscribe(EventType.NODE_SCANNED, collector.collect)
Direct Method Calls
Spider → Components Protocol
Spider
  │
  ├──▶ Scheduler
  │      • add_url(url, depth) → bool
  │      • add_node(node) → bool
  │      • get_next_url() → Optional[Tuple[str, int]]
  │      • is_empty() → bool
  │
  ├──▶ Driver (async)
  │      • await fetch(url) → FetchResponse
  │      • await fetch_many(urls) → List[FetchResponse]
  │      • await close()
  │
  ├──▶ Storage (async)
  │      • await save_graph(graph) → bool
  │      • await load_graph() → Optional[Graph]
  │      • await exists() → bool
  │      • await close()
  │
  ├──▶ NodeScanner
  │      • await scan_node(node, html) → List[str]
  │
  └──▶ LinkProcessor
         • process_links(source_node, links) → List[Node]
         • create_edge(source, target) → Edge
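The call map above can be exercised with a toy orchestration loop: the spider pulls from the Scheduler, fetches via the Driver, and feeds extracted links back into the queue. The Fake* collaborators here are stand-in assumptions; only the method names and signatures come from the map.

```python
import asyncio


class FakeScheduler:
    def __init__(self, seed: str):
        self._queue = [(seed, 0)]
        self._seen = {seed}

    def add_url(self, url: str, depth: int) -> bool:
        # Deduplicate, mirroring the bool return in the protocol
        if url in self._seen:
            return False
        self._seen.add(url)
        self._queue.append((url, depth))
        return True

    def get_next_url(self):
        return self._queue.pop(0) if self._queue else None

    def is_empty(self) -> bool:
        return not self._queue


class FakeDriver:
    async def fetch(self, url: str) -> dict:
        # Pretend every page links to one /next page
        return {"url": url, "links": ["https://example.com/next"]}


async def crawl(scheduler, driver, max_depth: int = 1):
    scanned = []
    while not scheduler.is_empty():
        url, depth = scheduler.get_next_url()
        response = await driver.fetch(url)          # Spider → Driver.fetch()
        scanned.append(url)
        if depth < max_depth:
            for link in response["links"]:
                scheduler.add_url(link, depth + 1)  # Spider → Scheduler.add_url()
    return scanned


scanned = asyncio.run(crawl(FakeScheduler("https://example.com"), FakeDriver()))
```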
Node → Plugins Protocol
# Node.process_html() internally calls:
await plugin_manager.execute(NodePluginType.ON_BEFORE_SCAN, context)
await plugin_manager.execute(NodePluginType.ON_HTML_PARSED, context)
await plugin_manager.execute(NodePluginType.ON_AFTER_SCAN, context)
Interfaces (Protocols)
from typing import List, Optional, Protocol

# IDriver
class IDriver(Protocol):
    async def fetch(self, url: str) -> FetchResponse: ...
    async def fetch_many(self, urls: List[str]) -> List[FetchResponse]: ...
    async def close(self) -> None: ...

# IStorage
class IStorage(Protocol):
    async def save_graph(self, graph) -> bool: ...
    async def load_graph(self) -> Optional[Graph]: ...
    async def exists(self) -> bool: ...
    async def close(self) -> None: ...

# IFilter
class IFilter(Protocol):
    def should_scan(self, url: str) -> bool: ...
    def apply(self, url: str, node: Node) -> bool: ...
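These are typing.Protocol classes, so conformance is structural: any object with matching methods satisfies the interface, with no inheritance required. A sketch using a runtime check; MemoryStorage is a hypothetical implementation, not part of the project:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class IStorage(Protocol):
    async def save_graph(self, graph) -> bool: ...
    async def load_graph(self): ...
    async def exists(self) -> bool: ...
    async def close(self) -> None: ...


class MemoryStorage:                 # note: no IStorage base class
    def __init__(self):
        self._graph = None

    async def save_graph(self, graph) -> bool:
        self._graph = graph
        return True

    async def load_graph(self):
        return self._graph

    async def exists(self) -> bool:
        return self._graph is not None

    async def close(self) -> None:
        pass


# isinstance() works because of @runtime_checkable: it checks that the
# methods exist, not that MemoryStorage inherits from IStorage
conforms = isinstance(MemoryStorage(), IStorage)
```

This is what lets drivers, storages, and filters be swapped without touching the Spider: it depends on the Protocol, not on any concrete class.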
Async Messaging
Celery Architecture
DISTRIBUTED CRAWLING

LOCAL (Master)
┌────────────────────────┐
│ EasyDistributedCrawler │
│                        │
│  • from_yaml(config)   │
│  • crawl()             │
│  • get_stats()         │
└───────────┬────────────┘
            │ 1. Push crawl_page_task
            ▼
┌─────────────────────────────────────┐
│        REDIS/RABBITMQ BROKER        │
│                                     │
│  Queue: graph_crawler               │
│  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐    │
│  │Task1│ │Task2│ │Task3│ │ ... │    │
│  └─────┘ └─────┘ └─────┘ └─────┘    │
│                                     │
│  Format:                            │
│  {                                  │
│    "url": "https://...",            │
│    "depth": 2,                      │
│    "config": {...}                  │
│  }                                  │
└──────────────────┬──────────────────┘
                   │ 2. Workers pull tasks
                   ▼
┌─────────────────────────────────────────────┐
│          CELERY WORKERS (N servers)         │
│                                             │
│  ┌───────────┐ ┌───────────┐ ┌───────────┐  │
│  │ Worker 1  │ │ Worker 2  │ │ Worker N  │  │
│  │           │ │           │ │           │  │
│  │ • Driver  │ │ • Driver  │ │ • Driver  │  │
│  │ • Plugins │ │ • Plugins │ │ • Plugins │  │
│  │ • Parser  │ │ • Parser  │ │ • Parser  │  │
│  └───────────┘ └───────────┘ └───────────┘  │
└──────────────────────┬──────────────────────┘
                       │ 3. Fetch & extract
                       │ 4. Save results
                       ▼
┌─────────────────────────────────────┐
│    MONGODB/POSTGRESQL (Results)     │
│                                     │
│  Collection: nodes                  │
│  Collection: edges                  │
│  Collection: queue (pending URLs)   │
└─────────────────────────────────────┘
Task Protocol
# Celery task definition
@celery.task(bind=True, max_retries=3)
def crawl_page_task(self, url: str, depth: int, config: dict):
    """
    Crawl a single page.

    Input:
        url: URL to crawl
        depth: current depth
        config: crawler configuration

    Output:
        {
            'url': str,
            'status': 'success' | 'error',
            'node': NodeDTO | None,
            'links': List[str],
            'error': str | None
        }
    """
    ...
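A sketch of the worker-side result construction implied by the docstring above. Only the output contract comes from the docs; the helper name and its arguments are illustrative assumptions (the real task body does the fetching and extraction itself).

```python
from typing import List, Optional


def build_task_result(url: str, node=None,
                      links: Optional[List[str]] = None,
                      error: Optional[str] = None) -> dict:
    # 'status' flips on error so the master can decide to retry
    # (max_retries=3 in the task decorator) or record the failure
    return {
        "url": url,
        "status": "error" if error else "success",
        "node": node,           # NodeDTO | None
        "links": links or [],   # URLs to feed back into the queue
        "error": error,
    }


ok = build_task_result("https://example.com",
                       links=["https://example.com/a"])
failed = build_task_result("https://example.com/404", error="HTTP 404")
```

Keeping the result a plain JSON-serializable dict matters here: it has to survive the round trip through the broker's result backend.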
Configuration Protocol (YAML)
# config.yaml
broker:
  type: redis                # redis | rabbitmq
  host: server11.example.com
  port: 6379
  db: 0

database:
  type: mongodb              # mongodb | postgresql
  host: server12.example.com
  port: 27017
  database: crawler_results

crawl_task:
  urls:
    - https://example.com
  max_depth: 3
  max_pages: 1000
  extractors:
    - phones
    - emails
    - prices

workers: 10
task_time_limit: 600
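Once parsed (e.g. with yaml.safe_load), this file is a plain nested dict. A minimal validation sketch for the keys shown above; the set of required keys is an assumption inferred from this example config, not a documented schema.

```python
# Required sections and keys, inferred from the example config above
REQUIRED = {
    "broker": {"type", "host", "port"},
    "database": {"type", "host", "port", "database"},
    "crawl_task": {"urls", "max_depth"},
}


def validate_config(cfg: dict) -> list:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for section, keys in REQUIRED.items():
        if section not in cfg:
            problems.append(f"missing section: {section}")
            continue
        for key in keys - cfg[section].keys():
            problems.append(f"missing key: {section}.{key}")
    if cfg.get("broker", {}).get("type") not in ("redis", "rabbitmq"):
        problems.append("broker.type must be redis or rabbitmq")
    return problems


cfg = {
    "broker": {"type": "redis", "host": "localhost", "port": 6379},
    "database": {"type": "mongodb", "host": "localhost", "port": 27017,
                 "database": "crawler_results"},
    "crawl_task": {"urls": ["https://example.com"], "max_depth": 3},
}
problems = validate_config(cfg)
```

Failing fast on the master beats discovering a typo after tasks have already been pushed to every worker.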
Message Formats
FetchResponse
@dataclass
class FetchResponse:
    url: str                     # original URL
    html: Optional[str]          # HTML content
    status_code: Optional[int]   # HTTP status
    headers: Dict[str, str]      # response headers
    error: Optional[str]         # error message
    final_url: Optional[str]     # after redirects
    redirect_chain: List[str]    # redirect history

# Properties
response.is_success   # error is None and html is not None
response.is_ok        # status_code is 2xx
response.is_redirect  # final_url != url
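A runnable sketch filling in the property logic listed above; the field names and property semantics come from this page, while the default values are assumptions for convenience.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FetchResponse:
    url: str
    html: Optional[str] = None
    status_code: Optional[int] = None
    headers: Dict[str, str] = field(default_factory=dict)
    error: Optional[str] = None
    final_url: Optional[str] = None
    redirect_chain: List[str] = field(default_factory=list)

    @property
    def is_success(self) -> bool:
        # "success" means we got content and no transport-level error
        return self.error is None and self.html is not None

    @property
    def is_ok(self) -> bool:
        # HTTP-level success: any 2xx status
        return self.status_code is not None and 200 <= self.status_code < 300

    @property
    def is_redirect(self) -> bool:
        # The driver resolved to a different final URL
        return self.final_url is not None and self.final_url != self.url


resp = FetchResponse(url="https://example.com",
                     html="<html></html>",
                     status_code=200,
                     final_url="https://example.com/home",
                     redirect_chain=["https://example.com"])
```

Note that is_success and is_ok are deliberately distinct: a 404 with an HTML error page is a "successful" fetch but not "ok".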
NodePluginContext
@dataclass
class NodePluginContext:
    # Basic (always available)
    node: Node
    url: str
    depth: int
    should_scan: bool
    can_create_edges: bool

    # HTML stage (after fetch)
    html: Optional[str]
    html_tree: Optional[Any]     # BeautifulSoup/lxml tree
    parser: Optional[BaseAdapter]

    # Results (modifiable)
    metadata: Dict[str, Any]
    user_data: Dict[str, Any]
    extracted_links: List[str]
MiddlewareContext
@dataclass
class MiddlewareContext:
    url: str
    headers: Dict[str, str]
    cookies: Dict[str, str]
    proxy: Optional[str]
    timeout: int

    # Response (POST_REQUEST)
    response: Optional[FetchResponse]
    error: Optional[Exception]

    # Control
    skip_request: bool = False   # skip fetch (cache hit)
    retry_count: int = 0
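A sketch of how the skip_request flag short-circuits a fetch on a cache hit. The CacheMiddleware class and its pre_request/post_request hook names are illustrative assumptions; only the MiddlewareContext fields come from this page (the context is trimmed to the fields the example touches).

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class MiddlewareContext:
    url: str
    headers: Dict[str, str] = field(default_factory=dict)
    proxy: Optional[str] = None
    timeout: int = 30
    response: Optional[object] = None
    skip_request: bool = False   # skip fetch (cache hit)
    retry_count: int = 0


class CacheMiddleware:
    def __init__(self):
        self._cache = {}

    def pre_request(self, ctx: MiddlewareContext) -> None:
        cached = self._cache.get(ctx.url)
        if cached is not None:
            ctx.response = cached     # serve the cached response...
            ctx.skip_request = True   # ...and tell the driver not to fetch

    def post_request(self, ctx: MiddlewareContext) -> None:
        if ctx.response is not None:
            self._cache[ctx.url] = ctx.response


cache = CacheMiddleware()

# First request: the driver fetches, POST_REQUEST stores the response
first = MiddlewareContext(url="https://example.com")
first.response = "<html>fetched</html>"   # pretend the driver fetched it
cache.post_request(first)

# Second request: PRE_REQUEST hits the cache and skips the fetch
second = MiddlewareContext(url="https://example.com")
cache.pre_request(second)
```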
GraphDTO (Clean Architecture)
@dataclass
class GraphDTO:
    nodes: List[NodeDTO]
    edges: List[EdgeDTO]
    stats: GraphStatsDTO
    metadata: Dict[str, Any]

@dataclass
class NodeDTO:
    url: str
    node_id: str
    depth: int
    scanned: bool
    metadata: Dict[str, Any]
    user_data: Dict[str, Any]

@dataclass
class EdgeDTO:
    edge_id: str
    source_node_id: str
    target_node_id: str
    anchor_text: str
    link_type: List[str]
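A sketch showing how these DTOs link together: edges reference nodes by node_id, and dataclasses.asdict() turns them into the plain dicts that storage backends persist. The default values and the "n1"/"n1->n2" id scheme are illustrative assumptions.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class NodeDTO:
    url: str
    node_id: str
    depth: int
    scanned: bool = False
    metadata: Dict[str, Any] = field(default_factory=dict)
    user_data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class EdgeDTO:
    edge_id: str
    source_node_id: str
    target_node_id: str
    anchor_text: str = ""
    link_type: List[str] = field(default_factory=list)


home = NodeDTO(url="https://example.com", node_id="n1",
               depth=0, scanned=True)
about = NodeDTO(url="https://example.com/about", node_id="n2", depth=1)

# An edge never holds Node objects, only ids: that keeps DTOs flat
# and trivially serializable across process boundaries
edge = EdgeDTO(edge_id="n1->n2",
               source_node_id=home.node_id,
               target_node_id=about.node_id,
               anchor_text="About",
               link_type=["internal"])

payload = asdict(edge)   # plain dict, ready for MongoDB/PostgreSQL
```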