dnscope/app.py
2025-09-18 00:13:37 +02:00

618 lines
23 KiB
Python

# dnsrecon-reduced/app.py
"""
Flask application entry point for DNSRecon web interface.
Provides REST API endpoints and serves the web interface with user session support.
"""
import json
import traceback
from flask import Flask, render_template, request, jsonify, send_file, session
from datetime import datetime, timezone, timedelta
import io
import os
from core.session_manager import session_manager
from config import config
from core.graph_manager import NodeType
from utils.helpers import is_valid_target
from decimal import Decimal
app = Flask(__name__)
app.config['SECRET_KEY'] = config.flask_secret_key
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=config.flask_permanent_session_lifetime_hours)
def get_user_scanner():
"""
Retrieves the scanner for the current session, or creates a new one if none exists.
"""
current_flask_session_id = session.get('dnsrecon_session_id')
if current_flask_session_id:
existing_scanner = session_manager.get_session(current_flask_session_id)
if existing_scanner:
return current_flask_session_id, existing_scanner
new_session_id = session_manager.create_session()
new_scanner = session_manager.get_session(new_session_id)
if not new_scanner:
raise Exception("Failed to create new scanner session")
session['dnsrecon_session_id'] = new_session_id
session.permanent = True
return new_session_id, new_scanner
class CustomJSONEncoder(json.JSONEncoder):
"""Custom JSON encoder to handle non-serializable objects."""
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, set):
return list(obj)
elif isinstance(obj, Decimal):
return float(obj)
elif hasattr(obj, '__dict__'):
# For custom objects, try to serialize their dict representation
try:
return obj.__dict__
except:
return str(obj)
elif hasattr(obj, 'value') and hasattr(obj, 'name'):
# For enum objects
return obj.value
else:
# For any other non-serializable object, convert to string
return str(obj)
@app.route('/')
def index():
"""Serve the main web interface."""
return render_template('index.html')
@app.route('/api/scan/start', methods=['POST'])
def start_scan():
"""
Starts a new reconnaissance scan.
"""
try:
data = request.get_json()
if not data or 'target' not in data:
return jsonify({'success': False, 'error': 'Missing target in request'}), 400
target = data['target'].strip()
max_depth = data.get('max_depth', config.default_recursion_depth)
clear_graph = data.get('clear_graph', True)
force_rescan_target = data.get('force_rescan_target', None)
if not target:
return jsonify({'success': False, 'error': 'Target cannot be empty'}), 400
if not is_valid_target(target):
return jsonify({'success': False, 'error': 'Invalid target format.'}), 400
if not isinstance(max_depth, int) or not 1 <= max_depth <= 5:
return jsonify({'success': False, 'error': 'Max depth must be an integer between 1 and 5'}), 400
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({'success': False, 'error': 'Failed to get scanner instance.'}), 500
success = scanner.start_scan(target, max_depth, clear_graph=clear_graph, force_rescan_target=force_rescan_target)
if success:
return jsonify({
'success': True,
'message': 'Scan started successfully',
'scan_id': scanner.logger.session_id,
'user_session_id': user_session_id
})
else:
return jsonify({
'success': False,
'error': f'Failed to start scan (scanner status: {scanner.status})',
}), 409
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/scan/stop', methods=['POST'])
def stop_scan():
"""Stop the current scan."""
try:
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({'success': False, 'error': 'No scanner found for session'}), 404
if not scanner.session_id:
scanner.session_id = user_session_id
scanner.stop_scan()
session_manager.set_stop_signal(user_session_id)
session_manager.update_scanner_status(user_session_id, 'stopped')
session_manager.update_session_scanner(user_session_id, scanner)
return jsonify({
'success': True,
'message': 'Scan stop requested',
'user_session_id': user_session_id
})
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/scan/status', methods=['GET'])
def get_scan_status():
"""Get current scan status."""
try:
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({
'success': True,
'status': {
'status': 'idle', 'target_domain': None, 'current_depth': 0,
'max_depth': 0, 'progress_percentage': 0.0,
'user_session_id': user_session_id
}
})
if not scanner.session_id:
scanner.session_id = user_session_id
status = scanner.get_scan_status()
status['user_session_id'] = user_session_id
return jsonify({'success': True, 'status': status})
except Exception as e:
traceback.print_exc()
return jsonify({
'success': False, 'error': f'Internal server error: {str(e)}',
'fallback_status': {'status': 'error', 'progress_percentage': 0.0}
}), 500
@app.route('/api/graph', methods=['GET'])
def get_graph_data():
"""Get current graph data."""
try:
user_session_id, scanner = get_user_scanner()
empty_graph = {
'nodes': [], 'edges': [],
'statistics': {'node_count': 0, 'edge_count': 0}
}
if not scanner:
return jsonify({'success': True, 'graph': empty_graph, 'user_session_id': user_session_id})
graph_data = scanner.get_graph_data() or empty_graph
return jsonify({'success': True, 'graph': graph_data, 'user_session_id': user_session_id})
except Exception as e:
traceback.print_exc()
return jsonify({
'success': False, 'error': f'Internal server error: {str(e)}',
'fallback_graph': {'nodes': [], 'edges': [], 'statistics': {}}
}), 500
@app.route('/api/graph/large-entity/extract', methods=['POST'])
def extract_from_large_entity():
"""Extract a node from a large entity."""
try:
data = request.get_json()
large_entity_id = data.get('large_entity_id')
node_id = data.get('node_id')
if not large_entity_id or not node_id:
return jsonify({'success': False, 'error': 'Missing required parameters'}), 400
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({'success': False, 'error': 'No active session found'}), 404
success = scanner.extract_node_from_large_entity(large_entity_id, node_id)
if success:
session_manager.update_session_scanner(user_session_id, scanner)
return jsonify({'success': True, 'message': f'Node {node_id} extracted successfully.'})
else:
return jsonify({'success': False, 'error': f'Failed to extract node {node_id}.'}), 500
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/graph/node/<node_id>', methods=['DELETE'])
def delete_graph_node(node_id):
"""Delete a node from the graph."""
try:
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({'success': False, 'error': 'No active session found'}), 404
success = scanner.graph.remove_node(node_id)
if success:
session_manager.update_session_scanner(user_session_id, scanner)
return jsonify({'success': True, 'message': f'Node {node_id} deleted successfully.'})
else:
return jsonify({'success': False, 'error': f'Node {node_id} not found.'}), 404
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/graph/revert', methods=['POST'])
def revert_graph_action():
"""Reverts a graph action, such as re-adding a deleted node."""
try:
data = request.get_json()
if not data or 'type' not in data or 'data' not in data:
return jsonify({'success': False, 'error': 'Invalid revert request format'}), 400
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({'success': False, 'error': 'No active session found'}), 404
action_type = data['type']
action_data = data['data']
if action_type == 'delete':
node_to_add = action_data.get('node')
if node_to_add:
scanner.graph.add_node(
node_id=node_to_add['id'],
node_type=NodeType(node_to_add['type']),
attributes=node_to_add.get('attributes'),
description=node_to_add.get('description'),
metadata=node_to_add.get('metadata')
)
edges_to_add = action_data.get('edges', [])
for edge in edges_to_add:
if scanner.graph.graph.has_node(edge['from']) and scanner.graph.graph.has_node(edge['to']):
scanner.graph.add_edge(
source_id=edge['from'], target_id=edge['to'],
relationship_type=edge['metadata']['relationship_type'],
confidence_score=edge['metadata']['confidence_score'],
source_provider=edge['metadata']['source_provider'],
raw_data=edge.get('raw_data', {})
)
session_manager.update_session_scanner(user_session_id, scanner)
return jsonify({'success': True, 'message': 'Delete action reverted successfully.'})
return jsonify({'success': False, 'error': f'Unknown revert action type: {action_type}'}), 400
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/export', methods=['GET'])
def export_results():
"""Export scan results as a JSON file with improved error handling."""
try:
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({'success': False, 'error': 'No active scanner session found'}), 404
# Get export data with error handling
try:
results = scanner.export_results()
except Exception as e:
return jsonify({'success': False, 'error': f'Failed to gather export data: {str(e)}'}), 500
# Add export metadata
results['export_metadata'] = {
'user_session_id': user_session_id,
'export_timestamp': datetime.now(timezone.utc).isoformat(),
'export_version': '1.0.0',
'forensic_integrity': 'maintained'
}
# Generate filename with forensic naming convention
timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
target = scanner.current_target or 'unknown'
# Sanitize target for filename
safe_target = "".join(c for c in target if c.isalnum() or c in ('-', '_', '.')).rstrip()
filename = f"dnsrecon_{safe_target}_{timestamp}.json"
# Serialize with custom encoder and error handling
try:
json_data = json.dumps(results, indent=2, cls=CustomJSONEncoder, ensure_ascii=False)
except Exception as e:
# If custom encoder fails, try a more aggressive approach
try:
# Convert problematic objects to strings recursively
cleaned_results = _clean_for_json(results)
json_data = json.dumps(cleaned_results, indent=2, ensure_ascii=False)
except Exception as e2:
return jsonify({
'success': False,
'error': f'JSON serialization failed: {str(e2)}'
}), 500
# Create file object
file_obj = io.BytesIO(json_data.encode('utf-8'))
return send_file(
file_obj,
as_attachment=True,
download_name=filename,
mimetype='application/json'
)
except Exception as e:
traceback.print_exc()
return jsonify({
'success': False,
'error': f'Export failed: {str(e)}',
'error_type': type(e).__name__
}), 500
@app.route('/api/export/targets', methods=['GET'])
def export_targets():
"""Export all discovered targets as a TXT file."""
try:
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({'success': False, 'error': 'No active scanner session found'}), 404
targets_txt = scanner.export_targets_txt()
timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
safe_target = "".join(c for c in (scanner.current_target or 'unknown') if c.isalnum() or c in ('-', '_', '.')).rstrip()
filename = f"dnsrecon_targets_{safe_target}_{timestamp}.txt"
file_obj = io.BytesIO(targets_txt.encode('utf-8'))
return send_file(
file_obj,
as_attachment=True,
download_name=filename,
mimetype='text/plain'
)
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500
@app.route('/api/export/summary', methods=['GET'])
def export_summary():
"""Export an executive summary as a TXT file."""
try:
user_session_id, scanner = get_user_scanner()
if not scanner:
return jsonify({'success': False, 'error': 'No active scanner session found'}), 404
summary_txt = scanner.generate_executive_summary()
timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
safe_target = "".join(c for c in (scanner.current_target or 'unknown') if c.isalnum() or c in ('-', '_', '.')).rstrip()
filename = f"dnsrecon_summary_{safe_target}_{timestamp}.txt"
file_obj = io.BytesIO(summary_txt.encode('utf-8'))
return send_file(
file_obj,
as_attachment=True,
download_name=filename,
mimetype='text/plain'
)
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Export failed: {str(e)}'}), 500
def _clean_for_json(obj, max_depth=10, current_depth=0):
"""
Recursively clean an object to make it JSON serializable.
Handles circular references and problematic object types.
"""
if current_depth > max_depth:
return f"<max_depth_exceeded_{type(obj).__name__}>"
if obj is None or isinstance(obj, (bool, int, float, str)):
return obj
elif isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, (set, frozenset)):
return list(obj)
elif isinstance(obj, dict):
cleaned = {}
for key, value in obj.items():
try:
# Ensure key is string
clean_key = str(key) if not isinstance(key, str) else key
cleaned[clean_key] = _clean_for_json(value, max_depth, current_depth + 1)
except Exception:
cleaned[str(key)] = f"<serialization_error_{type(value).__name__}>"
return cleaned
elif isinstance(obj, (list, tuple)):
cleaned = []
for item in obj:
try:
cleaned.append(_clean_for_json(item, max_depth, current_depth + 1))
except Exception:
cleaned.append(f"<serialization_error_{type(item).__name__}>")
return cleaned
elif hasattr(obj, '__dict__'):
try:
return _clean_for_json(obj.__dict__, max_depth, current_depth + 1)
except Exception:
return str(obj)
elif hasattr(obj, 'value'):
# For enum-like objects
return obj.value
else:
return str(obj)
@app.route('/api/config/api-keys', methods=['POST'])
def set_api_keys():
"""Set API keys for the current session."""
try:
data = request.get_json()
if data is None:
return jsonify({'success': False, 'error': 'No API keys provided'}), 400
user_session_id, scanner = get_user_scanner()
session_config = scanner.config
updated_providers = []
for provider_name, api_key in data.items():
api_key_value = str(api_key or '').strip()
success = session_config.set_api_key(provider_name.lower(), api_key_value)
if success:
updated_providers.append(provider_name)
if updated_providers:
scanner._initialize_providers()
session_manager.update_session_scanner(user_session_id, scanner)
return jsonify({
'success': True,
'message': f'API keys updated for: {", ".join(updated_providers)}',
'user_session_id': user_session_id
})
else:
return jsonify({'success': False, 'error': 'No valid API keys were provided.'}), 400
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/providers', methods=['GET'])
def get_providers():
"""Get enhanced information about available providers including API key sources."""
try:
user_session_id, scanner = get_user_scanner()
base_provider_info = scanner.get_provider_info()
# Enhance provider info with API key source information
enhanced_provider_info = {}
for provider_name, info in base_provider_info.items():
enhanced_info = dict(info) # Copy base info
if info['requires_api_key']:
# Determine API key source and configuration status
api_key = scanner.config.get_api_key(provider_name)
backend_api_key = os.getenv(f'{provider_name.upper()}_API_KEY')
if backend_api_key:
# API key configured via backend/environment
enhanced_info.update({
'api_key_configured': True,
'api_key_source': 'backend',
'api_key_help': f'API key configured via environment variable {provider_name.upper()}_API_KEY'
})
elif api_key:
# API key configured via web interface
enhanced_info.update({
'api_key_configured': True,
'api_key_source': 'frontend',
'api_key_help': f'API key set via web interface (session-only)'
})
else:
# No API key configured
enhanced_info.update({
'api_key_configured': False,
'api_key_source': None,
'api_key_help': f'Requires API key to enable {info["display_name"]} integration'
})
else:
# Provider doesn't require API key
enhanced_info.update({
'api_key_configured': True, # Always "configured" for non-API providers
'api_key_source': None,
'api_key_help': None
})
enhanced_provider_info[provider_name] = enhanced_info
return jsonify({
'success': True,
'providers': enhanced_provider_info,
'user_session_id': user_session_id
})
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/config/providers', methods=['POST'])
def configure_providers():
"""Configure provider settings (enable/disable)."""
try:
data = request.get_json()
if data is None:
return jsonify({'success': False, 'error': 'No provider settings provided'}), 400
user_session_id, scanner = get_user_scanner()
session_config = scanner.config
updated_providers = []
for provider_name, settings in data.items():
provider_name_clean = provider_name.lower().strip()
if 'enabled' in settings:
# Update the enabled state in session config
session_config.enabled_providers[provider_name_clean] = settings['enabled']
updated_providers.append(provider_name_clean)
if updated_providers:
# Reinitialize providers with new settings
scanner._initialize_providers()
session_manager.update_session_scanner(user_session_id, scanner)
return jsonify({
'success': True,
'message': f'Provider settings updated for: {", ".join(updated_providers)}',
'user_session_id': user_session_id
})
else:
return jsonify({'success': False, 'error': 'No valid provider settings were provided.'}), 400
except Exception as e:
traceback.print_exc()
return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.errorhandler(404)
def not_found(error):
"""Handle 404 errors."""
return jsonify({'success': False, 'error': 'Endpoint not found'}), 404
@app.errorhandler(500)
def internal_error(error):
"""Handle 500 errors."""
traceback.print_exc()
return jsonify({'success': False, 'error': 'Internal server error'}), 500
if __name__ == '__main__':
config.load_from_env()
app.run(
host=config.flask_host,
port=config.flask_port,
debug=config.flask_debug,
threaded=True
)