#!/usr/bin/env python3 """ Aggregate test results from multiple sources and generate comprehensive reports. Used by CI/CD pipeline to combine results from parallel test execution. """ import json import xml.etree.ElementTree as ET import argparse import sys from pathlib import Path from typing import Dict, List, Any from dataclasses import dataclass, asdict from datetime import datetime import re @dataclass class TestCase: """Individual test case result.""" name: str suite: str status: str # passed, failed, skipped, error duration: float message: str = "" output: str = "" @dataclass class TestSuite: """Test suite results.""" name: str tests: int = 0 passed: int = 0 failed: int = 0 skipped: int = 0 errors: int = 0 duration: float = 0.0 test_cases: List[TestCase] = None def __post_init__(self): if self.test_cases is None: self.test_cases = [] @dataclass class StageResults: """Results for a testing stage.""" name: str status: str total: int passed: int failed: int skipped: int duration: float pass_rate: float emoji: str = "" @dataclass class AggregatedResults: """Complete aggregated test results.""" overall_passed: bool total_tests: int total_passed: int total_failed: int total_skipped: int total_duration: float tests_per_second: float parallel_efficiency: float stage1: StageResults stage2: StageResults stage3: StageResults test_suites: List[TestSuite] failed_tests: List[TestCase] slowest_tests: List[TestCase] timestamp: str class TestResultAggregator: """Aggregates test results from multiple sources.""" def __init__(self, input_dir: Path): self.input_dir = input_dir self.test_suites = {} self.all_tests = [] self.stage_results = {} def parse_junit_xml(self, xml_file: Path) -> TestSuite: """Parse JUnit XML test results.""" try: tree = ET.parse(xml_file) root = tree.getroot() # Handle both testsuites and testsuite root elements if root.tag == "testsuites": suites = root.findall("testsuite") else: suites = [root] suite_results = TestSuite(name=xml_file.stem) for suite_elem in suites: suite_name = suite_elem.get("name", "unknown") for testcase_elem in suite_elem.findall("testcase"): test_name = testcase_elem.get("name") classname = testcase_elem.get("classname", suite_name) time = float(testcase_elem.get("time", 0)) # Determine status status = "passed" message = "" output = "" failure = testcase_elem.find("failure") error = testcase_elem.find("error") skipped = testcase_elem.find("skipped") if failure is not None: status = "failed" message = failure.get("message", "") output = failure.text or "" elif error is not None: status = "error" message = error.get("message", "") output = error.text or "" elif skipped is not None: status = "skipped" message = skipped.get("message", "") test_case = TestCase( name=test_name, suite=classname, status=status, duration=time, message=message, output=output ) suite_results.test_cases.append(test_case) suite_results.tests += 1 suite_results.duration += time if status == "passed": suite_results.passed += 1 elif status == "failed": suite_results.failed += 1 elif status == "skipped": suite_results.skipped += 1 elif status == "error": suite_results.errors += 1 return suite_results except (ET.ParseError, IOError) as e: print(f"Warning: Failed to parse {xml_file}: {e}", file=sys.stderr) return TestSuite(name=xml_file.stem) def parse_json_results(self, json_file: Path) -> TestSuite: """Parse JSON test results (gtest format).""" try: with open(json_file) as f: data = json.load(f) suite_results = TestSuite(name=json_file.stem) # Handle both single suite and multiple suites if "testsuites" in data: suites = data["testsuites"] elif "testsuite" in data: suites = [data] else: suites = [] for suite in suites: suite_name = suite.get("name", "unknown") for test in suite.get("testsuite", []): test_name = test.get("name") status = "passed" if test.get("result") == "COMPLETED" else "failed" duration = float(test.get("time", "0").replace("s", "")) test_case = TestCase( name=test_name, suite=suite_name, status=status, duration=duration, output=test.get("output", "") ) suite_results.test_cases.append(test_case) suite_results.tests += 1 suite_results.duration += duration if status == "passed": suite_results.passed += 1 else: suite_results.failed += 1 return suite_results except (json.JSONDecodeError, IOError, KeyError) as e: print(f"Warning: Failed to parse {json_file}: {e}", file=sys.stderr) return TestSuite(name=json_file.stem) def collect_results(self): """Collect all test results from input directory.""" # Find all result files xml_files = list(self.input_dir.rglob("*.xml")) json_files = list(self.input_dir.rglob("*.json")) print(f"Found {len(xml_files)} XML and {len(json_files)} JSON result files") # Parse XML results for xml_file in xml_files: # Skip non-test XML files if "coverage" in xml_file.name.lower(): continue suite = self.parse_junit_xml(xml_file) if suite.tests > 0: self.test_suites[suite.name] = suite self.all_tests.extend(suite.test_cases) # Parse JSON results for json_file in json_files: # Skip non-test JSON files if any(skip in json_file.name.lower() for skip in ["summary", "metrics", "times", "coverage"]): continue suite = self.parse_json_results(json_file) if suite.tests > 0: # Merge with existing suite if name matches if suite.name in self.test_suites: existing = self.test_suites[suite.name] existing.test_cases.extend(suite.test_cases) existing.tests += suite.tests existing.passed += suite.passed existing.failed += suite.failed existing.skipped += suite.skipped existing.errors += suite.errors existing.duration += suite.duration else: self.test_suites[suite.name] = suite self.all_tests.extend(suite.test_cases) def categorize_by_stage(self): """Categorize results by CI stage.""" # Initialize stage results stages = { "stage1": StageResults("Smoke Tests", "unknown", 0, 0, 0, 0, 0.0, 0.0), "stage2": StageResults("Unit Tests", "unknown", 0, 0, 0, 0, 0.0, 0.0), "stage3": StageResults("Integration Tests", "unknown", 0, 0, 0, 0, 0.0, 0.0), } # Categorize tests for test in self.all_tests: # Determine stage based on test name or suite stage = None if "smoke" in test.name.lower() or "critical" in test.name.lower(): stage = "stage1" elif "unit" in test.suite.lower() or "unit" in test.name.lower(): stage = "stage2" elif ("integration" in test.suite.lower() or "integration" in test.name.lower() or "e2e" in test.name.lower() or "gui" in test.name.lower()): stage = "stage3" else: # Default to unit tests stage = "stage2" if stage: stage_result = stages[stage] stage_result.total += 1 stage_result.duration += test.duration if test.status == "passed": stage_result.passed += 1 elif test.status in ["failed", "error"]: stage_result.failed += 1 elif test.status == "skipped": stage_result.skipped += 1 # Calculate pass rates and status for stage_key, stage in stages.items(): if stage.total > 0: stage.pass_rate = (stage.passed / stage.total) * 100 stage.status = "✅" if stage.failed == 0 else "❌" stage.emoji = "✅" if stage.failed == 0 else "❌" else: stage.status = "⏭️" stage.emoji = "⏭️" self.stage_results = stages def generate_summary(self) -> AggregatedResults: """Generate aggregated summary of all results.""" total_tests = len(self.all_tests) total_passed = sum(1 for t in self.all_tests if t.status == "passed") total_failed = sum(1 for t in self.all_tests if t.status in ["failed", "error"]) total_skipped = sum(1 for t in self.all_tests if t.status == "skipped") total_duration = sum(t.duration for t in self.all_tests) # Find failed tests failed_tests = [t for t in self.all_tests if t.status in ["failed", "error"]] # Find slowest tests slowest_tests = sorted(self.all_tests, key=lambda t: t.duration, reverse=True)[:10] # Calculate metrics tests_per_second = total_tests / total_duration if total_duration > 0 else 0 # Estimate parallel efficiency (simplified) num_shards = len(self.test_suites) if num_shards > 1: ideal_time = total_duration / num_shards actual_time = max(suite.duration for suite in self.test_suites.values()) parallel_efficiency = (ideal_time / actual_time * 100) if actual_time > 0 else 0 else: parallel_efficiency = 100 return AggregatedResults( overall_passed=(total_failed == 0), total_tests=total_tests, total_passed=total_passed, total_failed=total_failed, total_skipped=total_skipped, total_duration=round(total_duration, 2), tests_per_second=round(tests_per_second, 2), parallel_efficiency=round(parallel_efficiency, 1), stage1=self.stage_results.get("stage1"), stage2=self.stage_results.get("stage2"), stage3=self.stage_results.get("stage3"), test_suites=list(self.test_suites.values()), failed_tests=failed_tests, slowest_tests=slowest_tests, timestamp=datetime.now().isoformat() ) def generate_html_report(self, results: AggregatedResults, output_path: Path): """Generate HTML report from aggregated results.""" html = f""" Yaze Test Results - {datetime.now().strftime('%Y-%m-%d %H:%M')}

🎯 Yaze Test Results Report

{'PASSED' if results.overall_passed else 'FAILED'}
Overall Status
{results.total_tests}
Total Tests
{results.total_passed}
Passed
{results.total_failed}
Failed
{results.total_duration}s
Duration
{results.parallel_efficiency}%
Efficiency

📊 Pass Rate

{results.total_passed / results.total_tests * 100:.1f}%

🚀 Stage Results

Stage Status Tests Passed Failed Pass Rate Duration
Stage 1: Smoke {results.stage1.emoji} {results.stage1.total} {results.stage1.passed} {results.stage1.failed} {results.stage1.pass_rate:.1f}% {results.stage1.duration:.2f}s
Stage 2: Unit {results.stage2.emoji} {results.stage2.total} {results.stage2.passed} {results.stage2.failed} {results.stage2.pass_rate:.1f}% {results.stage2.duration:.2f}s
Stage 3: Integration {results.stage3.emoji} {results.stage3.total} {results.stage3.passed} {results.stage3.failed} {results.stage3.pass_rate:.1f}% {results.stage3.duration:.2f}s
{'

❌ Failed Tests

' if results.failed_tests else ''} {''.join(f'' for t in results.failed_tests[:20])} {'
TestSuiteMessage
{t.name}{t.suite}{t.message[:100]}
' if results.failed_tests else ''}

🐌 Slowest Tests

{''.join(f'' for t in results.slowest_tests)}
Test Suite Duration
{t.name}{t.suite}{t.duration:.3f}s

Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | Yaze Project

""" output_path.write_text(html) def main(): """Main entry point.""" parser = argparse.ArgumentParser( description="Aggregate test results from multiple sources" ) parser.add_argument( "--input-dir", type=Path, required=True, help="Directory containing test result files" ) parser.add_argument( "--output", type=Path, default=Path("results_summary.json"), help="Output JSON file for aggregated results" ) parser.add_argument( "--generate-html", type=Path, help="Generate HTML report at specified path" ) args = parser.parse_args() if not args.input_dir.exists(): print(f"Error: Input directory not found: {args.input_dir}", file=sys.stderr) sys.exit(1) # Create aggregator aggregator = TestResultAggregator(args.input_dir) # Collect and process results print("Collecting test results...") aggregator.collect_results() print(f"Found {len(aggregator.all_tests)} total tests across " f"{len(aggregator.test_suites)} suites") # Categorize by stage aggregator.categorize_by_stage() # Generate summary summary = aggregator.generate_summary() # Save JSON summary with open(args.output, 'w') as f: # Convert dataclasses to dict summary_dict = asdict(summary) json.dump(summary_dict, f, indent=2, default=str) print(f"Summary saved to {args.output}") # Generate HTML report if requested if args.generate_html: aggregator.generate_html_report(summary, args.generate_html) print(f"HTML report saved to {args.generate_html}") # Print summary print(f"\n{'=' * 60}") print(f"Test Results Summary") print(f"{'=' * 60}") print(f"Overall Status: {'✅ PASSED' if summary.overall_passed else '❌ FAILED'}") print(f"Total Tests: {summary.total_tests}") print(f"Passed: {summary.total_passed} ({summary.total_passed/summary.total_tests*100:.1f}%)") print(f"Failed: {summary.total_failed}") print(f"Duration: {summary.total_duration}s") # Exit with appropriate code sys.exit(0 if summary.overall_passed else 1) if __name__ == "__main__": main()