288 lines
12 KiB
Python
288 lines
12 KiB
Python
# DNScope/providers/dns_provider.py
|
|
|
|
from dns import resolver, reversename
|
|
from typing import Dict
|
|
from datetime import datetime, timezone
|
|
from .base_provider import BaseProvider
|
|
from core.provider_result import ProviderResult
|
|
from utils.helpers import _is_valid_ip, _is_valid_domain, get_ip_version
|
|
|
|
|
|
class DNSProvider(BaseProvider):
    """
    Provider for forward DNS resolution and reverse (PTR) lookups.

    Results are returned as ProviderResult objects, and both IPv4 and IPv6
    addresses are handled. Every relationship's ``raw_data`` carries
    ``discovery_timestamp`` and ``relevance_timestamp`` values (used for
    time-based edge coloring).

    NOTE(review): ``self.name``, ``self.logger``, the request counters
    (``total_requests``, ``successful_requests``, ``failed_requests``) and
    ``log_relationship_discovery`` are not defined in this class; presumably
    they are provided by BaseProvider.
    """

    # Record types queried for every domain, in this order.
    _RECORD_TYPES = ('A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA')

    # Record types whose value is an IP address.
    _ADDRESS_RECORD_TYPES = ('A', 'AAAA')

    # For record types that point at a hostname: the rdata attribute that
    # holds that hostname.
    _TARGET_ATTRIBUTES = {
        'CNAME': 'target',
        'NS': 'target',
        'PTR': 'target',
        'SRV': 'target',
        'MX': 'exchange',
        'SOA': 'mname',
    }

    def __init__(self, name=None, session_config=None):
        """
        Initialize the DNS provider with session-specific configuration.

        Args:
            name: Accepted for signature compatibility but not used; the
                provider is always registered under the name "dns".
            session_config: Passed through unchanged to BaseProvider.
        """
        super().__init__(
            name="dns",
            rate_limit=100,
            timeout=10,
            session_config=session_config
        )

        # Dedicated resolver: `timeout` bounds the wait for a single server,
        # `lifetime` bounds the whole resolution attempt (dnspython semantics).
        self.resolver = resolver.Resolver()
        self.resolver.timeout = 5
        self.resolver.lifetime = 10

    def get_name(self) -> str:
        """Return the provider name."""
        return "dns"

    def get_display_name(self) -> str:
        """Return the provider display name for the UI."""
        return "DNS"

    def requires_api_key(self) -> bool:
        """Return True if the provider requires an API key (DNS does not)."""
        return False

    def get_eligibility(self) -> Dict[str, bool]:
        """Return which target kinds this provider can query (domains and IPs)."""
        return {'domains': True, 'ips': True}

    def is_available(self) -> bool:
        """DNS is always available - no API key required."""
        return True

    def query_domain(self, domain: str) -> ProviderResult:
        """
        Query every supported DNS record type for a domain.

        Each record type is resolved independently and best-effort: a failure
        for one type does not stop the remaining types from being queried.

        Args:
            domain: Domain to investigate.

        Returns:
            ProviderResult with the discovered relationships and one
            attribute per record type that produced values. An empty
            ProviderResult is returned when ``domain`` is not a valid domain.
        """
        if not _is_valid_domain(domain):
            return ProviderResult()

        result = ProviderResult()
        discovery_time = datetime.now(timezone.utc)

        for record_type in self._RECORD_TYPES:
            try:
                self._query_record(domain, record_type, result, discovery_time)
            except Exception:
                # _query_record has already incremented failed_requests and
                # logged the error before re-raising; counting or logging it
                # again here would report every failure twice. This includes
                # resolver.NoAnswer, i.e. "no record of this type exists".
                continue

        return result

    def query_ip(self, ip: str) -> ProviderResult:
        """
        Perform a reverse DNS (PTR) lookup for an IPv4 or IPv6 address.

        Args:
            ip: IP address to investigate (IPv4 or IPv6).

        Returns:
            ProviderResult with one ``dns_ptr_record`` relationship per valid
            PTR hostname and a ``ptr_records`` attribute on the IP. The result
            is empty when ``ip`` is invalid or the lookup yields NXDOMAIN.

        Raises:
            Exception: Any error other than NXDOMAIN is counted, logged and
                re-raised so the caller can handle the failure.
        """
        if not _is_valid_ip(ip):
            return ProviderResult()

        result = ProviderResult()
        ip_version = get_ip_version(ip)
        discovery_time = datetime.now(timezone.utc)

        try:
            # reversename.from_address handles both IPv4 and IPv6 addresses.
            self.total_requests += 1
            reverse_name = reversename.from_address(ip)
            response = self.resolver.resolve(reverse_name, 'PTR')
            self.successful_requests += 1

            timestamp = discovery_time.isoformat()
            ptr_records = []

            for ptr_record in response:
                hostname = str(ptr_record).rstrip('.')

                # Skip PTR values that are not well-formed domain names.
                if not _is_valid_domain(hostname):
                    continue

                raw_data = {
                    'query_type': 'PTR',
                    'ip_address': ip,
                    'ip_version': ip_version,
                    'hostname': hostname,
                    'ttl': response.ttl,
                    'discovery_timestamp': timestamp,
                    # DNS data is "fresh" at the moment it is discovered.
                    'relevance_timestamp': timestamp
                }

                result.add_relationship(
                    source_node=ip,
                    target_node=hostname,
                    relationship_type='dns_ptr_record',
                    provider=self.name,
                    raw_data=raw_data
                )

                ptr_records.append(f"PTR: {hostname}")

                self.log_relationship_discovery(
                    source_node=ip,
                    target_node=hostname,
                    relationship_type='dns_ptr_record',
                    raw_data=raw_data,
                    discovery_method=f"reverse_dns_lookup_ipv{ip_version}"
                )

            # All PTR hostnames are stored together in one attribute.
            if ptr_records:
                result.add_attribute(
                    target_node=ip,
                    name='ptr_records',
                    value=ptr_records,
                    attr_type='dns_record',
                    provider=self.name,
                    metadata={'ttl': response.ttl, 'ip_version': ip_version}
                )

        except resolver.NXDOMAIN:
            # No reverse entry exists: counted as a failed request, not raised.
            self.failed_requests += 1
            self.logger.logger.debug(f"Reverse DNS lookup failed for {ip}: NXDOMAIN")
        except Exception as e:
            self.failed_requests += 1
            self.logger.logger.debug(f"Reverse DNS lookup failed for {ip}: {e}")
            # Re-raise so the scanner can handle the failure.
            raise

        return result

    @staticmethod
    def _txt_value(record) -> str:
        """
        Return the text of a TXT rdata as one string.

        A TXT record may consist of several character-strings. Their
        presentation form is '"part1" "part2"', so stripping only the outer
        quotes would leave '" "' embedded in the value. The chunks are joined
        instead.
        """
        strings = getattr(record, 'strings', None)
        if not strings:
            # No chunk data exposed: fall back to the presentation form.
            return str(record).strip('"')
        if all(isinstance(part, bytes) for part in strings):
            # Join before decoding so a multi-byte character split across two
            # chunks is still decoded correctly.
            return b''.join(strings).decode('utf-8', errors='replace')
        return ''.join(str(part) for part in strings)

    def _query_record(self, domain: str, record_type: str, result: ProviderResult, discovery_time: datetime) -> None:
        """
        Resolve one record type for a domain and store the findings in ``result``.

        A/AAAA, CNAME, NS, PTR, MX, SOA and SRV values (and any other type,
        via its string form) become relationships from ``domain`` to the
        value. TXT and CAA values are kept as attribute values only. All
        values for the record type are stored in one attribute named
        ``<type>_records`` (for example ``a_records``, ``mx_records``).

        Args:
            domain: Domain being queried.
            record_type: DNS record type, e.g. 'A' or 'MX'.
            result: ProviderResult that is mutated in place.
            discovery_time: Timestamp written into every relationship's raw_data.

        Raises:
            Exception: Any error raised while resolving or processing is
                counted in ``failed_requests``, logged and re-raised.
        """
        try:
            self.total_requests += 1
            response = self.resolver.resolve(domain, record_type)
            self.successful_requests += 1

            is_address_record = record_type in self._ADDRESS_RECORD_TYPES
            relationship_type = f"dns_{record_type.lower()}_record"
            timestamp = discovery_time.isoformat()
            dns_records = []

            for record in response:
                # TXT and CAA carry free-form data rather than a node to link
                # to: store the value only, no relationship.
                if record_type == 'TXT':
                    dns_records.append(self._txt_value(record))
                    continue
                if record_type == 'CAA':
                    caa_value = f"{record.flags} {record.tag.decode('utf-8')} \"{record.value.decode('utf-8')}\""
                    dns_records.append(caa_value)
                    continue

                if is_address_record:
                    target = str(record)
                    if not _is_valid_ip(target):
                        self.logger.logger.debug(f"Invalid IP address in {record_type} record: {target}")
                        continue
                elif record_type in self._TARGET_ATTRIBUTES:
                    hostname = getattr(record, self._TARGET_ATTRIBUTES[record_type])
                    target = str(hostname).rstrip('.')
                else:
                    target = str(record)

                # A target that is empty after stripping the trailing dot
                # (e.g. the root name ".") is skipped.
                if not target:
                    continue

                # Only address records carry an IP version; the address was
                # already validated above.
                ip_version = get_ip_version(target) if is_address_record else None

                raw_data = {
                    'query_type': record_type,
                    'domain': domain,
                    'value': target,
                    'ttl': response.ttl,
                    'discovery_timestamp': timestamp,
                    # DNS data is "fresh" at the moment it is discovered.
                    'relevance_timestamp': timestamp
                }
                if ip_version:
                    raw_data['ip_version'] = ip_version

                result.add_relationship(
                    source_node=domain,
                    target_node=target,
                    relationship_type=relationship_type,
                    provider=self.name,
                    raw_data=raw_data
                )

                dns_records.append(target)

                discovery_method = f"dns_{record_type.lower()}_record"
                if ip_version:
                    discovery_method += f"_ipv{ip_version}"

                self.log_relationship_discovery(
                    source_node=domain,
                    target_node=target,
                    relationship_type=relationship_type,
                    raw_data=raw_data,
                    discovery_method=discovery_method
                )

            if dns_records:
                # One attribute per record type, named after the type, so that
                # values of different types never share an attribute.
                attribute_name = f"{record_type.lower()}_records"
                metadata = {'record_type': record_type, 'ttl': response.ttl}

                if is_address_record:
                    first_ip_version = get_ip_version(dns_records[0])
                    if first_ip_version:
                        metadata['ip_version'] = first_ip_version

                result.add_attribute(
                    target_node=domain,
                    name=attribute_name,
                    value=dns_records,
                    attr_type='dns_record_list',
                    provider=self.name,
                    metadata=metadata
                )

        except Exception as e:
            self.failed_requests += 1
            self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}")
            raise