dnscope/app.py
2025-09-17 21:12:11 +02:00

464 lines
17 KiB
Python

# dnsrecon-reduced/app.py
"""
Flask application entry point for DNSRecon web interface.
Provides REST API endpoints and serves the web interface with user session support.
"""
import json
import traceback
from flask import Flask, render_template, request, jsonify, send_file, session
from datetime import datetime, timezone, timedelta
import io
import os
from core.session_manager import session_manager
from config import config
from core.graph_manager import NodeType
from utils.helpers import is_valid_target
# Flask application object; the session signing key and the lifetime of
# permanent sessions are both taken from the project configuration.
app = Flask(__name__)
app.config.update(
    SECRET_KEY=config.flask_secret_key,
    PERMANENT_SESSION_LIFETIME=timedelta(
        hours=config.flask_permanent_session_lifetime_hours
    ),
)
def get_user_scanner():
    """
    Return ``(session_id, scanner)`` for the current Flask session.

    The scanner session id is read from the ``dnsrecon_session_id`` key of the
    Flask session. If that key is missing, or the session manager has no
    scanner for the stored id, a new scanner session is created, its id is
    written back into the Flask session and the Flask session is marked
    permanent.

    Raises:
        Exception: if the session manager yields no scanner for the session
            it has just created.
    """
    stored_id = session.get('dnsrecon_session_id')
    scanner = session_manager.get_session(stored_id) if stored_id else None
    if scanner:
        return stored_id, scanner

    # No usable scanner for this browser session: start a fresh one.
    fresh_id = session_manager.create_session()
    scanner = session_manager.get_session(fresh_id)
    if not scanner:
        raise Exception("Failed to create new scanner session")

    session['dnsrecon_session_id'] = fresh_id
    session.permanent = True
    return fresh_id, scanner
@app.route('/')
def index():
    """Render the ``index.html`` template that hosts the web interface."""
    page = render_template('index.html')
    return page
@app.route('/api/scan/start', methods=['POST'])
def start_scan():
    """
    Start a new reconnaissance scan for the current session.

    Expects a JSON object with:
        target: required string; surrounding whitespace is stripped and the
            result must pass ``is_valid_target``.
        max_depth: optional integer between 1 and 5; defaults to
            ``config.default_recursion_depth``.
        clear_graph: optional, defaults to ``True``; forwarded to the scanner.
        force_rescan_target: optional, defaults to ``None``; forwarded to the
            scanner.

    Returns:
        200 with ``scan_id`` and ``user_session_id`` when the scanner accepts
        the scan, 400 for a missing/malformed body or invalid parameters,
        409 when ``scanner.start_scan`` reports failure, and 500 for any
        unexpected exception.
    """
    try:
        # silent=True: an absent or malformed JSON body yields None instead of
        # raising an HTTP error that the generic handler below would turn
        # into a misleading 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'target' not in data:
            return jsonify({'success': False, 'error': 'Missing target in request'}), 400

        raw_target = data['target']
        if not isinstance(raw_target, str):
            # A non-string target (number, list, null, ...) has no .strip().
            return jsonify({'success': False, 'error': 'Invalid target format.'}), 400
        target = raw_target.strip()

        max_depth = data.get('max_depth', config.default_recursion_depth)
        clear_graph = data.get('clear_graph', True)
        force_rescan_target = data.get('force_rescan_target', None)

        if not target:
            return jsonify({'success': False, 'error': 'Target cannot be empty'}), 400
        if not is_valid_target(target):
            return jsonify({'success': False, 'error': 'Invalid target format.'}), 400

        # bool is a subclass of int, so JSON true/false must be rejected explicitly.
        depth_is_int = isinstance(max_depth, int) and not isinstance(max_depth, bool)
        if not depth_is_int or not 1 <= max_depth <= 5:
            return jsonify({'success': False, 'error': 'Max depth must be an integer between 1 and 5'}), 400

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'Failed to get scanner instance.'}), 500

        success = scanner.start_scan(
            target,
            max_depth,
            clear_graph=clear_graph,
            force_rescan_target=force_rescan_target,
        )
        if success:
            return jsonify({
                'success': True,
                'message': 'Scan started successfully',
                'scan_id': scanner.logger.session_id,
                'user_session_id': user_session_id
            })

        return jsonify({
            'success': False,
            'error': f'Failed to start scan (scanner status: {scanner.status})',
        }), 409
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/scan/stop', methods=['POST'])
def stop_scan():
    """
    Request that the current session's scan stop.

    Calls ``scanner.stop_scan()``, then sets the stop signal, records the
    status ``'stopped'`` and stores the scanner back through the session
    manager. Responds 404 when no scanner is available and 500 on any
    unexpected exception.
    """
    try:
        sid, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No scanner found for session'}), 404

        # Give the scanner its session id if it does not have one yet.
        if not scanner.session_id:
            scanner.session_id = sid

        scanner.stop_scan()
        session_manager.set_stop_signal(sid)
        session_manager.update_scanner_status(sid, 'stopped')
        session_manager.update_session_scanner(sid, scanner)

        payload = {
            'success': True,
            'message': 'Scan stop requested',
            'user_session_id': sid
        }
        return jsonify(payload)
    except Exception as exc:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(exc)}'}), 500
@app.route('/api/scan/status', methods=['GET'])
def get_scan_status():
    """
    Report the scan status of the current session.

    Returns the scanner's ``get_scan_status()`` result with the
    ``user_session_id`` added, or a fixed ``idle`` status when no scanner is
    available. On an unexpected exception the 500 response also carries a
    ``fallback_status`` entry.
    """
    try:
        sid, scanner = get_user_scanner()

        if scanner:
            # Give the scanner its session id if it does not have one yet.
            if not scanner.session_id:
                scanner.session_id = sid
            current = scanner.get_scan_status()
            current['user_session_id'] = sid
            return jsonify({'success': True, 'status': current})

        idle_status = {
            'status': 'idle', 'target_domain': None, 'current_depth': 0,
            'max_depth': 0, 'progress_percentage': 0.0,
            'user_session_id': sid
        }
        return jsonify({'success': True, 'status': idle_status})
    except Exception as exc:
        traceback.print_exc()
        failure = {
            'success': False, 'error': f'Internal server error: {str(exc)}',
            'fallback_status': {'status': 'error', 'progress_percentage': 0.0}
        }
        return jsonify(failure), 500
@app.route('/api/graph', methods=['GET'])
def get_graph_data():
    """
    Return the graph of the current session.

    An empty graph structure is returned when no scanner is available or when
    ``scanner.get_graph_data()`` yields a falsy value. On an unexpected
    exception the 500 response also carries a ``fallback_graph`` entry.
    """
    try:
        sid, scanner = get_user_scanner()
        placeholder = {
            'nodes': [], 'edges': [],
            'statistics': {'node_count': 0, 'edge_count': 0}
        }

        if scanner:
            graph = scanner.get_graph_data() or placeholder
        else:
            graph = placeholder

        return jsonify({'success': True, 'graph': graph, 'user_session_id': sid})
    except Exception as exc:
        traceback.print_exc()
        failure = {
            'success': False, 'error': f'Internal server error: {str(exc)}',
            'fallback_graph': {'nodes': [], 'edges': [], 'statistics': {}}
        }
        return jsonify(failure), 500
@app.route('/api/graph/large-entity/extract', methods=['POST'])
def extract_from_large_entity():
    """
    Extract a node from a large entity in the current session's graph.

    Expects a JSON object with ``large_entity_id`` and ``node_id``.

    Returns:
        200 when ``scanner.extract_node_from_large_entity`` succeeds (the
        scanner is then stored back through the session manager), 400 when
        the body is not a JSON object or either id is missing/empty, 404 when
        no scanner is available, and 500 when extraction fails or an
        unexpected exception occurs.
    """
    try:
        # silent=True plus the dict check: a missing, malformed or non-object
        # body is a client error (400), not an AttributeError on data.get().
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Missing required parameters'}), 400

        large_entity_id = data.get('large_entity_id')
        node_id = data.get('node_id')
        if not large_entity_id or not node_id:
            return jsonify({'success': False, 'error': 'Missing required parameters'}), 400

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        success = scanner.extract_node_from_large_entity(large_entity_id, node_id)
        if success:
            session_manager.update_session_scanner(user_session_id, scanner)
            return jsonify({'success': True, 'message': f'Node {node_id} extracted successfully.'})

        return jsonify({'success': False, 'error': f'Failed to extract node {node_id}.'}), 500
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/graph/node/<node_id>', methods=['DELETE'])
def delete_graph_node(node_id):
    """
    Remove ``node_id`` from the current session's graph.

    Responds 200 and stores the scanner back through the session manager when
    ``scanner.graph.remove_node`` reports success, 404 when no scanner is
    available or the removal reports failure, and 500 on an unexpected
    exception.
    """
    try:
        sid, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        removed = scanner.graph.remove_node(node_id)
        if not removed:
            return jsonify({'success': False, 'error': f'Node {node_id} not found.'}), 404

        session_manager.update_session_scanner(sid, scanner)
        return jsonify({'success': True, 'message': f'Node {node_id} deleted successfully.'})
    except Exception as exc:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(exc)}'}), 500
@app.route('/api/graph/revert', methods=['POST'])
def revert_graph_action():
    """
    Revert a graph action, such as re-adding a deleted node.

    Expects a JSON object with ``type`` and ``data``. Only ``type == 'delete'``
    is handled: the optional ``data['node']`` is re-added to the graph, then
    each entry of ``data['edges']`` whose ``from`` and ``to`` nodes both exist
    in the graph is re-added; other edges are skipped.

    Returns:
        200 once the delete has been reverted (the scanner is then stored
        back through the session manager), 400 for an unknown action type or
        a malformed payload, 404 when no scanner is available, and 500 on an
        unexpected exception.
    """
    def bad_request():
        # Shared response for every structurally invalid payload.
        return jsonify({'success': False, 'error': 'Invalid revert request format'}), 400

    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # an HTTP exception that the generic handler would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'type' not in data or 'data' not in data:
            return bad_request()

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        action_type = data['type']
        action_data = data['data']

        if action_type != 'delete':
            return jsonify({'success': False, 'error': f'Unknown revert action type: {action_type}'}), 400

        if not isinstance(action_data, dict):
            return bad_request()

        node_to_add = action_data.get('node')
        if node_to_add:
            if not isinstance(node_to_add, dict) or 'id' not in node_to_add or 'type' not in node_to_add:
                return bad_request()
            try:
                node_type = NodeType(node_to_add['type'])
            except ValueError:
                # NOTE(review): assumes NodeType signals an unknown value with
                # ValueError (enum behaviour) - confirm against core.graph_manager.
                return bad_request()
            scanner.graph.add_node(
                node_id=node_to_add['id'],
                node_type=node_type,
                attributes=node_to_add.get('attributes'),
                description=node_to_add.get('description'),
                metadata=node_to_add.get('metadata')
            )

        edges_to_add = action_data.get('edges', [])
        if not isinstance(edges_to_add, list):
            return bad_request()

        required_metadata = ('relationship_type', 'confidence_score', 'source_provider')
        for edge in edges_to_add:
            if not isinstance(edge, dict) or 'from' not in edge:
                return bad_request()
            # Same short-circuit order as before: 'to' is only needed when the
            # source node is present, metadata only when both endpoints are.
            if not scanner.graph.graph.has_node(edge['from']):
                continue
            if 'to' not in edge:
                return bad_request()
            if not scanner.graph.graph.has_node(edge['to']):
                continue

            metadata = edge.get('metadata')
            if not isinstance(metadata, dict) or any(key not in metadata for key in required_metadata):
                return bad_request()

            scanner.graph.add_edge(
                source_id=edge['from'], target_id=edge['to'],
                relationship_type=metadata['relationship_type'],
                confidence_score=metadata['confidence_score'],
                source_provider=metadata['source_provider'],
                raw_data=edge.get('raw_data', {})
            )

        session_manager.update_session_scanner(user_session_id, scanner)
        return jsonify({'success': True, 'message': 'Delete action reverted successfully.'})
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/export', methods=['GET'])
def export_results():
    """
    Send the current session's scan results as a JSON attachment.

    The scanner's ``export_results()`` output gets an ``export_metadata``
    entry (session id and UTC export timestamp), is serialized with a
    two-space indent and sent as ``dnsrecon_<target>_<timestamp>.json``.
    Any exception yields a 500 JSON error response.
    """
    try:
        sid, scanner = get_user_scanner()

        export = scanner.export_results()
        export['export_metadata'] = {
            'user_session_id': sid,
            'export_timestamp': datetime.now(timezone.utc).isoformat(),
        }

        stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        target_label = scanner.current_target or 'unknown'
        attachment_name = f"dnsrecon_{target_label}_{stamp}.json"

        # send_file needs a binary file-like object, so encode the JSON text.
        encoded = json.dumps(export, indent=2).encode('utf-8')
        return send_file(
            io.BytesIO(encoded),
            as_attachment=True,
            download_name=attachment_name,
            mimetype='application/json',
        )
    except Exception as exc:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Export failed: {str(exc)}'}), 500
@app.route('/api/config/api-keys', methods=['POST'])
def set_api_keys():
    """
    Set API keys for the current session.

    Expects a JSON object mapping provider names to API keys. Each key is
    converted to a stripped string (``None``/empty becomes ``''``) and passed
    to ``scanner.config.set_api_key`` under the lower-cased provider name.
    When at least one provider was accepted the scanner's providers are
    re-initialized and the scanner is stored back through the session manager.

    Returns:
        200 listing the updated providers, 400 when the body is not a JSON
        object or no key was accepted, and 500 on an unexpected exception.
    """
    try:
        # silent=True plus the dict check: a missing, malformed or non-object
        # body is rejected with 400 instead of failing later on data.items().
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'No API keys provided'}), 400

        user_session_id, scanner = get_user_scanner()
        session_config = scanner.config

        updated_providers = []
        for provider_name, api_key in data.items():
            api_key_value = str(api_key or '').strip()
            if session_config.set_api_key(provider_name.lower(), api_key_value):
                updated_providers.append(provider_name)

        if not updated_providers:
            return jsonify({'success': False, 'error': 'No valid API keys were provided.'}), 400

        # Providers must be rebuilt so they pick up the new keys.
        scanner._initialize_providers()
        session_manager.update_session_scanner(user_session_id, scanner)
        return jsonify({
            'success': True,
            'message': f'API keys updated for: {", ".join(updated_providers)}',
            'user_session_id': user_session_id
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/providers', methods=['GET'])
def get_providers():
    """
    List the scanner's providers together with API-key status information.

    Each entry of ``scanner.get_provider_info()`` is copied and extended with
    ``api_key_configured``, ``api_key_source`` and ``api_key_help``. For
    providers that require a key, an environment variable named
    ``<PROVIDER>_API_KEY`` takes precedence (source ``backend``) over a key
    held in the session config (source ``frontend``).
    """
    try:
        sid, scanner = get_user_scanner()
        provider_catalog = scanner.get_provider_info()

        providers = {}
        for name, info in provider_catalog.items():
            entry = dict(info)  # copy so the scanner's own info is not mutated

            if not info['requires_api_key']:
                # Providers without a key requirement always count as configured.
                key_fields = {
                    'api_key_configured': True,
                    'api_key_source': None,
                    'api_key_help': None
                }
            else:
                session_key = scanner.config.get_api_key(name)
                env_var = f'{name.upper()}_API_KEY'
                env_key = os.getenv(env_var)

                if env_key:
                    key_fields = {
                        'api_key_configured': True,
                        'api_key_source': 'backend',
                        'api_key_help': f'API key configured via environment variable {env_var}'
                    }
                elif session_key:
                    key_fields = {
                        'api_key_configured': True,
                        'api_key_source': 'frontend',
                        'api_key_help': 'API key set via web interface (session-only)'
                    }
                else:
                    key_fields = {
                        'api_key_configured': False,
                        'api_key_source': None,
                        'api_key_help': f'Requires API key to enable {info["display_name"]} integration'
                    }

            entry.update(key_fields)
            providers[name] = entry

        return jsonify({
            'success': True,
            'providers': providers,
            'user_session_id': sid
        })
    except Exception as exc:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(exc)}'}), 500
@app.route('/api/config/providers', methods=['POST'])
def configure_providers():
    """
    Enable or disable providers for the current session.

    Expects a JSON object mapping provider names to settings objects. For
    every settings object containing ``enabled``, that value is stored in
    ``scanner.config.enabled_providers`` under the lower-cased, stripped
    provider name. Entries whose settings are not an object, or lack
    ``enabled``, are ignored. When at least one provider was updated the
    scanner's providers are re-initialized and the scanner is stored back
    through the session manager.

    Returns:
        200 listing the updated providers, 400 when the body is not a JSON
        object or nothing was updated, and 500 on an unexpected exception.
    """
    try:
        # silent=True plus the dict check: a missing, malformed or non-object
        # body is rejected with 400 instead of failing later on data.items().
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'No provider settings provided'}), 400

        user_session_id, scanner = get_user_scanner()
        session_config = scanner.config

        updated_providers = []
        for provider_name, settings in data.items():
            # Non-object settings (null, bool, string, ...) cannot carry an
            # 'enabled' flag; skip them rather than crash on the lookup.
            if not isinstance(settings, dict) or 'enabled' not in settings:
                continue
            provider_name_clean = provider_name.lower().strip()
            session_config.enabled_providers[provider_name_clean] = settings['enabled']
            updated_providers.append(provider_name_clean)

        if not updated_providers:
            return jsonify({'success': False, 'error': 'No valid provider settings were provided.'}), 400

        # Providers must be rebuilt so the new enabled states take effect.
        scanner._initialize_providers()
        session_manager.update_session_scanner(user_session_id, scanner)
        return jsonify({
            'success': True,
            'message': f'Provider settings updated for: {", ".join(updated_providers)}',
            'user_session_id': user_session_id
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON error body."""
    body = {'success': False, 'error': 'Endpoint not found'}
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Print the current traceback and answer 500 errors with a JSON error body."""
    traceback.print_exc()
    body = {'success': False, 'error': 'Internal server error'}
    return jsonify(body), 500
if __name__ == '__main__':
    # Script entry point: load configuration from the environment, then start
    # Flask's built-in server with threading enabled.
    # NOTE(review): app.config (secret key, session lifetime) was filled at
    # import time, before load_from_env() runs - confirm those values are
    # already final when the module is imported.
    config.load_from_env()
    run_options = {
        'host': config.flask_host,
        'port': config.flask_port,
        'debug': config.flask_debug,
        'threaded': True,
    }
    app.run(**run_options)