# dnsrecon/providers/dns_provider.py

import dns.resolver
import dns.reversename
from typing import List, Dict, Any, Tuple
from .base_provider import BaseProvider
from utils.helpers import _is_valid_ip, _is_valid_domain
from core.graph_manager import RelationshipType


class DNSProvider(BaseProvider):
    """
    Provider for standard DNS resolution and reverse DNS lookups.
    Now uses session-specific configuration.

    Every public query method returns a list of relationship tuples of the
    form ``(source, target, relationship_type, confidence, raw_data)`` and
    never raises on lookup failure: errors are logged at debug level and
    reflected in the request counters instead.
    """

    # Record types requested for every domain, in query order.
    _DOMAIN_RECORD_TYPES = (
        'A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA',
        'DNSKEY', 'DS', 'RRSIG', 'SSHFP', 'TLSA', 'NAPTR', 'SPF',
    )

    # Record types whose interesting value is the host name in ``.target``.
    _TARGET_NAME_TYPES = frozenset({'CNAME', 'NS', 'PTR', 'SRV'})

    def __init__(self, session_config=None):
        """Initialize DNS provider with session-specific configuration."""
        super().__init__(
            name="dns",
            rate_limit=100,
            timeout=10,
            session_config=session_config
        )

        # Configure DNS resolver: ``timeout`` bounds a single server attempt,
        # ``lifetime`` bounds the whole query (see dnspython Resolver docs).
        self.resolver = dns.resolver.Resolver()
        self.resolver.timeout = 5
        self.resolver.lifetime = 10

    def get_name(self) -> str:
        """Return the provider name."""
        return "dns"

    def is_available(self) -> bool:
        """DNS is always available - no API key required."""
        return True

    def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query DNS records for the domain to discover relationships.

        Args:
            domain: Domain to investigate

        Returns:
            List of relationships discovered from DNS analysis. Empty when
            ``domain`` fails validation or no record type yields data.
        """
        if not _is_valid_domain(domain):
            return []

        relationships = []
        # Query all record types; each lookup handles its own failures.
        for record_type in self._DOMAIN_RECORD_TYPES:
            relationships.extend(self._query_record(domain, record_type))
        return relationships

    def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query reverse DNS for the IP address.

        Args:
            ip: IP address to investigate

        Returns:
            List of relationships discovered from reverse DNS. Empty when
            ``ip`` fails validation or the PTR lookup fails.
        """
        if not _is_valid_ip(ip):
            return []

        relationships = []
        ptr_type = RelationshipType.PTR_RECORD
        try:
            # Perform reverse DNS lookup
            self.total_requests += 1
            reverse_name = dns.reversename.from_address(ip)
            response = self.resolver.resolve(reverse_name, 'PTR')

            for ptr_record in response:
                hostname = str(ptr_record).rstrip('.')
                if not _is_valid_domain(hostname):
                    continue

                raw_data = {
                    'query_type': 'PTR',
                    'ip_address': ip,
                    'hostname': hostname,
                    'ttl': response.ttl
                }
                relationships.append((
                    ip,
                    hostname,
                    ptr_type,
                    ptr_type.default_confidence,
                    raw_data
                ))
                self.log_relationship_discovery(
                    source_node=ip,
                    target_node=hostname,
                    relationship_type=ptr_type,
                    confidence_score=ptr_type.default_confidence,
                    raw_data=raw_data,
                    discovery_method="reverse_dns_lookup"
                )
        except Exception as e:
            self.failed_requests += 1
            self.logger.logger.debug(f"Reverse DNS lookup failed for {ip}: {e}")
        else:
            # Counted only when nothing above raised, so a request is never
            # recorded as both successful and failed.
            self.successful_requests += 1

        return relationships

    def _query_record(self, domain: str, record_type: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query a specific type of DNS record for the domain.

        Args:
            domain: Domain name to resolve.
            record_type: DNS record type mnemonic (e.g. ``'A'``, ``'MX'``).

        Returns:
            One relationship tuple per record with a non-empty value. Empty
            when the lookup fails or ``RelationshipType`` has no
            ``<record_type>_RECORD`` member.
        """
        relationships = []
        try:
            self.total_requests += 1
            response = self.resolver.resolve(domain, record_type)

            # The relationship type depends only on record_type, so look it
            # up once rather than once per record.
            relationship_type_enum = getattr(RelationshipType, f"{record_type}_RECORD", None)
            if relationship_type_enum is None:
                self.logger.logger.error(f"Unsupported record type '{record_type}' encountered for domain {domain}")
            else:
                for record in response:
                    target = self._record_target(record, record_type)
                    if not target:
                        continue

                    raw_data = {
                        'query_type': record_type,
                        'domain': domain,
                        'value': target,
                        'ttl': response.ttl
                    }
                    relationships.append((
                        domain,
                        target,
                        relationship_type_enum,
                        relationship_type_enum.default_confidence,
                        raw_data
                    ))
                    self.log_relationship_discovery(
                        source_node=domain,
                        target_node=target,
                        relationship_type=relationship_type_enum,
                        confidence_score=relationship_type_enum.default_confidence,
                        raw_data=raw_data,
                        discovery_method=f"dns_{record_type.lower()}_record"
                    )
        except Exception as e:
            self.failed_requests += 1
            self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}")
        else:
            # Counted only when nothing above raised, so a request is never
            # recorded as both successful and failed.
            self.successful_requests += 1

        return relationships

    @staticmethod
    def _record_target(record, record_type: str) -> str:
        """Return the string value of ``record`` used as the relationship target."""
        if record_type in DNSProvider._TARGET_NAME_TYPES:
            return str(record.target).rstrip('.')
        if record_type == 'MX':
            return str(record.exchange).rstrip('.')
        if record_type == 'SOA':
            return str(record.mname).rstrip('.')
        if record_type in ('TXT', 'SPF'):
            return b' '.join(record.strings).decode('utf-8', 'ignore')
        if record_type == 'CAA':
            # Decode leniently, like TXT: a single non-UTF-8 value must not
            # abort processing of the remaining records in the answer.
            tag = record.tag.decode('utf-8', 'ignore')
            value = record.value.decode('utf-8', 'ignore')
            return f"{record.flags} {tag} \"{value}\""
        # A, AAAA and every other type: use the record's text form.
        return str(record)