diff --git a/app.py b/app.py index 101e720..1dd1693 100644 --- a/app.py +++ b/app.py @@ -16,6 +16,7 @@ from core.session_manager import session_manager from config import config from core.graph_manager import NodeType from utils.helpers import is_valid_target +from utils.export_manager import export_manager from decimal import Decimal @@ -45,28 +46,7 @@ def get_user_scanner(): return new_session_id, new_scanner -class CustomJSONEncoder(json.JSONEncoder): - """Custom JSON encoder to handle non-serializable objects.""" - - def default(self, obj): - if isinstance(obj, datetime): - return obj.isoformat() - elif isinstance(obj, set): - return list(obj) - elif isinstance(obj, Decimal): - return float(obj) - elif hasattr(obj, '__dict__'): - # For custom objects, try to serialize their dict representation - try: - return obj.__dict__ - except: - return str(obj) - elif hasattr(obj, 'value') and hasattr(obj, 'name'): - # For enum objects - return obj.value - else: - # For any other non-serializable object, convert to string - return str(obj) + @app.route('/') def index(): """Serve the main web interface.""" @@ -105,7 +85,7 @@ def start_scan(): if success: return jsonify({ 'success': True, - 'message': 'Scan started successfully', + 'message': 'Reconnaissance scan started successfully', 'scan_id': scanner.logger.session_id, 'user_session_id': user_session_id }) @@ -309,41 +289,30 @@ def export_results(): if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 - # Get export data with error handling + # Get export data using the new export manager try: - results = scanner.export_results() + results = export_manager.export_scan_results(scanner) except Exception as e: return jsonify({'success': False, 'error': f'Failed to gather export data: {str(e)}'}), 500 - # Add export metadata - results['export_metadata'] = { - 'user_session_id': user_session_id, - 'export_timestamp': datetime.now(timezone.utc).isoformat(), - 'export_version': '1.0.0', - 'forensic_integrity': 'maintained' - } + # Add user session metadata + results['export_metadata']['user_session_id'] = user_session_id + results['export_metadata']['forensic_integrity'] = 'maintained' - # Generate filename with forensic naming convention - timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') - target = scanner.current_target or 'unknown' - # Sanitize target for filename - safe_target = "".join(c for c in target if c.isalnum() or c in ('-', '_', '.')).rstrip() - filename = f"dnsrecon_{safe_target}_{timestamp}.json" + # Generate filename + filename = export_manager.generate_filename( + target=scanner.current_target or 'unknown', + export_type='json' + ) - # Serialize with custom encoder and error handling + # Serialize with export manager try: - json_data = json.dumps(results, indent=2, cls=CustomJSONEncoder, ensure_ascii=False) + json_data = export_manager.serialize_to_json(results) except Exception as e: - # If custom encoder fails, try a more aggressive approach - try: - # Convert problematic objects to strings recursively - cleaned_results = _clean_for_json(results) - json_data = json.dumps(cleaned_results, indent=2, ensure_ascii=False) - except Exception as e2: - return jsonify({ - 'success': False, - 'error': f'JSON serialization failed: {str(e2)}' - }), 500 + return jsonify({ + 'success': False, + 'error': f'JSON serialization failed: {str(e)}' + }), 500 # Create file object file_obj = io.BytesIO(json_data.encode('utf-8')) @@ -371,11 +340,14 @@ def export_targets(): if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 - targets_txt = scanner.export_targets_txt() + # Use export manager for targets export + targets_txt = export_manager.export_targets_list(scanner) - timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') - safe_target = "".join(c for c in (scanner.current_target or 'unknown') if c.isalnum() or c in ('-', '_', '.')).rstrip() - filename = f"dnsrecon_targets_{safe_target}_{timestamp}.txt" + # Generate filename using export manager + filename = export_manager.generate_filename( + target=scanner.current_target or 'unknown', + export_type='targets' + ) file_obj = io.BytesIO(targets_txt.encode('utf-8')) @@ -398,11 +370,14 @@ def export_summary(): if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 - summary_txt = scanner.generate_executive_summary() + # Use export manager for summary generation + summary_txt = export_manager.generate_executive_summary(scanner) - timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') - safe_target = "".join(c for c in (scanner.current_target or 'unknown') if c.isalnum() or c in ('-', '_', '.')).rstrip() - filename = f"dnsrecon_summary_{safe_target}_{timestamp}.txt" + # Generate filename using export manager + filename = export_manager.generate_filename( + target=scanner.current_target or 'unknown', + export_type='summary' + ) file_obj = io.BytesIO(summary_txt.encode('utf-8')) @@ -416,49 +391,6 @@ def export_summary(): traceback.print_exc() return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500 -def _clean_for_json(obj, max_depth=10, current_depth=0): - """ - Recursively clean an object to make it JSON serializable. - Handles circular references and problematic object types. - """ - if current_depth > max_depth: - return f"" - - if obj is None or isinstance(obj, (bool, int, float, str)): - return obj - elif isinstance(obj, datetime): - return obj.isoformat() - elif isinstance(obj, (set, frozenset)): - return list(obj) - elif isinstance(obj, dict): - cleaned = {} - for key, value in obj.items(): - try: - # Ensure key is string - clean_key = str(key) if not isinstance(key, str) else key - cleaned[clean_key] = _clean_for_json(value, max_depth, current_depth + 1) - except Exception: - cleaned[str(key)] = f"" - return cleaned - elif isinstance(obj, (list, tuple)): - cleaned = [] - for item in obj: - try: - cleaned.append(_clean_for_json(item, max_depth, current_depth + 1)) - except Exception: - cleaned.append(f"") - return cleaned - elif hasattr(obj, '__dict__'): - try: - return _clean_for_json(obj.__dict__, max_depth, current_depth + 1) - except Exception: - return str(obj) - elif hasattr(obj, 'value'): - # For enum-like objects - return obj.value - else: - return str(obj) - @app.route('/api/config/api-keys', methods=['POST']) def set_api_keys(): """Set API keys for the current session.""" diff --git a/core/graph_manager.py b/core/graph_manager.py index e5dc050..3d0c894 100644 --- a/core/graph_manager.py +++ b/core/graph_manager.py @@ -5,6 +5,7 @@ Graph data model for DNSRecon using NetworkX. Manages in-memory graph storage with confidence scoring and forensic metadata. Now fully compatible with the unified ProviderResult data model. UPDATED: Fixed correlation exclusion keys to match actual attribute names. +UPDATED: Removed export_json() method - now handled by ExportManager. """ import re from datetime import datetime, timezone @@ -212,7 +213,7 @@ class GraphManager: def _has_direct_edge_bidirectional(self, node_a: str, node_b: str) -> bool: """ Check if there's a direct edge between two nodes in either direction. - Returns True if node_aâ†'node_b OR node_bâ†'node_a exists. + Returns True if node_aâ†'node_b OR node_bâ†'node_a exists. """ return (self.graph.has_edge(node_a, node_b) or self.graph.has_edge(node_b, node_a)) @@ -503,22 +504,6 @@ class GraphManager: 'statistics': self.get_statistics()['basic_metrics'] } - def export_json(self) -> Dict[str, Any]: - """Export complete graph data as a JSON-serializable dictionary.""" - graph_data = nx.node_link_data(self.graph, edges="edges") - return { - 'export_metadata': { - 'export_timestamp': datetime.now(timezone.utc).isoformat(), - 'graph_creation_time': self.creation_time, - 'last_modified': self.last_modified, - 'total_nodes': self.get_node_count(), - 'total_edges': self.get_edge_count(), - 'graph_format': 'dnsrecon_v1_unified_model' - }, - 'graph': graph_data, - 'statistics': self.get_statistics() - } - def _get_confidence_distribution(self) -> Dict[str, int]: """Get distribution of edge confidence scores with empty graph handling.""" distribution = {'high': 0, 'medium': 0, 'low': 0} diff --git a/core/scanner.py b/core/scanner.py index f3768a5..062580a 100644 --- a/core/scanner.py +++ b/core/scanner.py @@ -17,6 +17,7 @@ from core.graph_manager import GraphManager, NodeType from core.logger import get_forensic_logger, new_session from core.provider_result import ProviderResult from utils.helpers import _is_valid_ip, _is_valid_domain +from utils.export_manager import export_manager from providers.base_provider import BaseProvider from core.rate_limiter import GlobalRateLimiter @@ -868,114 +869,6 @@ class Scanner: graph_data['initial_targets'] = list(self.initial_targets) return graph_data - def export_results(self) -> Dict[str, Any]: - graph_data = self.graph.export_json() - audit_trail = self.logger.export_audit_trail() - provider_stats = {} - for provider in self.providers: - provider_stats[provider.get_name()] = provider.get_statistics() - - return { - 'scan_metadata': { - 'target_domain': self.current_target, 'max_depth': self.max_depth, - 'final_status': self.status, 'total_indicators_processed': self.indicators_processed, - 'enabled_providers': list(provider_stats.keys()), 'session_id': self.session_id - }, - 'graph_data': graph_data, - 'forensic_audit': audit_trail, - 'provider_statistics': provider_stats, - 'scan_summary': self.logger.get_forensic_summary() - } - - def export_targets_txt(self) -> str: - """Export all discovered domains and IPs as a text file.""" - nodes = self.graph.get_graph_data().get('nodes', []) - targets = {node['id'] for node in nodes if _is_valid_domain(node['id']) or _is_valid_ip(node['id'])} - return "\n".join(sorted(list(targets))) - - def generate_executive_summary(self) -> str: - """Generate a natural-language executive summary of the scan results.""" - summary = [] - now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z') - scan_metadata = self.get_scan_status() - graph_data = self.graph.get_graph_data() - nodes = graph_data.get('nodes', []) - edges = graph_data.get('edges', []) - - summary.append(f"DNSRecon Executive Summary") - summary.append(f"Report Generated: {now}") - summary.append("="*40) - - # Scan Overview - summary.append("\n## Scan Overview") - summary.append(f"- Initial Target: {self.current_target}") - summary.append(f"- Scan Status: {self.status.capitalize()}") - summary.append(f"- Analysis Depth: {self.max_depth}") - summary.append(f"- Total Indicators Found: {len(nodes)}") - summary.append(f"- Total Relationships Discovered: {len(edges)}") - - # Key Findings - summary.append("\n## Key Findings") - domains = [n for n in nodes if n['type'] == 'domain'] - ips = [n for n in nodes if n['type'] == 'ip'] - isps = [n for n in nodes if n['type'] == 'isp'] - cas = [n for n in nodes if n['type'] == 'ca'] - - summary.append(f"- Discovered {len(domains)} unique domain(s).") - summary.append(f"- Identified {len(ips)} unique IP address(es).") - if isps: - summary.append(f"- Infrastructure is hosted across {len(isps)} unique ISP(s).") - if cas: - summary.append(f"- Found certificates issued by {len(cas)} unique Certificate Authorit(y/ies).") - - # Detailed Findings - summary.append("\n## Detailed Findings") - - # Domain Analysis - if domains: - summary.append("\n### Domain Analysis") - for domain in domains[:5]: # report on first 5 - summary.append(f"\n- Domain: {domain['id']}") - - # Associated IPs - associated_ips = [edge['to'] for edge in edges if edge['from'] == domain['id'] and _is_valid_ip(edge['to'])] - if associated_ips: - summary.append(f" - Associated IPs: {', '.join(associated_ips)}") - - # Certificate info - cert_attributes = [attr for attr in domain.get('attributes', []) if attr.get('name', '').startswith('cert_')] - if cert_attributes: - issuer = next((attr['value'] for attr in cert_attributes if attr['name'] == 'cert_issuer_name'), 'N/A') - valid_until = next((attr['value'] for attr in cert_attributes if attr['name'] == 'cert_not_after'), 'N/A') - summary.append(f" - Certificate Issuer: {issuer}") - summary.append(f" - Certificate Valid Until: {valid_until}") - - # IP Address Analysis - if ips: - summary.append("\n### IP Address Analysis") - for ip in ips[:5]: # report on first 5 - summary.append(f"\n- IP Address: {ip['id']}") - - # Hostnames - hostnames = [edge['to'] for edge in edges if edge['from'] == ip['id'] and _is_valid_domain(edge['to'])] - if hostnames: - summary.append(f" - Associated Hostnames: {', '.join(hostnames)}") - - # ISP - isp_edge = next((edge for edge in edges if edge['from'] == ip['id'] and self.graph.graph.nodes[edge['to']]['type'] == 'isp'), None) - if isp_edge: - summary.append(f" - ISP: {isp_edge['to']}") - - # Data Sources - summary.append("\n## Data Sources") - provider_stats = self.logger.get_forensic_summary().get('provider_statistics', {}) - for provider, stats in provider_stats.items(): - summary.append(f"- {provider.capitalize()}: {stats.get('relationships_discovered', 0)} relationships from {stats.get('successful_requests', 0)} requests.") - - summary.append("\n" + "="*40) - summary.append("End of Report") - - return "\n".join(summary) def get_provider_info(self) -> Dict[str, Dict[str, Any]]: info = {} diff --git a/providers/shodan_provider.py b/providers/shodan_provider.py index f21c2dc..31e3ee2 100644 --- a/providers/shodan_provider.py +++ b/providers/shodan_provider.py @@ -146,35 +146,30 @@ class ShodanProvider(BaseProvider): result = self._process_shodan_data(normalized_ip, data) self._save_to_cache(cache_file, result, data) # Save both result and raw data elif response and response.status_code == 404: - # Handle 404 "No information available" as successful empty result - try: - error_data = response.json() - if "No information available" in error_data.get('error', ''): - # This is a successful query - Shodan just has no data - self.logger.logger.debug(f"Shodan has no information for {normalized_ip}") - result = ProviderResult() # Empty but successful result - # Cache the empty result to avoid repeated queries - self._save_to_cache(cache_file, result, {'error': 'No information available'}) - else: - # Some other 404 error - treat as failure - raise requests.exceptions.RequestException(f"Shodan API returned 404: {error_data}") - except (ValueError, KeyError): - # Could not parse JSON response - treat as failure - raise requests.exceptions.RequestException(f"Shodan API returned 404 with unparseable response") + # Handle all 404s as successful "no information available" responses + # Shodan returns 404 when no information is available for an IP + self.logger.logger.debug(f"Shodan has no information for {normalized_ip}") + result = ProviderResult() # Empty but successful result + # Cache the empty result to avoid repeated queries + self._save_to_cache(cache_file, result, {'error': 'No information available'}) elif cache_status == "stale": # If API fails on a stale cache, use the old data result = self._load_from_cache(cache_file) + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to API failure") else: # Other HTTP error codes should be treated as failures status_code = response.status_code if response else "No response" raise requests.exceptions.RequestException(f"Shodan API returned HTTP {status_code}") except requests.exceptions.RequestException as e: - self.logger.logger.info(f"Shodan API query returned no info for {normalized_ip}: {e}") + self.logger.logger.debug(f"Shodan API error for {normalized_ip}: {e}") if cache_status == "stale": + # Use stale cache if available result = self._load_from_cache(cache_file) + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to API error") else: - # Re-raise for retry scheduling - but only for actual failures + # FIXED: Only re-raise for actual network/timeout errors, not 404s + # 404s are already handled above as successful empty results raise e return result diff --git a/utils/__init__.py b/utils/__init__.py index e69de29..ae1ff67 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -0,0 +1,22 @@ +# dnsrecon-reduced/utils/__init__.py + +""" +Utility modules for DNSRecon. +Contains helper functions, export management, and supporting utilities. +""" + +from .helpers import is_valid_target, _is_valid_domain, _is_valid_ip, get_ip_version, normalize_ip +from .export_manager import export_manager, ExportManager, CustomJSONEncoder + +__all__ = [ + 'is_valid_target', + '_is_valid_domain', + '_is_valid_ip', + 'get_ip_version', + 'normalize_ip', + 'export_manager', + 'ExportManager', + 'CustomJSONEncoder' +] + +__version__ = "1.0.0" \ No newline at end of file diff --git a/utils/export_manager.py b/utils/export_manager.py new file mode 100644 index 0000000..5bd6eb9 --- /dev/null +++ b/utils/export_manager.py @@ -0,0 +1,336 @@ +# dnsrecon-reduced/utils/export_manager.py + +""" +Centralized export functionality for DNSRecon. +Handles all data export operations with forensic integrity and proper formatting. +""" + +import json +from datetime import datetime, timezone +from typing import Dict, Any, List, Optional +from decimal import Decimal + +from utils.helpers import _is_valid_domain, _is_valid_ip +import networkx as nx + + +class ExportManager: + """ + Centralized manager for all DNSRecon export operations. + Maintains forensic integrity and provides consistent export formats. + """ + + def __init__(self): + """Initialize export manager.""" + pass + + def export_scan_results(self, scanner) -> Dict[str, Any]: + """ + Export complete scan results with forensic metadata. + + Args: + scanner: Scanner instance with completed scan data + + Returns: + Complete scan results dictionary + """ + graph_data = self.export_graph_json(scanner.graph) + audit_trail = scanner.logger.export_audit_trail() + provider_stats = {} + + for provider in scanner.providers: + provider_stats[provider.get_name()] = provider.get_statistics() + + results = { + 'scan_metadata': { + 'target_domain': scanner.current_target, + 'max_depth': scanner.max_depth, + 'final_status': scanner.status, + 'total_indicators_processed': scanner.indicators_processed, + 'enabled_providers': list(provider_stats.keys()), + 'session_id': scanner.session_id + }, + 'graph_data': graph_data, + 'forensic_audit': audit_trail, + 'provider_statistics': provider_stats, + 'scan_summary': scanner.logger.get_forensic_summary() + } + + # Add export metadata + results['export_metadata'] = { + 'export_timestamp': datetime.now(timezone.utc).isoformat(), + 'export_version': '1.0.0', + 'forensic_integrity': 'maintained' + } + + return results + + def export_targets_list(self, scanner) -> str: + """ + Export all discovered domains and IPs as a text file. + + Args: + scanner: Scanner instance with graph data + + Returns: + Newline-separated list of targets + """ + nodes = scanner.graph.get_graph_data().get('nodes', []) + targets = { + node['id'] for node in nodes + if _is_valid_domain(node['id']) or _is_valid_ip(node['id']) + } + return "\n".join(sorted(list(targets))) + + def generate_executive_summary(self, scanner) -> str: + """ + Generate a natural-language executive summary of scan results. + + Args: + scanner: Scanner instance with completed scan data + + Returns: + Formatted executive summary text + """ + summary = [] + now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z') + scan_metadata = scanner.get_scan_status() + graph_data = scanner.graph.get_graph_data() + nodes = graph_data.get('nodes', []) + edges = graph_data.get('edges', []) + + summary.append(f"DNSRecon Executive Summary") + summary.append(f"Report Generated: {now}") + summary.append("="*40) + + # Scan Overview + summary.append("\n## Scan Overview") + summary.append(f"- Initial Target: {scanner.current_target}") + summary.append(f"- Scan Status: {scanner.status.capitalize()}") + summary.append(f"- Analysis Depth: {scanner.max_depth}") + summary.append(f"- Total Indicators Found: {len(nodes)}") + summary.append(f"- Total Relationships Discovered: {len(edges)}") + + # Key Findings + summary.append("\n## Key Findings") + domains = [n for n in nodes if n['type'] == 'domain'] + ips = [n for n in nodes if n['type'] == 'ip'] + isps = [n for n in nodes if n['type'] == 'isp'] + cas = [n for n in nodes if n['type'] == 'ca'] + + summary.append(f"- Discovered {len(domains)} unique domain(s).") + summary.append(f"- Identified {len(ips)} unique IP address(es).") + if isps: + summary.append(f"- Infrastructure is hosted across {len(isps)} unique ISP(s).") + if cas: + summary.append(f"- Found certificates issued by {len(cas)} unique Certificate Authorit(y/ies).") + + # Detailed Findings + summary.append("\n## Detailed Findings") + + # Domain Analysis + if domains: + summary.append("\n### Domain Analysis") + for domain in domains[:5]: # Report on first 5 + summary.append(f"\n- Domain: {domain['id']}") + + # Associated IPs + associated_ips = [edge['to'] for edge in edges + if edge['from'] == domain['id'] and _is_valid_ip(edge['to'])] + if associated_ips: + summary.append(f" - Associated IPs: {', '.join(associated_ips)}") + + # Certificate info + cert_attributes = [attr for attr in domain.get('attributes', []) + if attr.get('name', '').startswith('cert_')] + if cert_attributes: + issuer = next((attr['value'] for attr in cert_attributes + if attr['name'] == 'cert_issuer_name'), 'N/A') + valid_until = next((attr['value'] for attr in cert_attributes + if attr['name'] == 'cert_not_after'), 'N/A') + summary.append(f" - Certificate Issuer: {issuer}") + summary.append(f" - Certificate Valid Until: {valid_until}") + + # IP Address Analysis + if ips: + summary.append("\n### IP Address Analysis") + for ip in ips[:5]: # Report on first 5 + summary.append(f"\n- IP Address: {ip['id']}") + + # Hostnames + hostnames = [edge['to'] for edge in edges + if edge['from'] == ip['id'] and _is_valid_domain(edge['to'])] + if hostnames: + summary.append(f" - Associated Hostnames: {', '.join(hostnames)}") + + # ISP + isp_edge = next((edge for edge in edges + if edge['from'] == ip['id'] and + any(node['id'] == edge['to'] and node['type'] == 'isp' + for node in nodes)), None) + if isp_edge: + summary.append(f" - ISP: {isp_edge['to']}") + + # Data Sources + summary.append("\n## Data Sources") + provider_stats = scanner.logger.get_forensic_summary().get('provider_statistics', {}) + for provider, stats in provider_stats.items(): + relationships = stats.get('relationships_discovered', 0) + requests = stats.get('successful_requests', 0) + summary.append(f"- {provider.capitalize()}: {relationships} relationships from {requests} requests.") + + summary.append("\n" + "="*40) + summary.append("End of Report") + + return "\n".join(summary) + + def export_graph_json(self, graph_manager) -> Dict[str, Any]: + """ + Export complete graph data as a JSON-serializable dictionary. + Moved from GraphManager to centralize export functionality. + + Args: + graph_manager: GraphManager instance with graph data + + Returns: + Complete graph data with export metadata + """ + graph_data = nx.node_link_data(graph_manager.graph, edges="edges") + + return { + 'export_metadata': { + 'export_timestamp': datetime.now(timezone.utc).isoformat(), + 'graph_creation_time': graph_manager.creation_time, + 'last_modified': graph_manager.last_modified, + 'total_nodes': graph_manager.get_node_count(), + 'total_edges': graph_manager.get_edge_count(), + 'graph_format': 'dnsrecon_v1_unified_model' + }, + 'graph': graph_data, + 'statistics': graph_manager.get_statistics() + } + + def serialize_to_json(self, data: Dict[str, Any], indent: int = 2) -> str: + """ + Serialize data to JSON with custom handling for non-serializable objects. + + Args: + data: Data to serialize + indent: JSON indentation level + + Returns: + JSON string representation + """ + try: + return json.dumps(data, indent=indent, cls=CustomJSONEncoder, ensure_ascii=False) + except Exception: + # Fallback to aggressive cleaning + cleaned_data = self._clean_for_json(data) + return json.dumps(cleaned_data, indent=indent, ensure_ascii=False) + + def _clean_for_json(self, obj, max_depth: int = 10, current_depth: int = 0) -> Any: + """ + Recursively clean an object to make it JSON serializable. + Handles circular references and problematic object types. + + Args: + obj: Object to clean + max_depth: Maximum recursion depth + current_depth: Current recursion depth + + Returns: + JSON-serializable object + """ + if current_depth > max_depth: + return f"" + + if obj is None or isinstance(obj, (bool, int, float, str)): + return obj + elif isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, (set, frozenset)): + return list(obj) + elif isinstance(obj, dict): + cleaned = {} + for key, value in obj.items(): + try: + # Ensure key is string + clean_key = str(key) if not isinstance(key, str) else key + cleaned[clean_key] = self._clean_for_json(value, max_depth, current_depth + 1) + except Exception: + cleaned[str(key)] = f"" + return cleaned + elif isinstance(obj, (list, tuple)): + cleaned = [] + for item in obj: + try: + cleaned.append(self._clean_for_json(item, max_depth, current_depth + 1)) + except Exception: + cleaned.append(f"") + return cleaned + elif hasattr(obj, '__dict__'): + try: + return self._clean_for_json(obj.__dict__, max_depth, current_depth + 1) + except Exception: + return str(obj) + elif hasattr(obj, 'value'): + # For enum-like objects + return obj.value + else: + return str(obj) + + def generate_filename(self, target: str, export_type: str, timestamp: Optional[datetime] = None) -> str: + """ + Generate standardized filename for exports. + + Args: + target: Target domain/IP being scanned + export_type: Type of export (json, txt, summary) + timestamp: Optional timestamp (defaults to now) + + Returns: + Formatted filename with forensic naming convention + """ + if timestamp is None: + timestamp = datetime.now(timezone.utc) + + timestamp_str = timestamp.strftime('%Y%m%d_%H%M%S') + safe_target = "".join(c for c in target if c.isalnum() or c in ('-', '_', '.')).rstrip() + + extension_map = { + 'json': 'json', + 'txt': 'txt', + 'summary': 'txt', + 'targets': 'txt' + } + + extension = extension_map.get(export_type, 'txt') + return f"dnsrecon_{export_type}_{safe_target}_{timestamp_str}.{extension}" + + +class CustomJSONEncoder(json.JSONEncoder): + """Custom JSON encoder to handle non-serializable objects.""" + + def default(self, obj): + if isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, set): + return list(obj) + elif isinstance(obj, Decimal): + return float(obj) + elif hasattr(obj, '__dict__'): + # For custom objects, try to serialize their dict representation + try: + return obj.__dict__ + except: + return str(obj) + elif hasattr(obj, 'value') and hasattr(obj, 'name'): + # For enum objects + return obj.value + else: + # For any other non-serializable object, convert to string + return str(obj) + + +# Global export manager instance +export_manager = ExportManager() \ No newline at end of file