# dnsrecon-reduced/app.py """ Flask application entry point for DNSRecon web interface. Provides REST API endpoints and serves the web interface with user session support. """ import json import traceback from flask import Flask, render_template, request, jsonify, send_file, session from datetime import datetime, timezone, timedelta import io import os from core.session_manager import session_manager from config import config from core.graph_manager import NodeType from utils.helpers import is_valid_target from decimal import Decimal app = Flask(__name__) app.config['SECRET_KEY'] = config.flask_secret_key app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=config.flask_permanent_session_lifetime_hours) def get_user_scanner(): """ Retrieves the scanner for the current session, or creates a new one if none exists. """ current_flask_session_id = session.get('dnsrecon_session_id') if current_flask_session_id: existing_scanner = session_manager.get_session(current_flask_session_id) if existing_scanner: return current_flask_session_id, existing_scanner new_session_id = session_manager.create_session() new_scanner = session_manager.get_session(new_session_id) if not new_scanner: raise Exception("Failed to create new scanner session") session['dnsrecon_session_id'] = new_session_id session.permanent = True return new_session_id, new_scanner class CustomJSONEncoder(json.JSONEncoder): """Custom JSON encoder to handle non-serializable objects.""" def default(self, obj): if isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, set): return list(obj) elif isinstance(obj, Decimal): return float(obj) elif hasattr(obj, '__dict__'): # For custom objects, try to serialize their dict representation try: return obj.__dict__ except: return str(obj) elif hasattr(obj, 'value') and hasattr(obj, 'name'): # For enum objects return obj.value else: # For any other non-serializable object, convert to string return str(obj) @app.route('/') def index(): """Serve the main web interface.""" return render_template('index.html') @app.route('/api/scan/start', methods=['POST']) def start_scan(): """ Starts a new reconnaissance scan. """ try: data = request.get_json() if not data or 'target' not in data: return jsonify({'success': False, 'error': 'Missing target in request'}), 400 target = data['target'].strip() max_depth = data.get('max_depth', config.default_recursion_depth) clear_graph = data.get('clear_graph', True) force_rescan_target = data.get('force_rescan_target', None) if not target: return jsonify({'success': False, 'error': 'Target cannot be empty'}), 400 if not is_valid_target(target): return jsonify({'success': False, 'error': 'Invalid target format.'}), 400 if not isinstance(max_depth, int) or not 1 <= max_depth <= 5: return jsonify({'success': False, 'error': 'Max depth must be an integer between 1 and 5'}), 400 user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'Failed to get scanner instance.'}), 500 success = scanner.start_scan(target, max_depth, clear_graph=clear_graph, force_rescan_target=force_rescan_target) if success: return jsonify({ 'success': True, 'message': 'Scan started successfully', 'scan_id': scanner.logger.session_id, 'user_session_id': user_session_id }) else: return jsonify({ 'success': False, 'error': f'Failed to start scan (scanner status: {scanner.status})', }), 409 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/scan/stop', methods=['POST']) def stop_scan(): """Stop the current scan.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No scanner found for session'}), 404 if not scanner.session_id: scanner.session_id = user_session_id scanner.stop_scan() session_manager.set_stop_signal(user_session_id) session_manager.update_scanner_status(user_session_id, 'stopped') session_manager.update_session_scanner(user_session_id, scanner) return jsonify({ 'success': True, 'message': 'Scan stop requested', 'user_session_id': user_session_id }) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/scan/status', methods=['GET']) def get_scan_status(): """Get current scan status.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({ 'success': True, 'status': { 'status': 'idle', 'target_domain': None, 'current_depth': 0, 'max_depth': 0, 'progress_percentage': 0.0, 'user_session_id': user_session_id } }) if not scanner.session_id: scanner.session_id = user_session_id status = scanner.get_scan_status() status['user_session_id'] = user_session_id return jsonify({'success': True, 'status': status}) except Exception as e: traceback.print_exc() return jsonify({ 'success': False, 'error': f'Internal server error: {str(e)}', 'fallback_status': {'status': 'error', 'progress_percentage': 0.0} }), 500 @app.route('/api/graph', methods=['GET']) def get_graph_data(): """Get current graph data.""" try: user_session_id, scanner = get_user_scanner() empty_graph = { 'nodes': [], 'edges': [], 'statistics': {'node_count': 0, 'edge_count': 0} } if not scanner: return jsonify({'success': True, 'graph': empty_graph, 'user_session_id': user_session_id}) graph_data = scanner.get_graph_data() or empty_graph return jsonify({'success': True, 'graph': graph_data, 'user_session_id': user_session_id}) except Exception as e: traceback.print_exc() return jsonify({ 'success': False, 'error': f'Internal server error: {str(e)}', 'fallback_graph': {'nodes': [], 'edges': [], 'statistics': {}} }), 500 @app.route('/api/graph/large-entity/extract', methods=['POST']) def extract_from_large_entity(): """Extract a node from a large entity.""" try: data = request.get_json() large_entity_id = data.get('large_entity_id') node_id = data.get('node_id') if not large_entity_id or not node_id: return jsonify({'success': False, 'error': 'Missing required parameters'}), 400 user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active session found'}), 404 success = scanner.extract_node_from_large_entity(large_entity_id, node_id) if success: session_manager.update_session_scanner(user_session_id, scanner) return jsonify({'success': True, 'message': f'Node {node_id} extracted successfully.'}) else: return jsonify({'success': False, 'error': f'Failed to extract node {node_id}.'}), 500 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/graph/node/', methods=['DELETE']) def delete_graph_node(node_id): """Delete a node from the graph.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active session found'}), 404 success = scanner.graph.remove_node(node_id) if success: session_manager.update_session_scanner(user_session_id, scanner) return jsonify({'success': True, 'message': f'Node {node_id} deleted successfully.'}) else: return jsonify({'success': False, 'error': f'Node {node_id} not found.'}), 404 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/graph/revert', methods=['POST']) def revert_graph_action(): """Reverts a graph action, such as re-adding a deleted node.""" try: data = request.get_json() if not data or 'type' not in data or 'data' not in data: return jsonify({'success': False, 'error': 'Invalid revert request format'}), 400 user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active session found'}), 404 action_type = data['type'] action_data = data['data'] if action_type == 'delete': node_to_add = action_data.get('node') if node_to_add: scanner.graph.add_node( node_id=node_to_add['id'], node_type=NodeType(node_to_add['type']), attributes=node_to_add.get('attributes'), description=node_to_add.get('description'), metadata=node_to_add.get('metadata') ) edges_to_add = action_data.get('edges', []) for edge in edges_to_add: if scanner.graph.graph.has_node(edge['from']) and scanner.graph.graph.has_node(edge['to']): scanner.graph.add_edge( source_id=edge['from'], target_id=edge['to'], relationship_type=edge['metadata']['relationship_type'], confidence_score=edge['metadata']['confidence_score'], source_provider=edge['metadata']['source_provider'], raw_data=edge.get('raw_data', {}) ) session_manager.update_session_scanner(user_session_id, scanner) return jsonify({'success': True, 'message': 'Delete action reverted successfully.'}) return jsonify({'success': False, 'error': f'Unknown revert action type: {action_type}'}), 400 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/export', methods=['GET']) def export_results(): """Export scan results as a JSON file with improved error handling.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 # Get export data with error handling try: results = scanner.export_results() except Exception as e: return jsonify({'success': False, 'error': f'Failed to gather export data: {str(e)}'}), 500 # Add export metadata results['export_metadata'] = { 'user_session_id': user_session_id, 'export_timestamp': datetime.now(timezone.utc).isoformat(), 'export_version': '1.0.0', 'forensic_integrity': 'maintained' } # Generate filename with forensic naming convention timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') target = scanner.current_target or 'unknown' # Sanitize target for filename safe_target = "".join(c for c in target if c.isalnum() or c in ('-', '_', '.')).rstrip() filename = f"dnsrecon_{safe_target}_{timestamp}.json" # Serialize with custom encoder and error handling try: json_data = json.dumps(results, indent=2, cls=CustomJSONEncoder, ensure_ascii=False) except Exception as e: # If custom encoder fails, try a more aggressive approach try: # Convert problematic objects to strings recursively cleaned_results = _clean_for_json(results) json_data = json.dumps(cleaned_results, indent=2, ensure_ascii=False) except Exception as e2: return jsonify({ 'success': False, 'error': f'JSON serialization failed: {str(e2)}' }), 500 # Create file object file_obj = io.BytesIO(json_data.encode('utf-8')) return send_file( file_obj, as_attachment=True, download_name=filename, mimetype='application/json' ) except Exception as e: traceback.print_exc() return jsonify({ 'success': False, 'error': f'Export failed: {str(e)}', 'error_type': type(e).__name__ }), 500 @app.route('/api/export/targets', methods=['GET']) def export_targets(): """Export all discovered targets as a TXT file.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 targets_txt = scanner.export_targets_txt() timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') safe_target = "".join(c for c in (scanner.current_target or 'unknown') if c.isalnum() or c in ('-', '_', '.')).rstrip() filename = f"dnsrecon_targets_{safe_target}_{timestamp}.txt" file_obj = io.BytesIO(targets_txt.encode('utf-8')) return send_file( file_obj, as_attachment=True, download_name=filename, mimetype='text/plain' ) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500 @app.route('/api/export/summary', methods=['GET']) def export_summary(): """Export an executive summary as a TXT file.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 summary_txt = scanner.generate_executive_summary() timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') safe_target = "".join(c for c in (scanner.current_target or 'unknown') if c.isalnum() or c in ('-', '_', '.')).rstrip() filename = f"dnsrecon_summary_{safe_target}_{timestamp}.txt" file_obj = io.BytesIO(summary_txt.encode('utf-8')) return send_file( file_obj, as_attachment=True, download_name=filename, mimetype='text/plain' ) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500 def _clean_for_json(obj, max_depth=10, current_depth=0): """ Recursively clean an object to make it JSON serializable. Handles circular references and problematic object types. """ if current_depth > max_depth: return f"" if obj is None or isinstance(obj, (bool, int, float, str)): return obj elif isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, (set, frozenset)): return list(obj) elif isinstance(obj, dict): cleaned = {} for key, value in obj.items(): try: # Ensure key is string clean_key = str(key) if not isinstance(key, str) else key cleaned[clean_key] = _clean_for_json(value, max_depth, current_depth + 1) except Exception: cleaned[str(key)] = f"" return cleaned elif isinstance(obj, (list, tuple)): cleaned = [] for item in obj: try: cleaned.append(_clean_for_json(item, max_depth, current_depth + 1)) except Exception: cleaned.append(f"") return cleaned elif hasattr(obj, '__dict__'): try: return _clean_for_json(obj.__dict__, max_depth, current_depth + 1) except Exception: return str(obj) elif hasattr(obj, 'value'): # For enum-like objects return obj.value else: return str(obj) @app.route('/api/config/api-keys', methods=['POST']) def set_api_keys(): """Set API keys for the current session.""" try: data = request.get_json() if data is None: return jsonify({'success': False, 'error': 'No API keys provided'}), 400 user_session_id, scanner = get_user_scanner() session_config = scanner.config updated_providers = [] for provider_name, api_key in data.items(): api_key_value = str(api_key or '').strip() success = session_config.set_api_key(provider_name.lower(), api_key_value) if success: updated_providers.append(provider_name) if updated_providers: scanner._initialize_providers() session_manager.update_session_scanner(user_session_id, scanner) return jsonify({ 'success': True, 'message': f'API keys updated for: {", ".join(updated_providers)}', 'user_session_id': user_session_id }) else: return jsonify({'success': False, 'error': 'No valid API keys were provided.'}), 400 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/providers', methods=['GET']) def get_providers(): """Get enhanced information about available providers including API key sources.""" try: user_session_id, scanner = get_user_scanner() base_provider_info = scanner.get_provider_info() # Enhance provider info with API key source information enhanced_provider_info = {} for provider_name, info in base_provider_info.items(): enhanced_info = dict(info) # Copy base info if info['requires_api_key']: # Determine API key source and configuration status api_key = scanner.config.get_api_key(provider_name) backend_api_key = os.getenv(f'{provider_name.upper()}_API_KEY') if backend_api_key: # API key configured via backend/environment enhanced_info.update({ 'api_key_configured': True, 'api_key_source': 'backend', 'api_key_help': f'API key configured via environment variable {provider_name.upper()}_API_KEY' }) elif api_key: # API key configured via web interface enhanced_info.update({ 'api_key_configured': True, 'api_key_source': 'frontend', 'api_key_help': f'API key set via web interface (session-only)' }) else: # No API key configured enhanced_info.update({ 'api_key_configured': False, 'api_key_source': None, 'api_key_help': f'Requires API key to enable {info["display_name"]} integration' }) else: # Provider doesn't require API key enhanced_info.update({ 'api_key_configured': True, # Always "configured" for non-API providers 'api_key_source': None, 'api_key_help': None }) enhanced_provider_info[provider_name] = enhanced_info return jsonify({ 'success': True, 'providers': enhanced_provider_info, 'user_session_id': user_session_id }) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/config/providers', methods=['POST']) def configure_providers(): """Configure provider settings (enable/disable).""" try: data = request.get_json() if data is None: return jsonify({'success': False, 'error': 'No provider settings provided'}), 400 user_session_id, scanner = get_user_scanner() session_config = scanner.config updated_providers = [] for provider_name, settings in data.items(): provider_name_clean = provider_name.lower().strip() if 'enabled' in settings: # Update the enabled state in session config session_config.enabled_providers[provider_name_clean] = settings['enabled'] updated_providers.append(provider_name_clean) if updated_providers: # Reinitialize providers with new settings scanner._initialize_providers() session_manager.update_session_scanner(user_session_id, scanner) return jsonify({ 'success': True, 'message': f'Provider settings updated for: {", ".join(updated_providers)}', 'user_session_id': user_session_id }) else: return jsonify({'success': False, 'error': 'No valid provider settings were provided.'}), 400 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.errorhandler(404) def not_found(error): """Handle 404 errors.""" return jsonify({'success': False, 'error': 'Endpoint not found'}), 404 @app.errorhandler(500) def internal_error(error): """Handle 500 errors.""" traceback.print_exc() return jsonify({'success': False, 'error': 'Internal server error'}), 500 if __name__ == '__main__': config.load_from_env() app.run( host=config.flask_host, port=config.flask_port, debug=config.flask_debug, threaded=True )