# dnsrecon-reduced/app.py """ Flask application entry point for DNSRecon web interface. Provides REST API endpoints and serves the web interface with user session support. """ import json import traceback from flask import Flask, render_template, request, jsonify, send_file, session from datetime import datetime, timezone, timedelta import io import os from core.session_manager import session_manager from config import config from core.graph_manager import NodeType from utils.helpers import is_valid_target from utils.export_manager import export_manager from decimal import Decimal app = Flask(__name__) app.config['SECRET_KEY'] = config.flask_secret_key app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=config.flask_permanent_session_lifetime_hours) def get_user_scanner(): """ Retrieves the scanner for the current session, or creates a new one if none exists. """ current_flask_session_id = session.get('dnsrecon_session_id') if current_flask_session_id: existing_scanner = session_manager.get_session(current_flask_session_id) if existing_scanner: return current_flask_session_id, existing_scanner new_session_id = session_manager.create_session() new_scanner = session_manager.get_session(new_session_id) if not new_scanner: raise Exception("Failed to create new scanner session") session['dnsrecon_session_id'] = new_session_id session.permanent = True return new_session_id, new_scanner @app.route('/') def index(): """Serve the main web interface.""" return render_template('index.html') @app.route('/api/scan/start', methods=['POST']) def start_scan(): """ Starts a new reconnaissance scan. """ try: data = request.get_json() if not data or 'target' not in data: return jsonify({'success': False, 'error': 'Missing target in request'}), 400 target = data['target'].strip() max_depth = data.get('max_depth', config.default_recursion_depth) clear_graph = data.get('clear_graph', True) force_rescan_target = data.get('force_rescan_target', None) if not target: return jsonify({'success': False, 'error': 'Target cannot be empty'}), 400 if not is_valid_target(target): return jsonify({'success': False, 'error': 'Invalid target format.'}), 400 if not isinstance(max_depth, int) or not 1 <= max_depth <= 5: return jsonify({'success': False, 'error': 'Max depth must be an integer between 1 and 5'}), 400 user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'Failed to get scanner instance.'}), 500 success = scanner.start_scan(target, max_depth, clear_graph=clear_graph, force_rescan_target=force_rescan_target) if success: return jsonify({ 'success': True, 'message': 'Reconnaissance scan started successfully', 'scan_id': scanner.logger.session_id, 'user_session_id': user_session_id }) else: return jsonify({ 'success': False, 'error': f'Failed to start scan (scanner status: {scanner.status})', }), 409 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/scan/stop', methods=['POST']) def stop_scan(): """Stop the current scan.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No scanner found for session'}), 404 if not scanner.session_id: scanner.session_id = user_session_id scanner.stop_scan() session_manager.set_stop_signal(user_session_id) session_manager.update_scanner_status(user_session_id, 'stopped') session_manager.update_session_scanner(user_session_id, scanner) return jsonify({ 'success': True, 'message': 'Scan stop requested', 'user_session_id': user_session_id }) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/scan/status', methods=['GET']) def get_scan_status(): """Get current scan status.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({ 'success': True, 'status': { 'status': 'idle', 'target_domain': None, 'current_depth': 0, 'max_depth': 0, 'progress_percentage': 0.0, 'user_session_id': user_session_id } }) if not scanner.session_id: scanner.session_id = user_session_id status = scanner.get_scan_status() status['user_session_id'] = user_session_id return jsonify({'success': True, 'status': status}) except Exception as e: traceback.print_exc() return jsonify({ 'success': False, 'error': f'Internal server error: {str(e)}', 'fallback_status': {'status': 'error', 'progress_percentage': 0.0} }), 500 @app.route('/api/graph', methods=['GET']) def get_graph_data(): """Get current graph data.""" try: user_session_id, scanner = get_user_scanner() empty_graph = { 'nodes': [], 'edges': [], 'statistics': {'node_count': 0, 'edge_count': 0} } if not scanner: return jsonify({'success': True, 'graph': empty_graph, 'user_session_id': user_session_id}) graph_data = scanner.get_graph_data() or empty_graph return jsonify({'success': True, 'graph': graph_data, 'user_session_id': user_session_id}) except Exception as e: traceback.print_exc() return jsonify({ 'success': False, 'error': f'Internal server error: {str(e)}', 'fallback_graph': {'nodes': [], 'edges': [], 'statistics': {}} }), 500 @app.route('/api/graph/large-entity/extract', methods=['POST']) def extract_from_large_entity(): """ FIXED: Extract a node from a large entity with proper error handling. """ try: data = request.get_json() large_entity_id = data.get('large_entity_id') node_id = data.get('node_id') if not large_entity_id or not node_id: return jsonify({'success': False, 'error': 'Missing required parameters'}), 400 user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active session found'}), 404 # FIXED: Check if node exists and provide better error messages if not scanner.graph.graph.has_node(node_id): return jsonify({ 'success': False, 'error': f'Node {node_id} not found in graph' }), 404 # FIXED: Check if node is actually part of the large entity node_data = scanner.graph.graph.nodes[node_id] metadata = node_data.get('metadata', {}) current_large_entity = metadata.get('large_entity_id') if not current_large_entity: return jsonify({ 'success': False, 'error': f'Node {node_id} is not part of any large entity' }), 400 if current_large_entity != large_entity_id: return jsonify({ 'success': False, 'error': f'Node {node_id} belongs to large entity {current_large_entity}, not {large_entity_id}' }), 400 # FIXED: Check if large entity exists if not scanner.graph.graph.has_node(large_entity_id): return jsonify({ 'success': False, 'error': f'Large entity {large_entity_id} not found' }), 404 # Perform the extraction success = scanner.extract_node_from_large_entity(large_entity_id, node_id) if success: # Force immediate session state update session_manager.update_session_scanner(user_session_id, scanner) return jsonify({ 'success': True, 'message': f'Node {node_id} extracted successfully from {large_entity_id}.', 'extracted_node': node_id, 'large_entity': large_entity_id }) else: # This should not happen with the improved checks above, but handle it gracefully return jsonify({ 'success': False, 'error': f'Failed to extract node {node_id} from {large_entity_id}. Node may have already been extracted.' }), 409 except json.JSONDecodeError: return jsonify({'success': False, 'error': 'Invalid JSON in request body'}), 400 except Exception as e: traceback.print_exc() return jsonify({ 'success': False, 'error': f'Internal server error: {str(e)}', 'error_type': type(e).__name__ }), 500 @app.route('/api/graph/node/', methods=['DELETE']) def delete_graph_node(node_id): """Delete a node from the graph.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active session found'}), 404 success = scanner.graph.remove_node(node_id) if success: session_manager.update_session_scanner(user_session_id, scanner) return jsonify({'success': True, 'message': f'Node {node_id} deleted successfully.'}) else: return jsonify({'success': False, 'error': f'Node {node_id} not found.'}), 404 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/graph/revert', methods=['POST']) def revert_graph_action(): """Reverts a graph action, such as re-adding a deleted node.""" try: data = request.get_json() if not data or 'type' not in data or 'data' not in data: return jsonify({'success': False, 'error': 'Invalid revert request format'}), 400 user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active session found'}), 404 action_type = data['type'] action_data = data['data'] if action_type == 'delete': node_to_add = action_data.get('node') if node_to_add: scanner.graph.add_node( node_id=node_to_add['id'], node_type=NodeType(node_to_add['type']), attributes=node_to_add.get('attributes'), description=node_to_add.get('description'), metadata=node_to_add.get('metadata') ) edges_to_add = action_data.get('edges', []) for edge in edges_to_add: if scanner.graph.graph.has_node(edge['from']) and scanner.graph.graph.has_node(edge['to']): scanner.graph.add_edge( source_id=edge['from'], target_id=edge['to'], relationship_type=edge['metadata']['relationship_type'], confidence_score=edge['metadata']['confidence_score'], source_provider=edge['metadata']['source_provider'], raw_data=edge.get('raw_data', {}) ) session_manager.update_session_scanner(user_session_id, scanner) return jsonify({'success': True, 'message': 'Delete action reverted successfully.'}) return jsonify({'success': False, 'error': f'Unknown revert action type: {action_type}'}), 400 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/export', methods=['GET']) def export_results(): """Export scan results as a JSON file with improved error handling.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 # Get export data using the new export manager try: results = export_manager.export_scan_results(scanner) except Exception as e: return jsonify({'success': False, 'error': f'Failed to gather export data: {str(e)}'}), 500 # Add user session metadata results['export_metadata']['user_session_id'] = user_session_id results['export_metadata']['forensic_integrity'] = 'maintained' # Generate filename filename = export_manager.generate_filename( target=scanner.current_target or 'unknown', export_type='json' ) # Serialize with export manager try: json_data = export_manager.serialize_to_json(results) except Exception as e: return jsonify({ 'success': False, 'error': f'JSON serialization failed: {str(e)}' }), 500 # Create file object file_obj = io.BytesIO(json_data.encode('utf-8')) return send_file( file_obj, as_attachment=True, download_name=filename, mimetype='application/json' ) except Exception as e: traceback.print_exc() return jsonify({ 'success': False, 'error': f'Export failed: {str(e)}', 'error_type': type(e).__name__ }), 500 @app.route('/api/export/targets', methods=['GET']) def export_targets(): """Export all discovered targets as a TXT file.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 # Use export manager for targets export targets_txt = export_manager.export_targets_list(scanner) # Generate filename using export manager filename = export_manager.generate_filename( target=scanner.current_target or 'unknown', export_type='targets' ) file_obj = io.BytesIO(targets_txt.encode('utf-8')) return send_file( file_obj, as_attachment=True, download_name=filename, mimetype='text/plain' ) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500 @app.route('/api/export/summary', methods=['GET']) def export_summary(): """Export an executive summary as a TXT file.""" try: user_session_id, scanner = get_user_scanner() if not scanner: return jsonify({'success': False, 'error': 'No active scanner session found'}), 404 # Use export manager for summary generation summary_txt = export_manager.generate_executive_summary(scanner) # Generate filename using export manager filename = export_manager.generate_filename( target=scanner.current_target or 'unknown', export_type='summary' ) file_obj = io.BytesIO(summary_txt.encode('utf-8')) return send_file( file_obj, as_attachment=True, download_name=filename, mimetype='text/plain' ) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500 @app.route('/api/config/api-keys', methods=['POST']) def set_api_keys(): """Set API keys for the current session.""" try: data = request.get_json() if data is None: return jsonify({'success': False, 'error': 'No API keys provided'}), 400 user_session_id, scanner = get_user_scanner() session_config = scanner.config updated_providers = [] for provider_name, api_key in data.items(): api_key_value = str(api_key or '').strip() success = session_config.set_api_key(provider_name.lower(), api_key_value) if success: updated_providers.append(provider_name) if updated_providers: scanner._initialize_providers() session_manager.update_session_scanner(user_session_id, scanner) return jsonify({ 'success': True, 'message': f'API keys updated for: {", ".join(updated_providers)}', 'user_session_id': user_session_id }) else: return jsonify({'success': False, 'error': 'No valid API keys were provided.'}), 400 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/providers', methods=['GET']) def get_providers(): """Get enhanced information about available providers including API key sources.""" try: user_session_id, scanner = get_user_scanner() base_provider_info = scanner.get_provider_info() # Enhance provider info with API key source information enhanced_provider_info = {} for provider_name, info in base_provider_info.items(): enhanced_info = dict(info) # Copy base info if info['requires_api_key']: # Determine API key source and configuration status api_key = scanner.config.get_api_key(provider_name) backend_api_key = os.getenv(f'{provider_name.upper()}_API_KEY') if backend_api_key: # API key configured via backend/environment enhanced_info.update({ 'api_key_configured': True, 'api_key_source': 'backend', 'api_key_help': f'API key configured via environment variable {provider_name.upper()}_API_KEY' }) elif api_key: # API key configured via web interface enhanced_info.update({ 'api_key_configured': True, 'api_key_source': 'frontend', 'api_key_help': f'API key set via web interface (session-only)' }) else: # No API key configured enhanced_info.update({ 'api_key_configured': False, 'api_key_source': None, 'api_key_help': f'Requires API key to enable {info["display_name"]} integration' }) else: # Provider doesn't require API key enhanced_info.update({ 'api_key_configured': True, # Always "configured" for non-API providers 'api_key_source': None, 'api_key_help': None }) enhanced_provider_info[provider_name] = enhanced_info return jsonify({ 'success': True, 'providers': enhanced_provider_info, 'user_session_id': user_session_id }) except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/config/providers', methods=['POST']) def configure_providers(): """Configure provider settings (enable/disable).""" try: data = request.get_json() if data is None: return jsonify({'success': False, 'error': 'No provider settings provided'}), 400 user_session_id, scanner = get_user_scanner() session_config = scanner.config updated_providers = [] for provider_name, settings in data.items(): provider_name_clean = provider_name.lower().strip() if 'enabled' in settings: # Update the enabled state in session config session_config.enabled_providers[provider_name_clean] = settings['enabled'] updated_providers.append(provider_name_clean) if updated_providers: # Reinitialize providers with new settings scanner._initialize_providers() session_manager.update_session_scanner(user_session_id, scanner) return jsonify({ 'success': True, 'message': f'Provider settings updated for: {", ".join(updated_providers)}', 'user_session_id': user_session_id }) else: return jsonify({'success': False, 'error': 'No valid provider settings were provided.'}), 400 except Exception as e: traceback.print_exc() return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500 @app.errorhandler(404) def not_found(error): """Handle 404 errors.""" return jsonify({'success': False, 'error': 'Endpoint not found'}), 404 @app.errorhandler(500) def internal_error(error): """Handle 500 errors.""" traceback.print_exc() return jsonify({'success': False, 'error': 'Internal server error'}), 500 if __name__ == '__main__': config.load_from_env() app.run( host=config.flask_host, port=config.flask_port, debug=config.flask_debug, threaded=True )