dnscope/app.py
overcuriousity d0ee415f0d enhancements
2025-09-17 19:42:14 +02:00

378 lines
14 KiB
Python

# dnsrecon-reduced/app.py
"""
Flask application entry point for DNSRecon web interface.
Provides REST API endpoints and serves the web interface with user session support.
"""
import json
import traceback
from flask import Flask, render_template, request, jsonify, send_file, session
from datetime import datetime, timezone, timedelta
import io
from core.session_manager import session_manager
from config import config
from core.graph_manager import NodeType
from utils.helpers import is_valid_target
# Create the Flask application; the secret key and the permanent-session
# lifetime (in hours) are both read from the project configuration object.
app = Flask(__name__)
app.config.update(
    SECRET_KEY=config.flask_secret_key,
    PERMANENT_SESSION_LIFETIME=timedelta(
        hours=config.flask_permanent_session_lifetime_hours
    ),
)
def get_user_scanner():
    """
    Return ``(session_id, scanner)`` for the current Flask session.

    The id stored under ``'dnsrecon_session_id'`` in the Flask session is
    looked up in ``session_manager`` first. When no id is stored, or the
    manager returns nothing for it, a new session is created, its id is saved
    in the Flask session and the Flask session is marked permanent.

    Raises:
        Exception: if the newly created session cannot be fetched back from
            ``session_manager``.
    """
    stored_id = session.get('dnsrecon_session_id')
    scanner = session_manager.get_session(stored_id) if stored_id else None
    if scanner:
        return stored_id, scanner

    # Nothing usable for this browser session: create a new scanner session.
    fresh_id = session_manager.create_session()
    scanner = session_manager.get_session(fresh_id)
    if not scanner:
        raise Exception("Failed to create new scanner session")

    session.permanent = True
    session['dnsrecon_session_id'] = fresh_id
    return fresh_id, scanner
@app.route('/')
def index():
    """Render the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/api/scan/start', methods=['POST'])
def start_scan():
    """
    Start a new reconnaissance scan for the current user session.

    Expects a JSON object body with:
        target (str): required; stripped and checked with ``is_valid_target``.
        max_depth (int): optional, 1-5; defaults to
            ``config.default_recursion_depth``.
        clear_graph: optional, defaults to True; forwarded to the scanner.
        force_rescan_target: optional, defaults to None; forwarded to the
            scanner.

    Returns:
        200 with ``scan_id`` and ``user_session_id`` when the scanner reports
        that the scan started; 400 for a missing/malformed body or invalid
        parameters; 409 when the scanner refuses to start; 500 on unexpected
        errors.
    """
    try:
        # silent=True: a non-JSON or unparsable body yields None instead of
        # raising an HTTP error that the broad handler below would turn into
        # a 500 response.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'target' not in data:
            return jsonify({'success': False, 'error': 'Missing target in request'}), 400

        raw_target = data['target']
        # A non-string target (number, null, list, ...) has no .strip();
        # report it as a client error instead of an AttributeError-driven 500.
        if not isinstance(raw_target, str):
            return jsonify({'success': False, 'error': 'Invalid target format.'}), 400
        target = raw_target.strip()

        max_depth = data.get('max_depth', config.default_recursion_depth)
        clear_graph = data.get('clear_graph', True)
        force_rescan_target = data.get('force_rescan_target', None)

        if not target:
            return jsonify({'success': False, 'error': 'Target cannot be empty'}), 400
        if not is_valid_target(target):
            return jsonify({'success': False, 'error': 'Invalid target format.'}), 400

        # bool is a subclass of int, so JSON `true` would otherwise pass the
        # integer check and be used as depth 1.
        depth_is_int = isinstance(max_depth, int) and not isinstance(max_depth, bool)
        if not depth_is_int or not 1 <= max_depth <= 5:
            return jsonify({'success': False, 'error': 'Max depth must be an integer between 1 and 5'}), 400

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'Failed to get scanner instance.'}), 500

        success = scanner.start_scan(
            target,
            max_depth,
            clear_graph=clear_graph,
            force_rescan_target=force_rescan_target,
        )
        if success:
            return jsonify({
                'success': True,
                'message': 'Scan started successfully',
                'scan_id': scanner.logger.session_id,
                'user_session_id': user_session_id
            })
        return jsonify({
            'success': False,
            'error': f'Failed to start scan (scanner status: {scanner.status})',
        }), 409
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/scan/stop', methods=['POST'])
def stop_scan():
    """
    Request a stop of the current session's scan.

    Calls ``scanner.stop_scan()``, then sets the stop signal, records the
    status ``'stopped'`` and stores the scanner back through
    ``session_manager``. Responds 404 when no scanner is available and 500 on
    unexpected errors.
    """
    try:
        session_id, scanner = get_user_scanner()
        if scanner:
            # Fill in the scanner's session id if it has none yet.
            if not scanner.session_id:
                scanner.session_id = session_id

            scanner.stop_scan()
            session_manager.set_stop_signal(session_id)
            session_manager.update_scanner_status(session_id, 'stopped')
            session_manager.update_session_scanner(session_id, scanner)

            reply = {
                'success': True,
                'message': 'Scan stop requested',
                'user_session_id': session_id
            }
            return jsonify(reply)

        return jsonify({'success': False, 'error': 'No scanner found for session'}), 404
    except Exception as exc:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(exc)}'}), 500
@app.route('/api/scan/status', methods=['GET'])
def get_scan_status():
    """
    Report the scan status of the current session.

    With a scanner available, its ``get_scan_status()`` result is returned with
    ``user_session_id`` added. Without one, an ``'idle'`` placeholder status is
    returned. Unexpected errors produce a 500 that also carries a
    ``fallback_status`` entry.
    """
    try:
        session_id, scanner = get_user_scanner()
        if scanner:
            # Fill in the scanner's session id if it has none yet.
            if not scanner.session_id:
                scanner.session_id = session_id
            scan_status = scanner.get_scan_status()
            scan_status['user_session_id'] = session_id
        else:
            scan_status = {
                'status': 'idle', 'target_domain': None, 'current_depth': 0,
                'max_depth': 0, 'progress_percentage': 0.0,
                'user_session_id': session_id
            }
        return jsonify({'success': True, 'status': scan_status})
    except Exception as exc:
        traceback.print_exc()
        failure = {
            'success': False, 'error': f'Internal server error: {str(exc)}',
            'fallback_status': {'status': 'error', 'progress_percentage': 0.0}
        }
        return jsonify(failure), 500
@app.route('/api/graph', methods=['GET'])
def get_graph_data():
    """
    Return the current session's graph data.

    An empty graph structure (no nodes, no edges, zero counts) is substituted
    when there is no scanner or when ``scanner.get_graph_data()`` returns a
    falsy value. Unexpected errors produce a 500 that also carries a
    ``fallback_graph`` entry.
    """
    try:
        session_id, scanner = get_user_scanner()

        graph_payload = scanner.get_graph_data() if scanner else None
        if not graph_payload:
            graph_payload = {
                'nodes': [], 'edges': [],
                'statistics': {'node_count': 0, 'edge_count': 0}
            }

        return jsonify({'success': True, 'graph': graph_payload, 'user_session_id': session_id})
    except Exception as exc:
        traceback.print_exc()
        failure = {
            'success': False, 'error': f'Internal server error: {str(exc)}',
            'fallback_graph': {'nodes': [], 'edges': [], 'statistics': {}}
        }
        return jsonify(failure), 500
@app.route('/api/graph/large-entity/extract', methods=['POST'])
def extract_from_large_entity():
    """
    Extract a node from a large entity in the current session's graph.

    Expects a JSON object body with ``large_entity_id`` and ``node_id``.

    Returns:
        200 when ``scanner.extract_node_from_large_entity`` reports success
        (the scanner is then stored back via ``session_manager``); 400 when the
        body is missing, not a JSON object, or lacks either id; 404 when no
        scanner is available; 500 when extraction fails or on unexpected
        errors.
    """
    try:
        # silent=True: a non-JSON or unparsable body yields None instead of
        # raising an HTTP error that the broad handler below would turn into
        # a 500 response.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Missing required parameters'}), 400

        large_entity_id = data.get('large_entity_id')
        node_id = data.get('node_id')
        if not large_entity_id or not node_id:
            return jsonify({'success': False, 'error': 'Missing required parameters'}), 400

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        success = scanner.extract_node_from_large_entity(large_entity_id, node_id)
        if success:
            session_manager.update_session_scanner(user_session_id, scanner)
            return jsonify({'success': True, 'message': f'Node {node_id} extracted successfully.'})
        return jsonify({'success': False, 'error': f'Failed to extract node {node_id}.'}), 500
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/graph/node/<node_id>', methods=['DELETE'])
def delete_graph_node(node_id):
    """
    Remove the node ``node_id`` from the current session's graph.

    Responds 200 and stores the scanner back via ``session_manager`` when
    ``scanner.graph.remove_node`` reports success, 404 when there is no scanner
    or the removal reports failure, and 500 on unexpected errors.
    """
    try:
        session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        removed = scanner.graph.remove_node(node_id)
        if not removed:
            return jsonify({'success': False, 'error': f'Node {node_id} not found.'}), 404

        session_manager.update_session_scanner(session_id, scanner)
        return jsonify({'success': True, 'message': f'Node {node_id} deleted successfully.'})
    except Exception as exc:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(exc)}'}), 500
# Metadata fields an edge must carry so it can be re-created on revert.
_REVERT_EDGE_METADATA_KEYS = ('relationship_type', 'confidence_score', 'source_provider')


def _is_restorable_edge(edge):
    """Return True if *edge* is a dict with 'from', 'to' and complete 'metadata'."""
    if not isinstance(edge, dict) or 'from' not in edge or 'to' not in edge:
        return False
    metadata = edge.get('metadata')
    return isinstance(metadata, dict) and all(
        key in metadata for key in _REVERT_EDGE_METADATA_KEYS
    )


@app.route('/api/graph/revert', methods=['POST'])
def revert_graph_action():
    """
    Revert a graph action; currently only ``'delete'`` is supported.

    Expects a JSON object ``{"type": ..., "data": ...}``. For ``'delete'``,
    ``data`` must be an object with a ``node`` (needs ``id`` and ``type``) and
    an optional ``edges`` list. The node is re-added, then every edge whose two
    endpoints exist in the graph is re-added.

    The whole payload is validated before the graph is touched, so a malformed
    request is answered with 400 and leaves the graph unchanged.

    Returns:
        200 on success (the scanner is stored back via ``session_manager``);
        400 for a malformed request or an unknown action type; 404 when no
        scanner is available; 500 on unexpected errors.
    """
    try:
        # silent=True: a non-JSON or unparsable body yields None instead of
        # raising an HTTP error that the broad handler below would turn into
        # a 500 response.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'type' not in data or 'data' not in data:
            return jsonify({'success': False, 'error': 'Invalid revert request format'}), 400

        user_session_id, scanner = get_user_scanner()
        if not scanner:
            return jsonify({'success': False, 'error': 'No active session found'}), 404

        action_type = data['type']
        action_data = data['data']

        if action_type != 'delete':
            return jsonify({'success': False, 'error': f'Unknown revert action type: {action_type}'}), 400

        if not isinstance(action_data, dict):
            return jsonify({'success': False, 'error': 'Invalid revert request format'}), 400

        # A delete revert without usable node data used to fall through to the
        # "Unknown revert action type" message; report the real problem.
        node_to_add = action_data.get('node')
        if not isinstance(node_to_add, dict) or 'id' not in node_to_add or 'type' not in node_to_add:
            return jsonify({'success': False, 'error': 'Missing node data for delete revert'}), 400

        try:
            node_type = NodeType(node_to_add['type'])
        except ValueError:
            # NOTE(review): assumes NodeType is an Enum, which raises
            # ValueError for an unknown value - confirm in core.graph_manager.
            return jsonify({'success': False, 'error': f"Invalid node type: {node_to_add['type']}"}), 400

        # `or []` also covers an explicit JSON null for "edges".
        edges_to_add = action_data.get('edges') or []
        if not isinstance(edges_to_add, list) or not all(
            _is_restorable_edge(edge) for edge in edges_to_add
        ):
            return jsonify({'success': False, 'error': 'Invalid edge data in revert request'}), 400

        scanner.graph.add_node(
            node_id=node_to_add['id'],
            node_type=node_type,
            attributes=node_to_add.get('attributes'),
            description=node_to_add.get('description'),
            metadata=node_to_add.get('metadata')
        )

        for edge in edges_to_add:
            # Only restore edges whose endpoints are both present in the graph.
            if scanner.graph.graph.has_node(edge['from']) and scanner.graph.graph.has_node(edge['to']):
                edge_metadata = edge['metadata']
                scanner.graph.add_edge(
                    source_id=edge['from'], target_id=edge['to'],
                    relationship_type=edge_metadata['relationship_type'],
                    confidence_score=edge_metadata['confidence_score'],
                    source_provider=edge_metadata['source_provider'],
                    raw_data=edge.get('raw_data', {})
                )

        session_manager.update_session_scanner(user_session_id, scanner)
        return jsonify({'success': True, 'message': 'Delete action reverted successfully.'})
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.route('/api/export', methods=['GET'])
def export_results():
    """
    Send the current session's scan results as a downloadable JSON file.

    The scanner's ``export_results()`` output gets an ``export_metadata`` entry
    (session id and UTC export timestamp) and is serialized with two-space
    indentation. The download name is built from the scanner's current target
    (``'unknown'`` if unset) and a UTC timestamp. Any failure produces a 500
    JSON error response.
    """
    try:
        session_id, scanner = get_user_scanner()

        payload = scanner.export_results()
        payload['export_metadata'] = {
            'user_session_id': session_id,
            'export_timestamp': datetime.now(timezone.utc).isoformat(),
        }

        stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        target_label = scanner.current_target or 'unknown'
        download_name = f"dnsrecon_{target_label}_{stamp}.json"

        # send_file is given an in-memory byte stream here, not a file path.
        buffer = io.BytesIO(json.dumps(payload, indent=2).encode('utf-8'))
        return send_file(
            buffer,
            as_attachment=True,
            download_name=download_name,
            mimetype='application/json',
        )
    except Exception as exc:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Export failed: {str(exc)}'}), 500
@app.route('/api/providers', methods=['GET'])
def get_providers():
    """
    Return the scanner's provider information for the current session.

    The value of ``scanner.get_provider_info()`` is returned under
    ``providers`` together with the session id; unexpected errors produce a
    500 JSON error response.
    """
    try:
        session_id, scanner = get_user_scanner()
        reply = {
            'success': True,
            'providers': scanner.get_provider_info(),
            'user_session_id': session_id,
        }
        return jsonify(reply)
    except Exception as exc:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(exc)}'}), 500
@app.route('/api/config/api-keys', methods=['POST'])
def set_api_keys():
    """
    Set provider API keys on the current session's scanner configuration.

    Expects a JSON object mapping provider names to API keys. Each key is
    converted to a stripped string (null becomes ``''``) and passed to
    ``scanner.config.set_api_key`` with the lower-cased provider name. When at
    least one call reports success, the scanner's providers are re-initialized
    and the scanner is stored back via ``session_manager``.

    Returns:
        200 listing the updated providers; 400 when the body is missing, not a
        JSON object, or no key was accepted; 500 on unexpected errors.
    """
    try:
        # silent=True: a non-JSON or unparsable body yields None instead of
        # raising an HTTP error that the broad handler below would turn into
        # a 500 response.
        data = request.get_json(silent=True)
        # A JSON list/string/number has no .items(); treat it like a missing
        # body instead of letting AttributeError become a 500.
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'No API keys provided'}), 400

        user_session_id, scanner = get_user_scanner()
        session_config = scanner.config

        updated_providers = []
        for provider_name, api_key in data.items():
            api_key_value = str(api_key or '').strip()
            if session_config.set_api_key(provider_name.lower(), api_key_value):
                updated_providers.append(provider_name)

        if not updated_providers:
            return jsonify({'success': False, 'error': 'No valid API keys were provided.'}), 400

        scanner._initialize_providers()
        session_manager.update_session_scanner(user_session_id, scanner)
        return jsonify({
            'success': True,
            'message': f'API keys updated for: {", ".join(updated_providers)}',
            'user_session_id': user_session_id
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON error body instead of an HTML page."""
    body = {'success': False, 'error': 'Endpoint not found'}
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Print the current traceback and answer 500 errors with a JSON body."""
    traceback.print_exc()
    body = {'success': False, 'error': 'Internal server error'}
    return jsonify(body), 500
if __name__ == '__main__':
    # Executed only when the module is run as a script: load configuration
    # from the environment, then start Flask's built-in server with the
    # configured host, port and debug flag, handling requests in threads.
    config.load_from_env()
    run_options = {
        'host': config.flask_host,
        'port': config.flask_port,
        'debug': config.flask_debug,
        'threaded': True,
    }
    app.run(**run_options)