# dnsrecon-reduced/app.py

"""
Flask application entry point for DNSRecon web interface.
Provides REST API endpoints and serves the web interface with user session support.
"""

import io
import os
import traceback
from datetime import datetime, timezone, timedelta
from decimal import Decimal

from flask import Flask, render_template, request, jsonify, send_file, session
from flask_socketio import SocketIO

from core.session_manager import session_manager
from config import config
from core.graph_manager import NodeType
from utils.helpers import is_valid_target
from utils.export_manager import export_manager


app = Flask(__name__)
socketio = SocketIO(app)
app.config['SECRET_KEY'] = config.flask_secret_key
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=config.flask_permanent_session_lifetime_hours)


def _get_json_object():
    """
    Return the request body parsed as a JSON object, or None.

    ``silent=True`` makes Flask return None for a missing, malformed or
    non-JSON body instead of raising an HTTP error. Without it, the broad
    ``except Exception`` handlers in the views would turn a client mistake
    into a 500 response. Anything that is not a JSON object (list, string,
    number, ...) is also reported as None so callers can safely use dict
    methods on the result.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def get_user_scanner():
    """
    Retrieves the scanner for the current session, or creates a new one if none exists.

    Returns:
        A ``(session_id, scanner)`` tuple.

    Raises:
        RuntimeError: if the session manager returns no scanner for the
            session it has just created.
    """
    current_flask_session_id = session.get('dnsrecon_session_id')
    if current_flask_session_id:
        existing_scanner = session_manager.get_session(current_flask_session_id)
        if existing_scanner:
            return current_flask_session_id, existing_scanner

    # No usable session: create one and remember its id in the Flask session.
    new_session_id = session_manager.create_session(socketio)
    new_scanner = session_manager.get_session(new_session_id)
    if not new_scanner:
        raise RuntimeError("Failed to create new scanner session")

    session['dnsrecon_session_id'] = new_session_id
    session.permanent = True
    return new_session_id, new_scanner


@app.route('/')
def index():
    """Serve the main web interface."""
    return render_template('index.html')


@app.route('/api/scan/start', methods=['POST'])
def start_scan():
    """
    Starts a new reconnaissance scan.

    Expects a JSON object with a string ``target`` and optional
    ``max_depth`` (integer 1-5), ``clear_graph`` and ``force_rescan_target``.
    Responds 400 on invalid input, 409 when the scanner refuses to start,
    and 500 on unexpected errors.
    """
    try:
        data = _get_json_object()
        if data is None or 'target' not in data:
            return jsonify({'success': False, 'error': 'Missing target in request'}), 400

        raw_target = data['target']
        # A non-string target would otherwise fail on .strip() and surface as a 500.
        if not isinstance(raw_target, str):
            return jsonify({'success': False, 'error': 'Target must be a string'}), 400

        target = raw_target.strip()
        max_depth = data.get('max_depth', config.default_recursion_depth)
        clear_graph = data.get('clear_graph', True)
        force_rescan_target = data.get('force_rescan_target', None)

        if not target:
            return jsonify({'success': False, 'error': 'Target cannot be empty'}), 400
        if not is_valid_target(target):
            return jsonify({'success': False, 'error': 'Invalid target format.'}), 400
        # bool is a subclass of int, so JSON `true` must be rejected explicitly.
        if (
            isinstance(max_depth, bool)
            or not isinstance(max_depth, int)
            or not 1 <= max_depth <= 5
        ):
            return jsonify({'success': False, 'error': 'Max depth must be an integer between 1 and 5'}), 400

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'Failed to get scanner instance.'}), 500

        success = scanner.start_scan(
            target,
            max_depth,
            clear_graph=clear_graph,
            force_rescan_target=force_rescan_target
        )

        if success:
            return jsonify({
                'success': True,
                'message': 'Reconnaissance scan started successfully',
                'scan_id': scanner.logger.session_id,
                'user_session_id': user_session_id
            })
        else:
            return jsonify({
                'success': False,
                'error': f'Failed to start scan (scanner status: {scanner.status})',
            }), 409

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


@app.route('/api/scan/stop', methods=['POST'])
def stop_scan():
    """Stop the current scan."""
    try:
        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No scanner found for session'}), 404

        if not scanner.session_id:
            scanner.session_id = user_session_id

        scanner.stop_scan()
        session_manager.set_stop_signal(user_session_id)
        session_manager.update_scanner_status(user_session_id, 'stopped')
        session_manager.update_session_scanner(user_session_id, scanner)

        return jsonify({
            'success': True,
            'message': 'Scan stop requested',
            'user_session_id': user_session_id
        })

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


@socketio.on('get_status')
def get_scan_status():
    """
    Get current scan status.

    Emits a ``scan_update`` event carrying either the scanner's status or,
    on failure, an error payload.
    """
    # NOTE(review): socketio.emit() is called without a recipient, which
    # presumably addresses every connected client rather than only the
    # requester - confirm this is intended for a multi-session deployment.
    try:
        user_session_id, scanner = get_user_scanner()

        if not scanner:
            status = {
                'status': 'idle',
                'target_domain': None,
                'current_depth': 0,
                'max_depth': 0,
                'progress_percentage': 0.0,
                'user_session_id': user_session_id
            }
        else:
            if not scanner.session_id:
                scanner.session_id = user_session_id
            status = scanner.get_scan_status()
            status['user_session_id'] = user_session_id

        socketio.emit('scan_update', status)

    except Exception:
        traceback.print_exc()
        socketio.emit('scan_update', {
            'status': 'error',
            'message': 'Failed to get status'
        })


@app.route('/api/graph', methods=['GET'])
def get_graph_data():
    """
    Get current graph data.

    Falls back to an empty graph structure when there is no scanner or the
    scanner returns no graph data.
    """
    try:
        user_session_id, scanner = get_user_scanner()

        empty_graph = {
            'nodes': [],
            'edges': [],
            'statistics': {'node_count': 0, 'edge_count': 0}
        }

        if not scanner:
            return jsonify({'success': True, 'graph': empty_graph, 'user_session_id': user_session_id})

        graph_data = scanner.get_graph_data() or empty_graph

        return jsonify({'success': True, 'graph': graph_data, 'user_session_id': user_session_id})

    except Exception as e:
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': f'Internal server error: {str(e)}',
            'fallback_graph': {'nodes': [], 'edges': [], 'statistics': {}}
        }), 500


@app.route('/api/graph/large-entity/extract', methods=['POST'])
def extract_from_large_entity():
    """
    Extract a node from a large entity.

    Expects a JSON object with ``large_entity_id`` and ``node_id``.
    """
    try:
        # A missing or non-object body is treated like missing parameters (400)
        # instead of failing on `.get` and being reported as a 500.
        data = _get_json_object() or {}
        large_entity_id = data.get('large_entity_id')
        node_id = data.get('node_id')

        if not large_entity_id or not node_id:
            return jsonify({'success': False, 'error': 'Missing required parameters'}), 400

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        success = scanner.extract_node_from_large_entity(large_entity_id, node_id)

        if success:
            session_manager.update_session_scanner(user_session_id, scanner)
            return jsonify({'success': True, 'message': f'Node {node_id} extracted successfully.'})
        else:
            return jsonify({'success': False, 'error': f'Failed to extract node {node_id}.'}), 500

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


# The URL must declare the `node_id` variable that the view function takes;
# without it Flask calls the view with no arguments and every request fails.
# The `path` converter also matches identifiers that contain slashes.
@app.route('/api/graph/node/<path:node_id>', methods=['DELETE'])
def delete_graph_node(node_id):
    """
    Delete a node from the graph.

    Args:
        node_id: Identifier of the node to remove, taken from the URL.
    """
    try:
        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        success = scanner.graph.remove_node(node_id)

        if success:
            session_manager.update_session_scanner(user_session_id, scanner)
            return jsonify({'success': True, 'message': f'Node {node_id} deleted successfully.'})
        else:
            return jsonify({'success': False, 'error': f'Node {node_id} not found.'}), 404

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


@app.route('/api/graph/revert', methods=['POST'])
def revert_graph_action():
    """
    Reverts a graph action, such as re-adding a deleted node.

    Expects a JSON object with ``type`` and ``data``. Only the ``delete``
    action type is handled; any other type yields a 400 response.
    """
    try:
        data = _get_json_object()
        if data is None or 'type' not in data or 'data' not in data:
            return jsonify({'success': False, 'error': 'Invalid revert request format'}), 400

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        action_type = data['type']
        action_data = data['data']

        if action_type == 'delete':
            # The payload is read with dict methods below; reject anything else
            # as a malformed request rather than failing with a 500.
            if not isinstance(action_data, dict):
                return jsonify({'success': False, 'error': 'Invalid revert request format'}), 400

            node_to_add = action_data.get('node')
            if node_to_add:
                scanner.graph.add_node(
                    node_id=node_to_add['id'],
                    node_type=NodeType(node_to_add['type']),
                    attributes=node_to_add.get('attributes'),
                    description=node_to_add.get('description'),
                    metadata=node_to_add.get('metadata')
                )

            edges_to_add = action_data.get('edges', [])
            for edge in edges_to_add:
                # Only restore edges whose two endpoints exist in the graph.
                if scanner.graph.graph.has_node(edge['from']) and scanner.graph.graph.has_node(edge['to']):
                    scanner.graph.add_edge(
                        source_id=edge['from'],
                        target_id=edge['to'],
                        relationship_type=edge['metadata']['relationship_type'],
                        confidence_score=edge['metadata']['confidence_score'],
                        source_provider=edge['metadata']['source_provider'],
                        raw_data=edge.get('raw_data', {})
                    )

            session_manager.update_session_scanner(user_session_id, scanner)
            return jsonify({'success': True, 'message': 'Delete action reverted successfully.'})

        return jsonify({'success': False, 'error': f'Unknown revert action type: {action_type}'}), 400

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


@app.route('/api/export', methods=['GET'])
def export_results():
    """Export scan results as a JSON file with improved error handling."""
    try:
        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active scanner session found'}), 404

        # Get export data using the new export manager
        try:
            results = export_manager.export_scan_results(scanner)
        except Exception as e:
            return jsonify({'success': False, 'error': f'Failed to gather export data: {str(e)}'}), 500

        # Add user session metadata
        results['export_metadata']['user_session_id'] = user_session_id
        results['export_metadata']['forensic_integrity'] = 'maintained'

        # Generate filename
        filename = export_manager.generate_filename(
            target=scanner.current_target or 'unknown',
            export_type='json'
        )

        # Serialize with export manager
        try:
            json_data = export_manager.serialize_to_json(results)
        except Exception as e:
            return jsonify({
                'success': False,
                'error': f'JSON serialization failed: {str(e)}'
            }), 500

        # Create file object
        file_obj = io.BytesIO(json_data.encode('utf-8'))

        return send_file(
            file_obj,
            as_attachment=True,
            download_name=filename,
            mimetype='application/json'
        )

    except Exception as e:
        traceback.print_exc()
        return jsonify({
            'success': False,
            'error': f'Export failed: {str(e)}',
            'error_type': type(e).__name__
        }), 500


@app.route('/api/export/targets', methods=['GET'])
def export_targets():
    """Export all discovered targets as a TXT file."""
    try:
        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active scanner session found'}), 404

        # Use export manager for targets export
        targets_txt = export_manager.export_targets_list(scanner)

        # Generate filename using export manager
        filename = export_manager.generate_filename(
            target=scanner.current_target or 'unknown',
            export_type='targets'
        )

        file_obj = io.BytesIO(targets_txt.encode('utf-8'))

        return send_file(
            file_obj,
            as_attachment=True,
            download_name=filename,
            mimetype='text/plain'
        )

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500


@app.route('/api/export/summary', methods=['GET'])
def export_summary():
    """Export an executive summary as a TXT file."""
    try:
        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active scanner session found'}), 404

        # Use export manager for summary generation
        summary_txt = export_manager.generate_executive_summary(scanner)

        # Generate filename using export manager
        filename = export_manager.generate_filename(
            target=scanner.current_target or 'unknown',
            export_type='summary'
        )

        file_obj = io.BytesIO(summary_txt.encode('utf-8'))

        return send_file(
            file_obj,
            as_attachment=True,
            download_name=filename,
            mimetype='text/plain'
        )

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500


@app.route('/api/config/api-keys', methods=['POST'])
def set_api_keys():
    """
    Set API keys for the current session.

    Expects a JSON object mapping provider names to API key values.
    """
    try:
        data = _get_json_object()
        if data is None:
            return jsonify({'success': False, 'error': 'No API keys provided'}), 400

        user_session_id, scanner = get_user_scanner()
        session_config = scanner.config

        updated_providers = []

        for provider_name, api_key in data.items():
            # Null/empty values are normalised to an empty string.
            api_key_value = str(api_key or '').strip()
            success = session_config.set_api_key(provider_name.lower(), api_key_value)

            if success:
                updated_providers.append(provider_name)

        if updated_providers:
            scanner._initialize_providers()
            session_manager.update_session_scanner(user_session_id, scanner)

            return jsonify({
                'success': True,
                'message': f'API keys updated for: {", ".join(updated_providers)}',
                'user_session_id': user_session_id
            })
        else:
            return jsonify({'success': False, 'error': 'No valid API keys were provided.'}), 400

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


@app.route('/api/providers', methods=['GET'])
def get_providers():
    """Get enhanced information about available providers including API key sources."""
    try:
        user_session_id, scanner = get_user_scanner()
        base_provider_info = scanner.get_provider_info()

        # Enhance provider info with API key source information
        enhanced_provider_info = {}

        for provider_name, info in base_provider_info.items():
            enhanced_info = dict(info)  # Copy base info

            if info['requires_api_key']:
                # Determine API key source and configuration status
                api_key = scanner.config.get_api_key(provider_name)
                backend_api_key = os.getenv(f'{provider_name.upper()}_API_KEY')

                if backend_api_key:
                    # API key configured via backend/environment
                    enhanced_info.update({
                        'api_key_configured': True,
                        'api_key_source': 'backend',
                        'api_key_help': f'API key configured via environment variable {provider_name.upper()}_API_KEY'
                    })
                elif api_key:
                    # API key configured via web interface
                    enhanced_info.update({
                        'api_key_configured': True,
                        'api_key_source': 'frontend',
                        'api_key_help': 'API key set via web interface (session-only)'
                    })
                else:
                    # No API key configured
                    enhanced_info.update({
                        'api_key_configured': False,
                        'api_key_source': None,
                        'api_key_help': f'Requires API key to enable {info["display_name"]} integration'
                    })
            else:
                # Provider doesn't require API key
                enhanced_info.update({
                    'api_key_configured': True,  # Always "configured" for non-API providers
                    'api_key_source': None,
                    'api_key_help': None
                })

            enhanced_provider_info[provider_name] = enhanced_info

        return jsonify({
            'success': True,
            'providers': enhanced_provider_info,
            'user_session_id': user_session_id
        })

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


@app.route('/api/config/providers', methods=['POST'])
def configure_providers():
    """
    Configure provider settings (enable/disable).

    Expects a JSON object mapping provider names to settings objects;
    only the ``enabled`` key of each settings object is read.
    """
    try:
        data = _get_json_object()
        if data is None:
            return jsonify({'success': False, 'error': 'No provider settings provided'}), 400

        user_session_id, scanner = get_user_scanner()
        session_config = scanner.config

        updated_providers = []

        for provider_name, settings in data.items():
            provider_name_clean = provider_name.lower().strip()

            # Skip entries whose settings are not an object instead of
            # failing on the membership test.
            if isinstance(settings, dict) and 'enabled' in settings:
                # Update the enabled state in session config
                session_config.enabled_providers[provider_name_clean] = settings['enabled']
                updated_providers.append(provider_name_clean)

        if updated_providers:
            # Reinitialize providers with new settings
            scanner._initialize_providers()
            session_manager.update_session_scanner(user_session_id, scanner)

            return jsonify({
                'success': True,
                'message': f'Provider settings updated for: {", ".join(updated_providers)}',
                'user_session_id': user_session_id
            })
        else:
            return jsonify({'success': False, 'error': 'No valid provider settings were provided.'}), 400

    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


@app.errorhandler(404)
def not_found(error):
    """Handle 404 errors."""
    return jsonify({'success': False, 'error': 'Endpoint not found'}), 404


@app.errorhandler(500)
def internal_error(error):
    """Handle 500 errors."""
    traceback.print_exc()
    return jsonify({'success': False, 'error': 'Internal server error'}), 500


if __name__ == '__main__':
    config.load_from_env()
    socketio.run(app, host=config.flask_host, port=config.flask_port, debug=config.flask_debug)