from locust import task from locust_plugins.users.playwright import PlaywrightUser, PageWithRetry, pw, event import json class ComprehensiveAxeSiteTester ( PlaywrightUser ): def on_start ( self ): """Initialize comprehensive axe site testing""" self .all_violations = [] self .pages_scanned = set () self .pages_to_scan = [] self .scan_summary = { 'total_pages' : 0 , 'total_violations' : 0 , 'critical_violations' : 0 , 'serious_violations' : 0 , 'moderate_violations' : 0 , 'minor_violations' : 0 } @task ( 3 ) @pw async def comprehensive_site_scan ( self , page : PageWithRetry): """Comprehensive axe scan across entire site""" if not self .pages_to_scan: await self ._discover_all_pages(page) if self .pages_to_scan: page_url = self .pages_to_scan.pop( 0 ) if page_url not in self .pages_scanned: await self ._comprehensive_axe_test(page, page_url) self .pages_scanned.add(page_url) async def _discover_all_pages ( self , page : PageWithRetry): """Discover all internal pages for comprehensive testing""" async with event( self , "Site Discovery" ): await page.goto( '/' ) # Get all internal links links = await page.locator( 'a[href]' ).all() for link in links[: 50 ]: # Limit to 50 pages for comprehensive testing try : href = await link.get_attribute( 'href' ) if href and self ._is_internal_page(href, page.url): normalized_url = self ._normalize_url(href, page.url) if normalized_url and normalized_url not in self .pages_to_scan: self .pages_to_scan.append(normalized_url) except : pass print ( f "Discovered { len ( self .pages_to_scan) } pages for comprehensive axe scanning" ) def _is_internal_page ( self , href , base_url ): """Check if URL is internal page""" if not href: return False # Skip anchors, external protocols, and resources skip_patterns = [ '#' , 'mailto:' , 'tel:' , 'javascript:' , '.pdf' , '.jpg' , '.jpeg' , '.png' , '.gif' , '.css' , '.js' , '.zip' , '.doc' , '.xml' ] if any (pattern in href.lower() for pattern in skip_patterns): return False # Check if internal if href.startswith( '/' ): return True elif href.startswith( 'http' ): base_domain = base_url.split( '/' )[ 2 ] return base_domain in href else : return True # Relative links def _normalize_url ( self , href , base_url ): """Normalize URL for consistency""" if href.startswith( '/' ): return href elif href.startswith( 'http' ): base_domain = base_url.split( '/' )[ 2 ] if base_domain in href: return '/' + href.split(base_domain, 1 )[ 1 ].lstrip( '/' ) else : # Relative URL return '/' + href.lstrip( './' ) return None async def _comprehensive_axe_test ( self , page : PageWithRetry, page_url : str ): """Comprehensive axe test on a single page""" async with event( self , f "COMPREHENSIVE AXE: { page_url } " ): await page.goto(page_url) # Inject axe-core await self ._inject_axe_core(page) # Run comprehensive axe scan with all rule sets axe_results = await self ._run_comprehensive_axe_scan(page) if axe_results: self ._process_comprehensive_results(page_url, axe_results) violations = axe_results.get( 'violations' , []) if violations: critical = len ([v for v in violations if v.get( 'impact' ) == 'critical' ]) serious = len ([v for v in violations if v.get( 'impact' ) == 'serious' ]) failure_msg = f "❌ { len (violations) } axe violations ( { critical } critical, { serious } serious)" raise Exception (failure_msg) else : print ( f "✅ { page_url } - No violations found" ) async def _inject_axe_core ( self , page : PageWithRetry): """Inject axe-core library""" await page.add_script_tag( url = "https://cdnjs.cloudflare.com/ajax/libs/axe-core/4.8.2/axe.min.js" ) await page.wait_for_function( "typeof axe !== 'undefined'" ) async def _run_comprehensive_axe_scan ( self , page : PageWithRetry): """Run comprehensive axe scan with all rules""" try : # Configure comprehensive scan axe_config = { 'runOnly' : { 'type' : 'tag' , 'values' : [ 'wcag2a' , 'wcag2aa' , 'wcag21aa' , 'wcag22aa' , 'best-practice' , 'experimental' ] } } results = await page.evaluate( """ (config) => { return new Promise((resolve) => { axe.run(config, (err, results) => { if (err) { resolve(null); } else { resolve(results); } }); }); } """ , axe_config) return results except Exception as e: print ( f "Error running comprehensive axe scan: { str (e) } " ) return None def _process_comprehensive_results ( self , page_url , results ): """Process comprehensive axe results""" violations = results.get( 'violations' , []) self .scan_summary[ 'total_pages' ] += 1 self .scan_summary[ 'total_violations' ] += len (violations) for violation in violations: impact = violation.get( 'impact' , 'unknown' ) # Count by impact if impact == 'critical' : self .scan_summary[ 'critical_violations' ] += 1 elif impact == 'serious' : self .scan_summary[ 'serious_violations' ] += 1 elif impact == 'moderate' : self .scan_summary[ 'moderate_violations' ] += 1 elif impact == 'minor' : self .scan_summary[ 'minor_violations' ] += 1 # Store detailed violation violation_detail = { 'page' : page_url, 'rule_id' : violation.get( 'id' ), 'impact' : impact, 'description' : violation.get( 'description' ), 'help' : violation.get( 'help' ), 'help_url' : violation.get( 'helpUrl' ), 'tags' : violation.get( 'tags' , []), 'nodes' : len (violation.get( 'nodes' , [])) } self .all_violations.append(violation_detail) # Log violation print ( f "AXE [ { impact.upper() } ]: { violation_detail[ 'rule_id' ] } - { violation_detail[ 'description' ] } " f "( { violation_detail[ 'nodes' ] } elements) on { page_url } " ) @task ( 1 ) @pw async def generate_accessibility_audit_report ( self , page : PageWithRetry): """Generate comprehensive accessibility audit report""" if self .scan_summary[ 'total_pages' ] < 3 : return async with event( self , "Accessibility Audit Report" ): # Calculate compliance score total_possible_score = self .scan_summary[ 'total_pages' ] * 100 penalty_score = ( self .scan_summary[ 'critical_violations' ] * 10 + self .scan_summary[ 'serious_violations' ] * 5 + self .scan_summary[ 'moderate_violations' ] * 2 + self .scan_summary[ 'minor_violations' ] * 1 ) compliance_score = max ( 0 , 100 - (penalty_score / total_possible_score * 100 )) print ( f "

ACCESSIBILITY AUDIT REPORT:" ) print ( f "Pages scanned: { self .scan_summary[ 'total_pages' ] } " ) print ( f "Total violations: { self .scan_summary[ 'total_violations' ] } " ) print ( f "Critical: { self .scan_summary[ 'critical_violations' ] } " ) print ( f "Serious: { self .scan_summary[ 'serious_violations' ] } " ) print ( f "Moderate: { self .scan_summary[ 'moderate_violations' ] } " ) print ( f "Minor: { self .scan_summary[ 'minor_violations' ] } " ) print ( f "Compliance Score: { compliance_score :.1f} /100" ) def on_stop ( self ): """Final comprehensive report""" print ( "

" + "=" * 60 ) print ( "COMPREHENSIVE AXE ACCESSIBILITY AUDIT COMPLETE" ) print ( "=" * 60 ) print ( f "Total pages scanned: { len ( self .pages_scanned) } " ) print ( f "Total violations found: { len ( self .all_violations) } " ) if self .all_violations: # Top violation types rule_counts = {} for violation in self .all_violations: rule_id = violation[ 'rule_id' ] rule_counts[rule_id] = rule_counts.get(rule_id, 0 ) + 1 print ( "

TOP VIOLATION TYPES:" ) sorted_rules = sorted (rule_counts.items(), key = lambda x : x[ 1 ], reverse = True ) for rule_id, count in sorted_rules[: 10 ]: print ( f " { rule_id } : { count } violations" ) # Most problematic pages page_counts = {} for violation in self .all_violations: page = violation[ 'page' ] page_counts[page] = page_counts.get(page, 0 ) + 1 print ( "

MOST PROBLEMATIC PAGES:" ) sorted_pages = sorted (page_counts.items(), key = lambda x : x[ 1 ], reverse = True ) for page, count in sorted_pages[: 5 ]: print ( f " { page } : { count } violations" ) else : print ( "✅ No accessibility violations found across entire site!" )