# dnsrecon/providers/dns_provider.py from dns import resolver, reversename from typing import Dict from .base_provider import BaseProvider from core.provider_result import ProviderResult from utils.helpers import _is_valid_ip, _is_valid_domain, get_ip_version class DNSProvider(BaseProvider): """ Provider for standard DNS resolution and reverse DNS lookups. Now returns standardized ProviderResult objects with IPv4 and IPv6 support. """ def __init__(self, name=None, session_config=None): """Initialize DNS provider with session-specific configuration.""" super().__init__( name="dns", rate_limit=100, timeout=10, session_config=session_config ) # Configure DNS resolver self.resolver = resolver.Resolver() self.resolver.timeout = 5 self.resolver.lifetime = 10 def get_name(self) -> str: """Return the provider name.""" return "dns" def get_display_name(self) -> str: """Return the provider display name for the UI.""" return "DNS" def requires_api_key(self) -> bool: """Return True if the provider requires an API key.""" return False def get_eligibility(self) -> Dict[str, bool]: """Return a dictionary indicating if the provider can query domains and/or IPs.""" return {'domains': True, 'ips': True} def is_available(self) -> bool: """DNS is always available - no API key required.""" return True def query_domain(self, domain: str) -> ProviderResult: """ Query DNS records for the domain to discover relationships and attributes. FIXED: Now creates separate attributes for each DNS record type. Args: domain: Domain to investigate Returns: ProviderResult containing discovered relationships and attributes """ if not _is_valid_domain(domain): return ProviderResult() result = ProviderResult() # Query all record types - each gets its own attribute for record_type in ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA']: try: self._query_record(domain, record_type, result) #except resolver.NoAnswer: # This is not an error, just a confirmation that the record doesn't exist. #self.logger.logger.debug(f"No {record_type} record found for {domain}") except Exception as e: self.failed_requests += 1 self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}") return result def query_ip(self, ip: str) -> ProviderResult: """ Query reverse DNS for the IP address (supports both IPv4 and IPv6). Args: ip: IP address to investigate (IPv4 or IPv6) Returns: ProviderResult containing discovered relationships and attributes """ if not _is_valid_ip(ip): return ProviderResult() result = ProviderResult() ip_version = get_ip_version(ip) try: # Perform reverse DNS lookup (works for both IPv4 and IPv6) self.total_requests += 1 reverse_name = reversename.from_address(ip) response = self.resolver.resolve(reverse_name, 'PTR') self.successful_requests += 1 ptr_records = [] for ptr_record in response: hostname = str(ptr_record).rstrip('.') if _is_valid_domain(hostname): # Determine appropriate forward relationship type based on IP version if ip_version == 6: relationship_type = 'dns_aaaa_record' record_prefix = 'AAAA' else: relationship_type = 'dns_a_record' record_prefix = 'A' # Add the relationship result.add_relationship( source_node=ip, target_node=hostname, relationship_type='dns_ptr_record', provider=self.name, confidence=0.8, raw_data={ 'query_type': 'PTR', 'ip_address': ip, 'ip_version': ip_version, 'hostname': hostname, 'ttl': response.ttl } ) # Add to PTR records list ptr_records.append(f"PTR: {hostname}") # Log the relationship discovery self.log_relationship_discovery( source_node=ip, target_node=hostname, relationship_type='dns_ptr_record', confidence_score=0.8, raw_data={ 'query_type': 'PTR', 'ip_address': ip, 'ip_version': ip_version, 'hostname': hostname, 'ttl': response.ttl }, discovery_method=f"reverse_dns_lookup_ipv{ip_version}" ) # Add PTR records as separate attribute if ptr_records: result.add_attribute( target_node=ip, name='ptr_records', # Specific name for PTR records value=ptr_records, attr_type='dns_record', provider=self.name, confidence=0.8, metadata={'ttl': response.ttl, 'ip_version': ip_version} ) except resolver.NXDOMAIN: self.failed_requests += 1 self.logger.logger.debug(f"Reverse DNS lookup failed for {ip}: NXDOMAIN") except Exception as e: self.failed_requests += 1 self.logger.logger.debug(f"Reverse DNS lookup failed for {ip}: {e}") # Re-raise the exception so the scanner can handle the failure raise e return result def _query_record(self, domain: str, record_type: str, result: ProviderResult) -> None: """ FIXED: Query DNS records with unique attribute names for each record type. Enhanced to better handle IPv6 AAAA records. """ try: self.total_requests += 1 response = self.resolver.resolve(domain, record_type) self.successful_requests += 1 dns_records = [] for record in response: target = "" if record_type in ['A', 'AAAA']: target = str(record) # Validate that the IP address is properly formed if not _is_valid_ip(target): self.logger.logger.debug(f"Invalid IP address in {record_type} record: {target}") continue elif record_type in ['CNAME', 'NS', 'PTR']: target = str(record.target).rstrip('.') elif record_type == 'MX': target = str(record.exchange).rstrip('.') elif record_type == 'SOA': target = str(record.mname).rstrip('.') elif record_type in ['TXT']: # Keep raw TXT record value txt_value = str(record).strip('"') dns_records.append(txt_value) # Just the value for TXT continue elif record_type == 'SRV': target = str(record.target).rstrip('.') elif record_type == 'CAA': # Keep raw CAA record format caa_value = f"{record.flags} {record.tag.decode('utf-8')} \"{record.value.decode('utf-8')}\"" dns_records.append(caa_value) # Just the value for CAA continue else: target = str(record) if target: # Determine IP version for metadata if this is an IP record ip_version = None if record_type in ['A', 'AAAA'] and _is_valid_ip(target): ip_version = get_ip_version(target) raw_data = { 'query_type': record_type, 'domain': domain, 'value': target, 'ttl': response.ttl } if ip_version: raw_data['ip_version'] = ip_version relationship_type = f"dns_{record_type.lower()}_record" confidence = 0.8 # Add relationship result.add_relationship( source_node=domain, target_node=target, relationship_type=relationship_type, provider=self.name, confidence=confidence, raw_data=raw_data ) # Add target to records list dns_records.append(target) # Log relationship discovery with IP version info discovery_method = f"dns_{record_type.lower()}_record" if ip_version: discovery_method += f"_ipv{ip_version}" self.log_relationship_discovery( source_node=domain, target_node=target, relationship_type=relationship_type, confidence_score=confidence, raw_data=raw_data, discovery_method=discovery_method ) # FIXED: Create attribute with specific name for each record type if dns_records: # Use record type specific attribute name (e.g., 'a_records', 'mx_records', etc.) attribute_name = f"{record_type.lower()}_records" metadata = {'record_type': record_type, 'ttl': response.ttl} # Add IP version info for A/AAAA records if record_type in ['A', 'AAAA'] and dns_records: first_ip_version = get_ip_version(dns_records[0]) if first_ip_version: metadata['ip_version'] = first_ip_version result.add_attribute( target_node=domain, name=attribute_name, # UNIQUE name for each record type! value=dns_records, attr_type='dns_record_list', provider=self.name, confidence=0.8, metadata=metadata ) except Exception as e: self.failed_requests += 1 self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}") raise e