import json
import random
import re
import time
from collections import defaultdict, deque
from html import unescape
from urllib.parse import unquote, urljoin, urlparse, urlunparse

import requests
from locust import HttpUser, task, between


class BrokenLinksChecker(HttpUser):
    """Locust user that crawls the target site and reports broken links.

    Starting from the homepage, the user walks internal pages breadth-first,
    extracts ``href``/``src`` attributes with regular expressions, and checks
    every link it finds:

    * internal links are requested through the Locust client,
    * external links are probed with ``requests`` (rate limited per domain),
    * ``#fragment`` links are checked against ``id``/``name`` attributes.

    Findings are accumulated on the instance (``broken_links``,
    ``redirect_chains``, ``link_stats``) and periodically posted to
    ``/api/qa/broken-links-report``.
    """

    wait_time = between(0.5, 2)

    # Link prefixes that are never fetched or crawled.
    _SKIP_PREFIXES = ("mailto:", "tel:", "javascript:", "data:")
    # Schemes that can actually be requested over HTTP.
    _HTTP_SCHEMES = ("http", "https")
    # Minimum number of seconds between two probes of the same external domain.
    _EXTERNAL_RATE_LIMIT_SECONDS = 5
    # Redirect chains longer than this are reported as a problem.
    _MAX_REDIRECTS = 3
    # Compiled once: one pattern per tag/attribute pair that can carry a URL.
    # The ``\s`` after the tag name keeps ``<a`` from matching ``<abbr``,
    # ``<article`` and similar tags.
    _LINK_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'<a\s[^>]*href=["\']([^"\']+)["\'][^>]*>',       # <a href="...">
            r'<link\s[^>]*href=["\']([^"\']+)["\'][^>]*>',    # <link href="...">
            r'<img\s[^>]*src=["\']([^"\']+)["\'][^>]*>',      # <img src="...">
            r'<script\s[^>]*src=["\']([^"\']+)["\'][^>]*>',   # <script src="...">
            r'<iframe\s[^>]*src=["\']([^"\']+)["\'][^>]*>',   # <iframe src="...">
        )
    )

    def on_start(self):
        """Initialize broken links checking state and seed the crawl queue."""
        self.visited_pages = set()
        self.pages_to_check = deque()
        # Mirror of ``pages_to_check`` for O(1) duplicate detection.
        self._queued_pages = set()
        self.broken_links = []
        self.redirect_chains = []
        self.external_links = {}
        self.anchor_links = {}
        self.link_stats = defaultdict(int)
        self.base_domain = None

        # Start with homepage
        self._initialize_crawler()

    def _initialize_crawler(self):
        """Fetch the homepage and, if reachable, queue it as the first page."""
        homepage_response = self.client.get('/', name="homepage_initial_check")

        if homepage_response.status_code == 200:
            self.base_domain = urlparse(self.client.base_url).netloc
            self._enqueue_page('/')
            print(f"Starting broken links check for domain: {self.base_domain}")
        else:
            print(f"Failed to access homepage: {homepage_response.status_code}")

    def _enqueue_page(self, page):
        """Queue ``page`` for crawling unless it is empty, queued or visited."""
        if not page or page in self.visited_pages or page in self._queued_pages:
            return
        self._queued_pages.add(page)
        self.pages_to_check.append(page)

    @task(5)
    def crawl_and_check_links(self):
        """Main crawling task: fetch the next queued page and check its links."""
        if not self.pages_to_check:
            return

        current_page = self.pages_to_check.popleft()
        self._queued_pages.discard(current_page)

        if current_page in self.visited_pages:
            return
        self.visited_pages.add(current_page)

        # Get the page content
        response = self.client.get(current_page, name="crawl_page_for_links")

        if response.status_code != 200:
            self._record_broken_link(
                current_page, response.status_code, "Page not accessible"
            )
            return

        # Images, scripts and stylesheets are reachable but contain no links;
        # decoding them as text and regex-scanning them is wasted work.
        content_type = response.headers.get("Content-Type", "")
        if content_type and "html" not in content_type.lower():
            return

        self._extract_and_validate_links(current_page, response.text)

    def _extract_and_validate_links(self, page_url, html_content):
        """Extract all links from ``html_content`` and validate each of them.

        Args:
            page_url: Site-relative URL of the page the HTML came from.
            html_content: Raw HTML of that page.

        Internal links that are real pages (not skipped schemes, not pure
        anchors) are added to the crawl queue with their fragment removed.
        """
        all_links = []
        for pattern in self._LINK_PATTERNS:
            # Attribute values are HTML-escaped (``&amp;``) and may be padded.
            all_links.extend(
                unescape(raw).strip() for raw in pattern.findall(html_content)
            )

        # Process each link; the page HTML is passed along so in-page anchors
        # can be checked without downloading the page again.
        for link in all_links:
            self._validate_single_link(page_url, link, html_content)

        # Add internal pages to crawl queue
        for link in all_links:
            if self._should_skip_link(link) or link.startswith('#'):
                continue
            if not self._is_internal_link(link):
                continue
            normalized_link = self._normalize_url(link, page_url)
            if normalized_link:
                # The fragment does not identify a different document.
                self._enqueue_page(normalized_link.split('#', 1)[0])

    def _validate_single_link(self, source_page, link, page_html=None):
        """Validate a single link and dispatch it by category.

        Args:
            source_page: Page on which the link was found.
            link: The raw link value.
            page_html: Optional HTML of ``source_page``; when given, in-page
                anchors are checked against it instead of refetching the page.
        """
        self.link_stats['total_links'] += 1

        # Skip certain types of links
        if self._should_skip_link(link):
            self.link_stats['skipped_links'] += 1
            return

        # Handle different link types
        if link.startswith('#'):
            self._validate_anchor_link(source_page, link, page_html=page_html)
        elif self._is_internal_link(link):
            self._validate_internal_link(source_page, link)
        else:
            self._validate_external_link(source_page, link)

    def _should_skip_link(self, link):
        """Return True for links that must not be fetched.

        That is: empty/whitespace-only values, the bare ``#`` anchor, and
        ``mailto:``, ``tel:``, ``javascript:`` and ``data:`` links (matched
        case-insensitively).
        """
        candidate = link.strip().lower()
        if not candidate or candidate == '#':
            return True
        return candidate.startswith(self._SKIP_PREFIXES)

    def _same_domain(self, netloc):
        """Return True when ``netloc`` equals the crawled domain (any case)."""
        if not self.base_domain:
            return False
        return netloc.lower() == self.base_domain.lower()

    @staticmethod
    def _to_site_path(parsed):
        """Rebuild ``path[;params][?query][#fragment]`` from a parsed URL."""
        path = parsed.path or '/'  # "https://site" means the root page
        if parsed.params:
            path += ';' + parsed.params
        if parsed.query:
            path += '?' + parsed.query
        if parsed.fragment:
            path += '#' + parsed.fragment
        return path

    def _validate_internal_link(self, source_page, link):
        """Request an internal link and record the outcome.

        A link carrying a ``#fragment`` is fetched once; when the page answers
        200 the fragment is checked against the body that was just downloaded.
        """
        self.link_stats['internal_links'] += 1

        normalized_url = self._normalize_url(link, source_page)
        if not normalized_url:
            self._record_broken_link(source_page, 0, f"Invalid URL format: {link}")
            return

        # Split off the fragment; it is validated against the response body.
        target_url, _, fragment = normalized_url.partition('#')
        target_url = target_url or '/'
        page_html = None

        # Test the internal link
        with self.client.get(
            target_url, name="validate_internal_link", catch_response=True
        ) as response:
            status = response.status_code
            if not status:
                # Locust reports connection-level failures as status 0.
                self._record_broken_link(
                    source_page, 0, f"Internal link unreachable: {link}"
                )
                response.failure("Connection failed")
            elif status == 404:
                self._record_broken_link(
                    source_page, 404, f"Internal link not found: {link}"
                )
                response.failure("404 Not Found")
            elif status >= 400:
                self._record_broken_link(
                    source_page, status, f"Internal link error: {link}"
                )
                response.failure(f"HTTP {status}")
            else:
                # Redirects are followed by the client, so a redirected link
                # normally ends on a 2xx status with a non-empty history.
                if response.history or status >= 300:
                    self._track_redirect_chain(source_page, link, response)
                if status < 300:
                    self.link_stats['valid_internal_links'] += 1
                response.success()
                if fragment and status == 200:
                    page_html = response.text

        if page_html is not None:
            self._validate_anchor_link(
                source_page, f"#{fragment}", target_url, page_html=page_html
            )

    def _validate_external_link(self, source_page, link):
        """Validate an external link (with per-domain rate limiting).

        Protocol-relative links (``//host/path``) inherit the scheme of the
        site under test. Links that are not HTTP(S) are counted as skipped.
        """
        self.link_stats['external_links'] += 1

        url = link
        if url.startswith('//'):
            scheme = urlparse(self.client.base_url).scheme or 'https'
            url = f"{scheme}:{url}"

        try:
            parsed = urlparse(url)
        except ValueError:
            self._record_broken_link(source_page, 0, f"Invalid URL format: {link}")
            return

        if parsed.scheme not in self._HTTP_SCHEMES or not parsed.netloc:
            # e.g. ftp:, sms:, whatsapp: - nothing we can probe over HTTP.
            self.link_stats['skipped_links'] += 1
            return

        # Rate limit external link checking
        domain = parsed.netloc.lower()
        current_time = time.time()
        last_check = self.external_links.get(domain, {}).get('last_check', 0)
        if current_time - last_check < self._EXTERNAL_RATE_LIMIT_SECONDS:
            self.link_stats['rate_limited_links'] += 1
            return
        self.external_links[domain] = {'last_check': current_time}

        try:
            # Use requests for external links to avoid LoadForge rate limits
            response = requests.head(url, timeout=10, allow_redirects=True)
            if response.status_code in (405, 501):
                # Server does not implement HEAD; retry with GET but do not
                # download the body.
                response = requests.get(
                    url, timeout=10, allow_redirects=True, stream=True
                )
                response.close()
        except requests.exceptions.RequestException as e:
            self._record_broken_link(
                source_page, 0, f"External link failed: {link} - {e}"
            )
            return

        if response.status_code == 404:
            self._record_broken_link(
                source_page, 404, f"External link not found: {link}"
            )
        elif response.status_code >= 400:
            self._record_broken_link(
                source_page, response.status_code, f"External link error: {link}"
            )
        else:
            self.link_stats['valid_external_links'] += 1

    def _validate_anchor_link(self, source_page, anchor, target_page=None,
                              page_html=None):
        """Validate an anchor/fragment link.

        Args:
            source_page: Page on which the link was found.
            anchor: Fragment including the leading ``#``.
            target_page: Page the fragment points into; defaults to
                ``source_page``.
            page_html: HTML of ``target_page`` if the caller already has it.
                When omitted the page is fetched.
        """
        self.link_stats['anchor_links'] += 1

        if target_page is None:
            target_page = source_page

        if page_html is None:
            # Get the target page content to check for anchor
            response = self.client.get(target_page, name="validate_anchor_target")
            if response.status_code != 200:
                self._record_broken_link(
                    source_page,
                    response.status_code,
                    f"Cannot check anchor {anchor} - target page error",
                )
                return
            page_html = response.text

        anchor_id = unquote(anchor[1:])  # Remove the # and decode %XX escapes

        # An empty fragment and "#top" always resolve to the top of the page.
        if (
            not anchor_id
            or anchor_id.lower() == 'top'
            or self._anchor_exists(page_html, anchor_id)
        ):
            self.link_stats['valid_anchor_links'] += 1
        else:
            self._record_broken_link(
                source_page, 0, f"Anchor not found: {anchor} on page {target_page}"
            )

    @staticmethod
    def _anchor_exists(page_html, anchor_id):
        """Return True if ``page_html`` has an ``id``/``name`` equal to ``anchor_id``.

        The value must match completely (quoted, or unquoted and followed by
        whitespace, ``/`` or ``>``), so ``in`` does not match ``id="intro"``.
        The lookbehind keeps ``data-id=`` and similar attributes from matching.
        Matching stays case-insensitive, as lenient as the original check.
        """
        escaped = re.escape(anchor_id)
        pattern = (
            rf'''(?<![\w-])(?:id|name)\s*=\s*'''
            rf'''(?:"{escaped}"|'{escaped}'|{escaped}(?=[\s/>]|$))'''
        )
        return re.search(pattern, page_html, re.IGNORECASE) is not None

    def _track_redirect_chain(self, source_page, original_link, response):
        """Record a redirect and flag chains longer than ``_MAX_REDIRECTS``."""
        redirect_info = {
            'source_page': source_page,
            'original_link': original_link,
            'status_code': response.status_code,
            'final_url': response.url,
            'redirect_count': len(response.history),
        }

        if redirect_info['redirect_count'] > self._MAX_REDIRECTS:
            self._record_broken_link(
                source_page,
                response.status_code,
                f"Too many redirects ({redirect_info['redirect_count']}): "
                f"{original_link}",
            )

        self.redirect_chains.append(redirect_info)
        self.link_stats['redirected_links'] += 1

    def _record_broken_link(self, source_page, status_code, description):
        """Record a broken link with details and print it."""
        broken_link = {
            'source_page': source_page,
            'status_code': status_code,
            'description': description,
            'timestamp': time.time(),
            'severity': self._get_severity(status_code),
        }

        self.broken_links.append(broken_link)
        self.link_stats['broken_links'] += 1

        print(f"BROKEN LINK: {description} (found on {source_page})")

    def _get_severity(self, status_code):
        """Map an HTTP status code to HIGH/CRITICAL/MEDIUM/LOW."""
        if status_code == 404:
            return 'HIGH'
        if status_code >= 500:
            return 'CRITICAL'
        if status_code >= 400:
            return 'MEDIUM'
        return 'LOW'

    def _is_internal_link(self, link):
        """Check if a link is internal to the current domain.

        Protocol-relative (``//host/...``) and absolute HTTP(S) links are
        internal only when their host equals ``base_domain``. Links with any
        other scheme are not internal. Everything else is a relative link and
        therefore internal. A value that cannot be parsed is reported as
        internal so that internal validation flags it as an invalid URL.
        """
        try:
            if link.startswith('//'):
                return self._same_domain(urlparse(link).netloc)
            if link.startswith('/'):
                return True
            parsed = urlparse(link)
        except ValueError:
            return True

        if parsed.scheme in self._HTTP_SCHEMES:
            return self._same_domain(parsed.netloc)
        return not parsed.scheme  # Relative links are internal

    def _normalize_url(self, link, base_page=None):
        """Normalize a link for consistent checking.

        Args:
            link: Raw link value.
            base_page: Site-relative URL of the page containing the link,
                used to resolve relative links. Defaults to the site root.

        Returns:
            A site-relative URL for internal links, the unchanged link for
            external ones, or ``None`` if the link cannot be parsed.
        """
        try:
            parsed = urlparse(link)

            if link.startswith('//') or parsed.scheme in self._HTTP_SCHEMES:
                if self._same_domain(parsed.netloc):
                    return self._to_site_path(parsed)
                return link

            if parsed.scheme:
                # Non-HTTP scheme: nothing to normalize.
                return link

            if link.startswith('/'):
                return link

            # Relative link: resolve against the containing page. A full URL
            # is used as the base so that surplus ".." segments still yield
            # an absolute path.
            base = base_page or '/'
            if not base.startswith('/'):
                base = '/' + base
            host = self.base_domain or 'localhost'
            absolute = urljoin(f"http://{host}{base}", link)
            return self._to_site_path(urlparse(absolute))
        except (ValueError, AttributeError, TypeError):
            return None

    @task(1)
    def generate_broken_links_report(self):
        """Build the broken links report and post it to the QA endpoint."""
        if not self.broken_links and len(self.visited_pages) < 5:
            return  # Not enough data yet

        report = {
            'timestamp': time.time(),
            'domain': self.base_domain,
            'pages_crawled': len(self.visited_pages),
            'total_broken_links': len(self.broken_links),
            'link_statistics': dict(self.link_stats),
            'broken_links_by_severity': self._group_by_severity(),
            'top_broken_pages': self._get_top_broken_pages(),
            'redirect_analysis': self._analyze_redirects(),
        }

        # Send report to monitoring endpoint
        self.client.post(
            '/api/qa/broken-links-report',
            json=report,
            name="submit_broken_links_report",
        )

        print(
            f"REPORT: Found {len(self.broken_links)} broken links "
            f"across {len(self.visited_pages)} pages"
        )

    def _group_by_severity(self):
        """Group broken links by severity."""
        severity_groups = defaultdict(list)
        for link in self.broken_links:
            severity_groups[link['severity']].append(link)
        return dict(severity_groups)

    def _get_top_broken_pages(self):
        """Return up to ten ``(page, count)`` pairs, most broken links first."""
        page_counts = defaultdict(int)
        for link in self.broken_links:
            page_counts[link['source_page']] += 1
        return sorted(page_counts.items(), key=lambda item: item[1], reverse=True)[:10]

    def _analyze_redirects(self):
        """Summarize the recorded redirects."""
        return {
            'total_redirects': len(self.redirect_chains),
            'excessive_redirects': sum(
                1
                for chain in self.redirect_chains
                if chain['redirect_count'] > self._MAX_REDIRECTS
            ),
            # Sorted so the report is stable between runs.
            'redirect_domains': sorted(
                {urlparse(chain['final_url']).netloc for chain in self.redirect_chains}
            ),
        }

    def on_stop(self):
        """Print the final summary when the test stops."""
        print("\n=== BROKEN LINKS CHECK COMPLETE ===")
        print(f"Pages crawled: {len(self.visited_pages)}")
        print(f"Total links checked: {self.link_stats['total_links']}")
        print(f"Broken links found: {len(self.broken_links)}")
        print(f"Redirect chains: {len(self.redirect_chains)}")

        if self.broken_links:
            print("\nBROKEN LINKS SUMMARY:")
            for link in self.broken_links[:10]:  # Show first 10
                print(f"  - {link['description']} [{link['severity']}]")