185 lines
6.7 KiB
Python
185 lines
6.7 KiB
Python
# dnsrecon/providers/dns_provider.py
|
|
|
|
import dns.resolver
|
|
import dns.reversename
|
|
from typing import List, Dict, Any, Tuple
|
|
from .base_provider import BaseProvider
|
|
from utils.helpers import _is_valid_ip, _is_valid_domain
|
|
from core.graph_manager import RelationshipType
|
|
|
|
|
|
class DNSProvider(BaseProvider):
    """
    Provider for standard DNS resolution and reverse DNS lookups.

    Forward lookups query a fixed set of record types for a domain; reverse
    lookups resolve the PTR records of an IP address. Every discovered
    relationship is returned as a tuple of
    ``(source, target, relationship_type, confidence, raw_data)`` and is also
    reported through ``log_relationship_discovery``.

    Uses session-specific configuration, which is forwarded to BaseProvider.
    """

    # Record types queried for every domain, in query order.
    _RECORD_TYPES = ('A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA')

    def __init__(self, session_config=None):
        """
        Initialize DNS provider with session-specific configuration.

        Args:
            session_config: Optional session configuration object, passed
                through unchanged to BaseProvider.
        """
        super().__init__(
            name="dns",
            rate_limit=100,
            timeout=10,
            session_config=session_config
        )

        # Configure DNS resolver: `timeout` bounds the wait on a single
        # nameserver, `lifetime` bounds the whole resolution (both seconds).
        self.resolver = dns.resolver.Resolver()
        self.resolver.timeout = 5
        self.resolver.lifetime = 10
        # NOTE: to pin a specific resolver (e.g. a local one for debugging),
        # set: self.resolver.nameservers = ['127.0.0.1']

    def get_name(self) -> str:
        """Return the provider name."""
        return "dns"

    def is_available(self) -> bool:
        """DNS is always available - no API key required."""
        return True

    def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query DNS records for the domain to discover relationships.

        Each record type in ``_RECORD_TYPES`` is queried independently; a
        failure for one type does not prevent the others from being queried.

        Args:
            domain: Domain to investigate

        Returns:
            List of relationships discovered from DNS analysis. Empty if the
            domain fails validation or no query produced a usable record.
        """
        if not _is_valid_domain(domain):
            return []

        relationships = []
        for record_type in self._RECORD_TYPES:
            relationships.extend(self._query_record(domain, record_type))
        return relationships

    def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query reverse DNS for the IP address.

        Args:
            ip: IP address to investigate

        Returns:
            List of relationships discovered from reverse DNS. Empty if the
            IP fails validation or the lookup fails; PTR targets that are not
            valid domain names are skipped.
        """
        if not _is_valid_ip(ip):
            return []

        relationships = []

        try:
            # Perform reverse DNS lookup
            self.total_requests += 1
            reverse_name = dns.reversename.from_address(ip)
            response = self.resolver.resolve(reverse_name, 'PTR')

            relationship_type = RelationshipType.PTR_RECORD
            confidence = relationship_type.default_confidence

            for ptr_record in response:
                hostname = str(ptr_record).rstrip('.')
                if not _is_valid_domain(hostname):
                    continue

                raw_data = {
                    'query_type': 'PTR',
                    'ip_address': ip,
                    'hostname': hostname,
                    'ttl': response.ttl
                }

                relationships.append((
                    ip,
                    hostname,
                    relationship_type,
                    confidence,
                    raw_data
                ))

                self.log_relationship_discovery(
                    source_node=ip,
                    target_node=hostname,
                    relationship_type=relationship_type,
                    confidence_score=confidence,
                    raw_data=raw_data,
                    discovery_method="reverse_dns_lookup"
                )

        except Exception as e:
            # Best-effort: a failed lookup yields whatever was collected so far.
            self.failed_requests += 1
            self.logger.logger.debug(f"Reverse DNS lookup failed for {ip}: {e}")
        else:
            # Counted only when nothing failed, so a request is never recorded
            # as both successful and failed.
            self.successful_requests += 1

        return relationships

    @staticmethod
    def _extract_target(record, record_type: str) -> str:
        """
        Return the string value a DNS record points at.

        Args:
            record: A single rdata item from a resolver answer.
            record_type: The record type that was queried (e.g. 'MX').

        Returns:
            The target with any trailing root dot removed for name-valued
            records, the joined text for TXT, a ``flags tag "value"`` string
            for CAA, and ``str(record)`` for A, AAAA and any other type.
        """
        if record_type in ('CNAME', 'NS', 'PTR', 'SRV'):
            return str(record.target).rstrip('.')
        if record_type == 'MX':
            return str(record.exchange).rstrip('.')
        if record_type == 'SOA':
            return str(record.mname).rstrip('.')
        if record_type == 'TXT':
            return b' '.join(record.strings).decode('utf-8', 'ignore')
        if record_type == 'CAA':
            # Decode leniently, like TXT, so one malformed value cannot abort
            # processing of the remaining records.
            tag = record.tag.decode('utf-8', 'ignore')
            value = record.value.decode('utf-8', 'ignore')
            return f"{record.flags} {tag} \"{value}\""
        return str(record)

    def _query_record(self, domain: str, record_type: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query a specific type of DNS record for the domain.

        Args:
            domain: Domain to query.
            record_type: DNS record type to request (e.g. 'A', 'MX').

        Returns:
            List of relationships built from the answer. Empty if the query
            fails, or if RelationshipType has no ``<record_type>_RECORD``
            member (an error is logged in that case).
        """
        relationships = []
        try:
            self.total_requests += 1
            response = self.resolver.resolve(domain, record_type)

            if record_type == 'TXT':
                # Handle TXT records as metadata, not relationships: the tuple
                # carries A_RECORD purely as a placeholder.
                # NOTE(review): presumably consumers key off
                # raw_data['query_type'] for TXT - confirm against the caller.
                relationship_type = RelationshipType.A_RECORD
            else:
                # Resolved once per query; None means the enum has no member
                # for this record type.
                relationship_type = getattr(RelationshipType, f"{record_type}_RECORD", None)

            if relationship_type is None:
                self.logger.logger.error(f"Unsupported record type '{record_type}' encountered for domain {domain}")
            else:
                confidence = relationship_type.default_confidence

                for record in response:
                    target = self._extract_target(record, record_type)
                    if not target:
                        continue

                    raw_data = {
                        'query_type': record_type,
                        'domain': domain,
                        'value': target,
                        'ttl': response.ttl
                    }

                    relationships.append((
                        domain,
                        target,
                        relationship_type,
                        confidence,
                        raw_data
                    ))

                    self.log_relationship_discovery(
                        source_node=domain,
                        target_node=target,
                        relationship_type=relationship_type,
                        confidence_score=confidence,
                        raw_data=raw_data,
                        discovery_method=f"dns_{record_type.lower()}_record"
                    )

        except Exception as e:
            # Best-effort: a missing record type or resolver error is expected
            # for many domains and must not stop the remaining queries.
            self.failed_requests += 1
            self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}")
        else:
            # Counted only when nothing failed, so a request is never recorded
            # as both successful and failed.
            self.successful_requests += 1

        return relationships