286 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			286 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# dnsrecon/providers/dns_provider.py
 | 
						|
 | 
						|
from dns import resolver, reversename
 | 
						|
from typing import Dict
 | 
						|
from .base_provider import BaseProvider
 | 
						|
from core.provider_result import ProviderResult
 | 
						|
from utils.helpers import _is_valid_ip, _is_valid_domain, get_ip_version
 | 
						|
 | 
						|
 | 
						|
class DNSProvider(BaseProvider):
 | 
						|
    """
 | 
						|
    Provider for standard DNS resolution and reverse DNS lookups.
 | 
						|
    Now returns standardized ProviderResult objects with IPv4 and IPv6 support.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, name=None, session_config=None):
 | 
						|
        """Initialize DNS provider with session-specific configuration."""
 | 
						|
        super().__init__(
 | 
						|
            name="dns",
 | 
						|
            rate_limit=100,
 | 
						|
            timeout=10,
 | 
						|
            session_config=session_config
 | 
						|
        )
 | 
						|
 | 
						|
        # Configure DNS resolver
 | 
						|
        self.resolver = resolver.Resolver()
 | 
						|
        self.resolver.timeout = 5
 | 
						|
        self.resolver.lifetime = 10
 | 
						|
 | 
						|
    def get_name(self) -> str:
 | 
						|
        """Return the provider name."""
 | 
						|
        return "dns"
 | 
						|
 | 
						|
    def get_display_name(self) -> str:
 | 
						|
        """Return the provider display name for the UI."""
 | 
						|
        return "DNS"
 | 
						|
 | 
						|
    def requires_api_key(self) -> bool:
 | 
						|
        """Return True if the provider requires an API key."""
 | 
						|
        return False
 | 
						|
 | 
						|
    def get_eligibility(self) -> Dict[str, bool]:
 | 
						|
        """Return a dictionary indicating if the provider can query domains and/or IPs."""
 | 
						|
        return {'domains': True, 'ips': True}
 | 
						|
 | 
						|
    def is_available(self) -> bool:
 | 
						|
        """DNS is always available - no API key required."""
 | 
						|
        return True
 | 
						|
 | 
						|
    def query_domain(self, domain: str) -> ProviderResult:
 | 
						|
        """
 | 
						|
        Query DNS records for the domain to discover relationships and attributes.
 | 
						|
        FIXED: Now creates separate attributes for each DNS record type.
 | 
						|
        
 | 
						|
        Args:
 | 
						|
            domain: Domain to investigate
 | 
						|
            
 | 
						|
        Returns:
 | 
						|
            ProviderResult containing discovered relationships and attributes
 | 
						|
        """
 | 
						|
        if not _is_valid_domain(domain):
 | 
						|
            return ProviderResult()
 | 
						|
 | 
						|
        result = ProviderResult()
 | 
						|
 | 
						|
        # Query all record types - each gets its own attribute
 | 
						|
        for record_type in ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA']:
 | 
						|
            try:
 | 
						|
                self._query_record(domain, record_type, result)
 | 
						|
            #except resolver.NoAnswer:
 | 
						|
                # This is not an error, just a confirmation that the record doesn't exist.
 | 
						|
                #self.logger.logger.debug(f"No {record_type} record found for {domain}")
 | 
						|
            except Exception as e:
 | 
						|
                self.failed_requests += 1
 | 
						|
                self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}")
 | 
						|
 | 
						|
        return result
 | 
						|
 | 
						|
    def query_ip(self, ip: str) -> ProviderResult:
 | 
						|
        """
 | 
						|
        Query reverse DNS for the IP address (supports both IPv4 and IPv6).
 | 
						|
 | 
						|
        Args:
 | 
						|
            ip: IP address to investigate (IPv4 or IPv6)
 | 
						|
 | 
						|
        Returns:
 | 
						|
            ProviderResult containing discovered relationships and attributes
 | 
						|
        """
 | 
						|
        if not _is_valid_ip(ip):
 | 
						|
            return ProviderResult()
 | 
						|
 | 
						|
        result = ProviderResult()
 | 
						|
        ip_version = get_ip_version(ip)
 | 
						|
 | 
						|
        try:
 | 
						|
            # Perform reverse DNS lookup (works for both IPv4 and IPv6)
 | 
						|
            self.total_requests += 1
 | 
						|
            reverse_name = reversename.from_address(ip)
 | 
						|
            response = self.resolver.resolve(reverse_name, 'PTR')
 | 
						|
            self.successful_requests += 1
 | 
						|
 | 
						|
            ptr_records = []
 | 
						|
            for ptr_record in response:
 | 
						|
                hostname = str(ptr_record).rstrip('.')
 | 
						|
 | 
						|
                if _is_valid_domain(hostname):
 | 
						|
                    # Determine appropriate forward relationship type based on IP version
 | 
						|
                    if ip_version == 6:
 | 
						|
                        relationship_type = 'dns_aaaa_record'
 | 
						|
                        record_prefix = 'AAAA'
 | 
						|
                    else:
 | 
						|
                        relationship_type = 'dns_a_record'
 | 
						|
                        record_prefix = 'A'
 | 
						|
                    
 | 
						|
                    # Add the relationship
 | 
						|
                    result.add_relationship(
 | 
						|
                        source_node=ip,
 | 
						|
                        target_node=hostname,
 | 
						|
                        relationship_type='dns_ptr_record',
 | 
						|
                        provider=self.name,
 | 
						|
                        confidence=0.8,
 | 
						|
                        raw_data={
 | 
						|
                            'query_type': 'PTR',
 | 
						|
                            'ip_address': ip,
 | 
						|
                            'ip_version': ip_version,
 | 
						|
                            'hostname': hostname,
 | 
						|
                            'ttl': response.ttl
 | 
						|
                        }
 | 
						|
                    )
 | 
						|
 | 
						|
                    # Add to PTR records list
 | 
						|
                    ptr_records.append(f"PTR: {hostname}")
 | 
						|
 | 
						|
                    # Log the relationship discovery
 | 
						|
                    self.log_relationship_discovery(
 | 
						|
                        source_node=ip,
 | 
						|
                        target_node=hostname,
 | 
						|
                        relationship_type='dns_ptr_record',
 | 
						|
                        confidence_score=0.8,
 | 
						|
                        raw_data={
 | 
						|
                            'query_type': 'PTR',
 | 
						|
                            'ip_address': ip,
 | 
						|
                            'ip_version': ip_version,
 | 
						|
                            'hostname': hostname,
 | 
						|
                            'ttl': response.ttl
 | 
						|
                        },
 | 
						|
                        discovery_method=f"reverse_dns_lookup_ipv{ip_version}"
 | 
						|
                    )
 | 
						|
 | 
						|
            # Add PTR records as separate attribute
 | 
						|
            if ptr_records:
 | 
						|
                result.add_attribute(
 | 
						|
                    target_node=ip,
 | 
						|
                    name='ptr_records',  # Specific name for PTR records
 | 
						|
                    value=ptr_records,
 | 
						|
                    attr_type='dns_record',
 | 
						|
                    provider=self.name,
 | 
						|
                    confidence=0.8,
 | 
						|
                    metadata={'ttl': response.ttl, 'ip_version': ip_version}
 | 
						|
                )
 | 
						|
 | 
						|
        except resolver.NXDOMAIN:
 | 
						|
            self.failed_requests += 1
 | 
						|
            self.logger.logger.debug(f"Reverse DNS lookup failed for {ip}: NXDOMAIN")
 | 
						|
        except Exception as e:
 | 
						|
            self.failed_requests += 1
 | 
						|
            self.logger.logger.debug(f"Reverse DNS lookup failed for {ip}: {e}")
 | 
						|
            # Re-raise the exception so the scanner can handle the failure
 | 
						|
            raise e
 | 
						|
 | 
						|
        return result
 | 
						|
 | 
						|
    def _query_record(self, domain: str, record_type: str, result: ProviderResult) -> None:
 | 
						|
        """
 | 
						|
        FIXED: Query DNS records with unique attribute names for each record type.
 | 
						|
        Enhanced to better handle IPv6 AAAA records.
 | 
						|
        """
 | 
						|
        try:
 | 
						|
            self.total_requests += 1
 | 
						|
            response = self.resolver.resolve(domain, record_type)
 | 
						|
            self.successful_requests += 1
 | 
						|
 | 
						|
            dns_records = []
 | 
						|
            
 | 
						|
            for record in response:
 | 
						|
                target = ""
 | 
						|
                if record_type in ['A', 'AAAA']:
 | 
						|
                    target = str(record)
 | 
						|
                    # Validate that the IP address is properly formed
 | 
						|
                    if not _is_valid_ip(target):
 | 
						|
                        self.logger.logger.debug(f"Invalid IP address in {record_type} record: {target}")
 | 
						|
                        continue
 | 
						|
                elif record_type in ['CNAME', 'NS', 'PTR']:
 | 
						|
                    target = str(record.target).rstrip('.')
 | 
						|
                elif record_type == 'MX':
 | 
						|
                    target = str(record.exchange).rstrip('.')
 | 
						|
                elif record_type == 'SOA':
 | 
						|
                    target = str(record.mname).rstrip('.')
 | 
						|
                elif record_type in ['TXT']:
 | 
						|
                    # Keep raw TXT record value
 | 
						|
                    txt_value = str(record).strip('"')
 | 
						|
                    dns_records.append(txt_value)  # Just the value for TXT
 | 
						|
                    continue
 | 
						|
                elif record_type == 'SRV':
 | 
						|
                    target = str(record.target).rstrip('.')
 | 
						|
                elif record_type == 'CAA':
 | 
						|
                    # Keep raw CAA record format
 | 
						|
                    caa_value = f"{record.flags} {record.tag.decode('utf-8')} \"{record.value.decode('utf-8')}\""
 | 
						|
                    dns_records.append(caa_value)  # Just the value for CAA
 | 
						|
                    continue
 | 
						|
                else:
 | 
						|
                    target = str(record)
 | 
						|
 | 
						|
                if target:
 | 
						|
                    # Determine IP version for metadata if this is an IP record
 | 
						|
                    ip_version = None
 | 
						|
                    if record_type in ['A', 'AAAA'] and _is_valid_ip(target):
 | 
						|
                        ip_version = get_ip_version(target)
 | 
						|
                    
 | 
						|
                    raw_data = {
 | 
						|
                        'query_type': record_type,
 | 
						|
                        'domain': domain,
 | 
						|
                        'value': target,
 | 
						|
                        'ttl': response.ttl
 | 
						|
                    }
 | 
						|
                    
 | 
						|
                    if ip_version:
 | 
						|
                        raw_data['ip_version'] = ip_version
 | 
						|
                    
 | 
						|
                    relationship_type = f"dns_{record_type.lower()}_record"
 | 
						|
                    confidence = 0.8
 | 
						|
 | 
						|
                    # Add relationship
 | 
						|
                    result.add_relationship(
 | 
						|
                        source_node=domain,
 | 
						|
                        target_node=target,
 | 
						|
                        relationship_type=relationship_type,
 | 
						|
                        provider=self.name,
 | 
						|
                        confidence=confidence,
 | 
						|
                        raw_data=raw_data
 | 
						|
                    )
 | 
						|
 | 
						|
                    # Add target to records list
 | 
						|
                    dns_records.append(target)
 | 
						|
 | 
						|
                    # Log relationship discovery with IP version info
 | 
						|
                    discovery_method = f"dns_{record_type.lower()}_record"
 | 
						|
                    if ip_version:
 | 
						|
                        discovery_method += f"_ipv{ip_version}"
 | 
						|
                    
 | 
						|
                    self.log_relationship_discovery(
 | 
						|
                        source_node=domain,
 | 
						|
                        target_node=target,
 | 
						|
                        relationship_type=relationship_type,
 | 
						|
                        confidence_score=confidence,
 | 
						|
                        raw_data=raw_data,
 | 
						|
                        discovery_method=discovery_method
 | 
						|
                    )
 | 
						|
 | 
						|
            # FIXED: Create attribute with specific name for each record type
 | 
						|
            if dns_records:
 | 
						|
                # Use record type specific attribute name (e.g., 'a_records', 'mx_records', etc.)
 | 
						|
                attribute_name = f"{record_type.lower()}_records"
 | 
						|
                
 | 
						|
                metadata = {'record_type': record_type, 'ttl': response.ttl}
 | 
						|
                
 | 
						|
                # Add IP version info for A/AAAA records
 | 
						|
                if record_type in ['A', 'AAAA'] and dns_records:
 | 
						|
                    first_ip_version = get_ip_version(dns_records[0])
 | 
						|
                    if first_ip_version:
 | 
						|
                        metadata['ip_version'] = first_ip_version
 | 
						|
                
 | 
						|
                result.add_attribute(
 | 
						|
                    target_node=domain,
 | 
						|
                    name=attribute_name,  # UNIQUE name for each record type!
 | 
						|
                    value=dns_records,
 | 
						|
                    attr_type='dns_record_list',
 | 
						|
                    provider=self.name,
 | 
						|
                    confidence=0.8,
 | 
						|
                    metadata=metadata
 | 
						|
                )
 | 
						|
 | 
						|
        except Exception as e:
 | 
						|
            self.failed_requests += 1
 | 
						|
            self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}")
 | 
						|
            raise e |