# DNScope-reduced/utils/export_manager.py """ Centralized export functionality for DNScope. Handles all data export operations with forensic integrity and proper formatting. ENHANCED: Professional forensic executive summary generation for court-ready documentation. """ import json from datetime import datetime, timezone from typing import Dict, Any, List, Optional, Set, Tuple from decimal import Decimal from collections import defaultdict, Counter import networkx as nx from utils.helpers import _is_valid_domain, _is_valid_ip class ExportManager: """ Centralized manager for all DNScope export operations. Maintains forensic integrity and provides consistent export formats. ENHANCED: Advanced forensic analysis and professional reporting capabilities. """ def __init__(self): """Initialize export manager.""" pass def export_scan_results(self, scanner) -> Dict[str, Any]: """ Export complete scan results with forensic metadata. Args: scanner: Scanner instance with completed scan data Returns: Complete scan results dictionary """ graph_data = self.export_graph_json(scanner.graph) audit_trail = scanner.logger.export_audit_trail() provider_stats = {} for provider in scanner.providers: provider_stats[provider.get_name()] = provider.get_statistics() results = { 'scan_metadata': { 'target_domain': scanner.current_target, 'max_depth': scanner.max_depth, 'final_status': scanner.status, 'total_indicators_processed': scanner.indicators_processed, 'enabled_providers': list(provider_stats.keys()), 'session_id': scanner.session_id }, 'graph_data': graph_data, 'forensic_audit': audit_trail, 'provider_statistics': provider_stats, 'scan_summary': scanner.logger.get_forensic_summary() } # Add export metadata results['export_metadata'] = { 'export_timestamp': datetime.now(timezone.utc).isoformat(), 'export_version': '1.0.0', 'forensic_integrity': 'maintained' } return results def export_targets_list(self, scanner) -> str: """ Export all discovered domains and IPs as a text file. Args: scanner: Scanner instance with graph data Returns: Newline-separated list of targets """ nodes = scanner.graph.get_graph_data().get('nodes', []) targets = { node['id'] for node in nodes if _is_valid_domain(node['id']) or _is_valid_ip(node['id']) } return "\n".join(sorted(list(targets))) def generate_executive_summary(self, scanner) -> str: """ ENHANCED: Generate a comprehensive, court-ready forensic executive summary. Args: scanner: Scanner instance with completed scan data Returns: Professional forensic summary formatted for investigative use """ report = [] now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC') # Get comprehensive data for analysis graph_data = scanner.graph.get_graph_data() nodes = graph_data.get('nodes', []) edges = graph_data.get('edges', []) audit_trail = scanner.logger.export_audit_trail() # Perform advanced analysis infrastructure_analysis = self._analyze_infrastructure_patterns(nodes, edges) # === HEADER AND METADATA === report.extend([ "=" * 80, "DIGITAL INFRASTRUCTURE RECONNAISSANCE REPORT", "=" * 80, "", f"Report Generated: {now}", f"Investigation Target: {scanner.current_target}", f"Analysis Session: {scanner.session_id}", f"Scan Depth: {scanner.max_depth} levels", f"Final Status: {scanner.status.upper()}", "" ]) # === EXECUTIVE SUMMARY === report.extend([ "EXECUTIVE SUMMARY", "-" * 40, "", f"This report presents the findings of a comprehensive passive reconnaissance analysis " f"conducted against the target '{scanner.current_target}'. The investigation employed " f"multiple intelligence sources and discovered {len(nodes)} distinct digital entities " f"connected through {len(edges)} verified relationships.", "", f"The analysis reveals a digital infrastructure comprising {infrastructure_analysis['domains']} " f"domain names, {infrastructure_analysis['ips']} IP addresses, and {infrastructure_analysis['isps']} " f"infrastructure service providers. Certificate transparency analysis identified " f"{infrastructure_analysis['cas']} certificate authorities managing the cryptographic " f"infrastructure for the investigated entities.", "", ]) # === METHODOLOGY === report.extend([ "INVESTIGATIVE METHODOLOGY", "-" * 40, "", "This analysis employed passive reconnaissance techniques using the following verified data sources:", "" ]) provider_info = { 'dns': 'Standard DNS resolution and reverse DNS lookups', 'crtsh': 'Certificate Transparency database analysis via crt.sh', 'shodan': 'Internet-connected device intelligence via Shodan API' } for provider in scanner.providers: provider_name = provider.get_name() stats = provider.get_statistics() description = provider_info.get(provider_name, f'{provider_name} data provider') report.extend([ f"• {provider.get_display_name()}: {description}", f" - Total Requests: {stats['total_requests']}", f" - Success Rate: {stats['success_rate']:.1f}%", f" - Relationships Discovered: {stats['relationships_found']}", "" ]) # === INFRASTRUCTURE ANALYSIS === report.extend([ "INFRASTRUCTURE ANALYSIS", "-" * 40, "" ]) # Domain Analysis if infrastructure_analysis['domains'] > 0: report.extend([ f"Domain Name Infrastructure ({infrastructure_analysis['domains']} entities):", "" ]) domain_details = self._get_detailed_domain_analysis(nodes, edges) for domain_info in domain_details[:10]: # Top 10 domains report.extend([ f"• {domain_info['domain']}", f" - Type: {domain_info['classification']}", f" - Connected IPs: {len(domain_info['ips'])}", f" - Certificate Status: {domain_info['cert_status']}", f" - Relationship Confidence: {domain_info['avg_confidence']:.2f}", ]) if domain_info['security_notes']: report.extend([ f" - Security Notes: {', '.join(domain_info['security_notes'])}", ]) report.append("") # IP Address Analysis if infrastructure_analysis['ips'] > 0: report.extend([ f"IP Address Infrastructure ({infrastructure_analysis['ips']} entities):", "" ]) ip_details = self._get_detailed_ip_analysis(nodes, edges) for ip_info in ip_details[:8]: # Top 8 IPs report.extend([ f"• {ip_info['ip']} ({ip_info['version']})", f" - Associated Domains: {len(ip_info['domains'])}", f" - ISP: {ip_info['isp'] or 'Unknown'}", f" - Geographic Location: {ip_info['location'] or 'Not determined'}", ]) if ip_info['open_ports']: report.extend([ f" - Exposed Services: {', '.join(map(str, ip_info['open_ports'][:5]))}" + (f" (and {len(ip_info['open_ports']) - 5} more)" if len(ip_info['open_ports']) > 5 else ""), ]) report.append("") # === RELATIONSHIP ANALYSIS === report.extend([ "ENTITY RELATIONSHIP ANALYSIS", "-" * 40, "" ]) # Network topology insights topology = self._analyze_network_topology(nodes, edges) report.extend([ f"Network Topology Assessment:", f"• Central Hubs: {len(topology['hubs'])} entities serve as primary connection points", f"• Isolated Clusters: {len(topology['clusters'])} distinct groupings identified", f"• Relationship Density: {topology['density']:.3f} (0=sparse, 1=fully connected)", f"• Average Path Length: {topology['avg_path_length']:.2f} degrees of separation", "" ]) # Key relationships key_relationships = self._identify_key_relationships(edges) if key_relationships: report.extend([ "Critical Infrastructure Relationships:", "" ]) for rel in key_relationships[:8]: # Top 8 relationships confidence_desc = self._describe_confidence(rel['confidence']) report.extend([ f"• {rel['source']} → {rel['target']}", f" - Relationship: {self._humanize_relationship_type(rel['type'])}", f" - Evidence Strength: {confidence_desc} ({rel['confidence']:.2f})", f" - Discovery Method: {rel['provider']}", "" ]) # === CERTIFICATE ANALYSIS === cert_analysis = self._analyze_certificate_infrastructure(nodes) if cert_analysis['total_certs'] > 0: report.extend([ "CERTIFICATE INFRASTRUCTURE ANALYSIS", "-" * 40, "", f"Certificate Status Overview:", f"• Total Certificates Analyzed: {cert_analysis['total_certs']}", f"• Valid Certificates: {cert_analysis['valid']}", f"• Expired/Invalid: {cert_analysis['expired']}", f"• Certificate Authorities: {len(cert_analysis['cas'])}", "" ]) if cert_analysis['cas']: report.extend([ "Certificate Authority Distribution:", "" ]) for ca, count in cert_analysis['cas'].most_common(5): report.extend([ f"• {ca}: {count} certificate(s)", ]) report.append("") # === TECHNICAL APPENDIX === report.extend([ "TECHNICAL APPENDIX", "-" * 40, "", "Data Quality Assessment:", f"• Total API Requests: {audit_trail.get('session_metadata', {}).get('total_requests', 0)}", f"• Data Providers Used: {len(audit_trail.get('session_metadata', {}).get('providers_used', []))}", f"• Relationship Confidence Distribution:", ]) # Confidence distribution confidence_dist = self._calculate_confidence_distribution(edges) for level, count in confidence_dist.items(): percentage = (count / len(edges) * 100) if edges else 0 report.extend([ f" - {level.title()} Confidence (≥{self._get_confidence_threshold(level)}): {count} ({percentage:.1f}%)", ]) report.extend([ "", "Correlation Analysis:", f"• Entity Correlations Identified: {len(scanner.graph.correlation_index)}", f"• Cross-Reference Validation: {self._count_cross_validated_relationships(edges)} relationships verified by multiple sources", "" ]) # === CONCLUSION === report.extend([ "CONCLUSION", "-" * 40, "", self._generate_conclusion(scanner.current_target, infrastructure_analysis, len(edges)), "", "This analysis was conducted using passive reconnaissance techniques and represents " "the digital infrastructure observable through public data sources at the time of investigation. " "All findings are supported by verifiable technical evidence and documented through " "a complete audit trail maintained for forensic integrity.", "", f"Investigation completed: {now}", f"Report authenticated by: DNScope v{self._get_version()}", "", "=" * 80, "END OF REPORT", "=" * 80 ]) return "\n".join(report) def _analyze_infrastructure_patterns(self, nodes: List[Dict], edges: List[Dict]) -> Dict[str, Any]: """Analyze infrastructure patterns and classify entities.""" analysis = { 'domains': len([n for n in nodes if n['type'] == 'domain']), 'ips': len([n for n in nodes if n['type'] == 'ip']), 'isps': len([n for n in nodes if n['type'] == 'isp']), 'cas': len([n for n in nodes if n['type'] == 'ca']), 'correlations': len([n for n in nodes if n['type'] == 'correlation_object']) } return analysis def _get_detailed_domain_analysis(self, nodes: List[Dict], edges: List[Dict]) -> List[Dict[str, Any]]: """Generate detailed analysis for each domain.""" domain_nodes = [n for n in nodes if n['type'] == 'domain'] domain_analysis = [] for domain in domain_nodes: # Find connected IPs connected_ips = [e['to'] for e in edges if e['from'] == domain['id'] and _is_valid_ip(e['to'])] # Determine classification classification = "Primary Domain" if domain['id'].startswith('www.'): classification = "Web Interface" elif any(subdomain in domain['id'] for subdomain in ['api.', 'mail.', 'smtp.']): classification = "Service Endpoint" elif domain['id'].count('.') > 1: classification = "Subdomain" # Certificate status cert_status = self._determine_certificate_status(domain) # Security notes security_notes = [] if cert_status == "Expired/Invalid": security_notes.append("Certificate validation issues") if len(connected_ips) == 0: security_notes.append("No IP resolution found") if len(connected_ips) > 5: security_notes.append("Multiple IP endpoints") # Average confidence domain_edges = [e for e in edges if e['from'] == domain['id']] avg_confidence = sum(e['confidence_score'] for e in domain_edges) / len(domain_edges) if domain_edges else 0 domain_analysis.append({ 'domain': domain['id'], 'classification': classification, 'ips': connected_ips, 'cert_status': cert_status, 'security_notes': security_notes, 'avg_confidence': avg_confidence }) # Sort by number of connections (most connected first) return sorted(domain_analysis, key=lambda x: len(x['ips']), reverse=True) def _get_detailed_ip_analysis(self, nodes: List[Dict], edges: List[Dict]) -> List[Dict[str, Any]]: """Generate detailed analysis for each IP address.""" ip_nodes = [n for n in nodes if n['type'] == 'ip'] ip_analysis = [] for ip in ip_nodes: # Find connected domains connected_domains = [e['from'] for e in edges if e['to'] == ip['id'] and _is_valid_domain(e['from'])] # Extract metadata from attributes ip_version = "IPv4" location = None isp = None open_ports = [] for attr in ip.get('attributes', []): if attr.get('name') == 'country': location = attr.get('value') elif attr.get('name') == 'org': isp = attr.get('value') elif attr.get('name') == 'shodan_open_port': open_ports.append(attr.get('value')) elif 'ipv6' in str(attr.get('metadata', {})).lower(): ip_version = "IPv6" # Find ISP from relationships if not isp: isp_edges = [e for e in edges if e['from'] == ip['id'] and e['label'].endswith('_isp')] isp = isp_edges[0]['to'] if isp_edges else None ip_analysis.append({ 'ip': ip['id'], 'version': ip_version, 'domains': connected_domains, 'isp': isp, 'location': location, 'open_ports': open_ports }) # Sort by number of connected domains return sorted(ip_analysis, key=lambda x: len(x['domains']), reverse=True) def _analyze_network_topology(self, nodes: List[Dict], edges: List[Dict]) -> Dict[str, Any]: """Analyze network topology and identify key structural patterns.""" if not nodes or not edges: return {'hubs': [], 'clusters': [], 'density': 0, 'avg_path_length': 0} # Create NetworkX graph G = nx.DiGraph() for node in nodes: G.add_node(node['id']) for edge in edges: G.add_edge(edge['from'], edge['to']) # Convert to undirected for certain analyses G_undirected = G.to_undirected() # Identify hubs (nodes with high degree centrality) centrality = nx.degree_centrality(G_undirected) hub_threshold = max(centrality.values()) * 0.7 if centrality else 0 hubs = [node for node, cent in centrality.items() if cent >= hub_threshold] # Find connected components (clusters) clusters = list(nx.connected_components(G_undirected)) # Calculate density density = nx.density(G_undirected) # Calculate average path length (for largest component) if G_undirected.number_of_nodes() > 1: largest_cc = max(nx.connected_components(G_undirected), key=len) subgraph = G_undirected.subgraph(largest_cc) try: avg_path_length = nx.average_shortest_path_length(subgraph) except: avg_path_length = 0 else: avg_path_length = 0 return { 'hubs': hubs, 'clusters': clusters, 'density': density, 'avg_path_length': avg_path_length } def _identify_key_relationships(self, edges: List[Dict]) -> List[Dict[str, Any]]: """Identify the most significant relationships in the infrastructure.""" # Score relationships by confidence and type importance relationship_importance = { 'dns_a_record': 0.9, 'dns_aaaa_record': 0.9, 'crtsh_cert_issuer': 0.8, 'shodan_isp': 0.8, 'crtsh_san_certificate': 0.7, 'dns_mx_record': 0.7, 'dns_ns_record': 0.7 } scored_edges = [] for edge in edges: base_confidence = edge.get('confidence_score', 0) type_weight = relationship_importance.get(edge.get('label', ''), 0.5) combined_score = (base_confidence * 0.7) + (type_weight * 0.3) scored_edges.append({ 'source': edge['from'], 'target': edge['to'], 'type': edge.get('label', ''), 'confidence': base_confidence, 'provider': edge.get('source_provider', ''), 'score': combined_score }) # Return top relationships by score return sorted(scored_edges, key=lambda x: x['score'], reverse=True) def _analyze_certificate_infrastructure(self, nodes: List[Dict]) -> Dict[str, Any]: """Analyze certificate infrastructure across all domains.""" domain_nodes = [n for n in nodes if n['type'] == 'domain'] ca_nodes = [n for n in nodes if n['type'] == 'ca'] valid_certs = 0 expired_certs = 0 total_certs = 0 cas = Counter() for domain in domain_nodes: for attr in domain.get('attributes', []): if attr.get('name') == 'cert_is_currently_valid': total_certs += 1 if attr.get('value') is True: valid_certs += 1 else: expired_certs += 1 elif attr.get('name') == 'cert_issuer_name': issuer = attr.get('value') if issuer: cas[issuer] += 1 return { 'total_certs': total_certs, 'valid': valid_certs, 'expired': expired_certs, 'cas': cas } def _has_expired_certificates(self, domain_node: Dict) -> bool: """Check if domain has expired certificates.""" for attr in domain_node.get('attributes', []): if (attr.get('name') == 'cert_is_currently_valid' and attr.get('value') is False): return True return False def _determine_certificate_status(self, domain_node: Dict) -> str: """Determine the certificate status for a domain.""" has_valid = False has_expired = False has_any = False for attr in domain_node.get('attributes', []): if attr.get('name') == 'cert_is_currently_valid': has_any = True if attr.get('value') is True: has_valid = True else: has_expired = True if not has_any: return "No Certificate Data" elif has_valid and not has_expired: return "Valid" elif has_expired and not has_valid: return "Expired/Invalid" else: return "Mixed Status" def _describe_confidence(self, confidence: float) -> str: """Convert confidence score to descriptive text.""" if confidence >= 0.9: return "Very High" elif confidence >= 0.8: return "High" elif confidence >= 0.6: return "Medium" elif confidence >= 0.4: return "Low" else: return "Very Low" def _humanize_relationship_type(self, rel_type: str) -> str: """Convert technical relationship types to human-readable descriptions.""" type_map = { 'dns_a_record': 'DNS A Record Resolution', 'dns_aaaa_record': 'DNS AAAA Record (IPv6) Resolution', 'dns_mx_record': 'Email Server (MX) Configuration', 'dns_ns_record': 'Name Server Delegation', 'dns_cname_record': 'DNS Alias (CNAME) Resolution', 'crtsh_cert_issuer': 'SSL Certificate Issuer Relationship', 'crtsh_san_certificate': 'Shared SSL Certificate', 'shodan_isp': 'Internet Service Provider Assignment', 'shodan_a_record': 'IP-to-Domain Resolution (Shodan)', 'dns_ptr_record': 'Reverse DNS Resolution' } return type_map.get(rel_type, rel_type.replace('_', ' ').title()) def _calculate_confidence_distribution(self, edges: List[Dict]) -> Dict[str, int]: """Calculate confidence score distribution.""" distribution = {'high': 0, 'medium': 0, 'low': 0} for edge in edges: confidence = edge.get('confidence_score', 0) if confidence >= 0.8: distribution['high'] += 1 elif confidence >= 0.6: distribution['medium'] += 1 else: distribution['low'] += 1 return distribution def _get_confidence_threshold(self, level: str) -> str: """Get confidence threshold for a level.""" thresholds = {'high': '0.80', 'medium': '0.60', 'low': '0.00'} return thresholds.get(level, '0.00') def _count_cross_validated_relationships(self, edges: List[Dict]) -> int: """Count relationships verified by multiple providers.""" # Group edges by source-target pair edge_pairs = defaultdict(list) for edge in edges: pair_key = f"{edge['from']}->{edge['to']}" edge_pairs[pair_key].append(edge.get('source_provider', '')) # Count pairs with multiple providers cross_validated = 0 for pair, providers in edge_pairs.items(): if len(set(providers)) > 1: # Multiple unique providers cross_validated += 1 return cross_validated def _generate_security_recommendations(self, infrastructure_analysis: Dict) -> List[str]: """Generate actionable security recommendations.""" recommendations = [] # Check for complex infrastructure if infrastructure_analysis['ips'] > 10: recommendations.append( "Document and validate the necessity of extensive IP address infrastructure" ) if infrastructure_analysis['correlations'] > 5: recommendations.append( "Investigate shared infrastructure components for operational security implications" ) if not recommendations: recommendations.append( "Continue monitoring for changes in the identified digital infrastructure" ) return recommendations def _generate_conclusion(self, target: str, infrastructure_analysis: Dict, total_relationships: int) -> str: """Generate a professional conclusion for the report.""" conclusion_parts = [ f"The passive reconnaissance analysis of '{target}' has successfully mapped " f"a digital infrastructure ecosystem consisting of {infrastructure_analysis['domains']} " f"domain names, {infrastructure_analysis['ips']} IP addresses, and " f"{total_relationships} verified inter-entity relationships." ] conclusion_parts.append( "All findings in this report are based on publicly available information and " "passive reconnaissance techniques. The analysis maintains full forensic integrity " "with complete audit trails for all data collection activities." ) return " ".join(conclusion_parts) def _count_bidirectional_relationships(self, graph) -> int: """Count bidirectional relationships in the graph.""" count = 0 for u, v in graph.edges(): if graph.has_edge(v, u): count += 1 return count // 2 # Each pair counted twice def _identify_hub_nodes(self, graph, nodes: List[Dict]) -> List[str]: """Identify nodes that serve as major hubs in the network.""" if not graph.nodes(): return [] degree_centrality = nx.degree_centrality(graph.to_undirected()) threshold = max(degree_centrality.values()) * 0.8 if degree_centrality else 0 return [node for node, centrality in degree_centrality.items() if centrality >= threshold] def _get_version(self) -> str: """Get DNScope version for report authentication.""" return "1.0.0-forensic" def export_graph_json(self, graph_manager) -> Dict[str, Any]: """ Export complete graph data as a JSON-serializable dictionary. Moved from GraphManager to centralize export functionality. Args: graph_manager: GraphManager instance with graph data Returns: Complete graph data with export metadata """ graph_data = nx.node_link_data(graph_manager.graph, edges="edges") return { 'export_metadata': { 'export_timestamp': datetime.now(timezone.utc).isoformat(), 'graph_creation_time': graph_manager.creation_time, 'last_modified': graph_manager.last_modified, 'total_nodes': graph_manager.get_node_count(), 'total_edges': graph_manager.get_edge_count(), 'graph_format': 'DNScope_v1_unified_model' }, 'graph': graph_data, 'statistics': graph_manager.get_statistics() } def serialize_to_json(self, data: Dict[str, Any], indent: int = 2) -> str: """ Serialize data to JSON with custom handling for non-serializable objects. Args: data: Data to serialize indent: JSON indentation level Returns: JSON string representation """ try: return json.dumps(data, indent=indent, cls=CustomJSONEncoder, ensure_ascii=False) except Exception: # Fallback to aggressive cleaning cleaned_data = self._clean_for_json(data) return json.dumps(cleaned_data, indent=indent, ensure_ascii=False) def _clean_for_json(self, obj, max_depth: int = 10, current_depth: int = 0) -> Any: """ Recursively clean an object to make it JSON serializable. Handles circular references and problematic object types. Args: obj: Object to clean max_depth: Maximum recursion depth current_depth: Current recursion depth Returns: JSON-serializable object """ if current_depth > max_depth: return f"" if obj is None or isinstance(obj, (bool, int, float, str)): return obj elif isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, (set, frozenset)): return list(obj) elif isinstance(obj, dict): cleaned = {} for key, value in obj.items(): try: # Ensure key is string clean_key = str(key) if not isinstance(key, str) else key cleaned[clean_key] = self._clean_for_json(value, max_depth, current_depth + 1) except Exception: cleaned[str(key)] = f"" return cleaned elif isinstance(obj, (list, tuple)): cleaned = [] for item in obj: try: cleaned.append(self._clean_for_json(item, max_depth, current_depth + 1)) except Exception: cleaned.append(f"") return cleaned elif hasattr(obj, '__dict__'): try: return self._clean_for_json(obj.__dict__, max_depth, current_depth + 1) except Exception: return str(obj) elif hasattr(obj, 'value'): # For enum-like objects return obj.value else: return str(obj) def generate_filename(self, target: str, export_type: str, timestamp: Optional[datetime] = None) -> str: """ Generate standardized filename for exports. Args: target: Target domain/IP being scanned export_type: Type of export (json, txt, summary) timestamp: Optional timestamp (defaults to now) Returns: Formatted filename with forensic naming convention """ if timestamp is None: timestamp = datetime.now(timezone.utc) timestamp_str = timestamp.strftime('%Y%m%d_%H%M%S') safe_target = "".join(c for c in target if c.isalnum() or c in ('-', '_', '.')).rstrip() extension_map = { 'json': 'json', 'txt': 'txt', 'summary': 'txt', 'targets': 'txt' } extension = extension_map.get(export_type, 'txt') return f"DNScope_{export_type}_{safe_target}_{timestamp_str}.{extension}" class CustomJSONEncoder(json.JSONEncoder): """Custom JSON encoder to handle non-serializable objects.""" def default(self, obj): if isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, set): return list(obj) elif isinstance(obj, Decimal): return float(obj) elif hasattr(obj, '__dict__'): # For custom objects, try to serialize their dict representation try: return obj.__dict__ except: return str(obj) elif hasattr(obj, 'value') and hasattr(obj, 'name'): # For enum objects return obj.value else: # For any other non-serializable object, convert to string return str(obj) # Global export manager instance export_manager = ExportManager()