333 lines
13 KiB
Python
333 lines
13 KiB
Python
"""
|
|
VirusTotal provider for DNSRecon.
|
|
Discovers domain relationships through passive DNS and URL analysis.
|
|
"""
|
|
|
|
import json
|
|
from typing import List, Dict, Any, Tuple
|
|
from .base_provider import BaseProvider
|
|
from utils.helpers import _is_valid_ip, _is_valid_domain
|
|
from core.graph_manager import RelationshipType
|
|
|
|
|
|
class VirusTotalProvider(BaseProvider):
|
|
"""
|
|
Provider for querying VirusTotal API for passive DNS and domain reputation data.
|
|
Now uses session-specific API keys and rate limits.
|
|
"""
|
|
|
|
def __init__(self, session_config=None):
|
|
"""Initialize VirusTotal provider with session-specific configuration."""
|
|
super().__init__(
|
|
name="virustotal",
|
|
rate_limit=4, # Free tier: 4 requests per minute
|
|
timeout=30,
|
|
session_config=session_config
|
|
)
|
|
self.base_url = "https://www.virustotal.com/vtapi/v2"
|
|
self.api_key = self.config.get_api_key('virustotal')
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if VirusTotal provider is available (has valid API key in this session)."""
|
|
return self.api_key is not None and len(self.api_key.strip()) > 0
|
|
|
|
def get_name(self) -> str:
|
|
"""Return the provider name."""
|
|
return "virustotal"
|
|
|
|
def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
|
|
"""
|
|
Query VirusTotal for domain information including passive DNS.
|
|
|
|
Args:
|
|
domain: Domain to investigate
|
|
|
|
Returns:
|
|
List of relationships discovered from VirusTotal data
|
|
"""
|
|
if not _is_valid_domain(domain) or not self.is_available():
|
|
return []
|
|
|
|
relationships = []
|
|
|
|
# Query domain report
|
|
domain_relationships = self._query_domain_report(domain)
|
|
relationships.extend(domain_relationships)
|
|
|
|
# Query passive DNS for the domain
|
|
passive_dns_relationships = self._query_passive_dns_domain(domain)
|
|
relationships.extend(passive_dns_relationships)
|
|
|
|
return relationships
|
|
|
|
def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
|
|
"""
|
|
Query VirusTotal for IP address information including passive DNS.
|
|
|
|
Args:
|
|
ip: IP address to investigate
|
|
|
|
Returns:
|
|
List of relationships discovered from VirusTotal IP data
|
|
"""
|
|
if not _is_valid_ip(ip) or not self.is_available():
|
|
return []
|
|
|
|
relationships = []
|
|
|
|
# Query IP report
|
|
ip_relationships = self._query_ip_report(ip)
|
|
relationships.extend(ip_relationships)
|
|
|
|
# Query passive DNS for the IP
|
|
passive_dns_relationships = self._query_passive_dns_ip(ip)
|
|
relationships.extend(passive_dns_relationships)
|
|
|
|
return relationships
|
|
|
|
def _query_domain_report(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
|
|
"""Query VirusTotal domain report."""
|
|
relationships = []
|
|
|
|
try:
|
|
url = f"{self.base_url}/domain/report"
|
|
params = {
|
|
'apikey': self.api_key,
|
|
'domain': domain,
|
|
'allinfo': 1 # Get comprehensive information
|
|
}
|
|
|
|
response = self.make_request(url, method="GET", params=params, target_indicator=domain)
|
|
|
|
if not response or response.status_code != 200:
|
|
return []
|
|
|
|
data = response.json()
|
|
|
|
if data.get('response_code') != 1:
|
|
return []
|
|
|
|
# Extract resolved IPs
|
|
resolutions = data.get('resolutions', [])
|
|
for resolution in resolutions:
|
|
ip_address = resolution.get('ip_address')
|
|
last_resolved = resolution.get('last_resolved')
|
|
|
|
if ip_address and _is_valid_ip(ip_address):
|
|
raw_data = {
|
|
'domain': domain,
|
|
'ip_address': ip_address,
|
|
'last_resolved': last_resolved,
|
|
'source': 'virustotal_domain_report'
|
|
}
|
|
|
|
relationships.append((
|
|
domain,
|
|
ip_address,
|
|
RelationshipType.PASSIVE_DNS,
|
|
RelationshipType.PASSIVE_DNS.default_confidence,
|
|
raw_data
|
|
))
|
|
|
|
self.log_relationship_discovery(
|
|
source_node=domain,
|
|
target_node=ip_address,
|
|
relationship_type=RelationshipType.PASSIVE_DNS,
|
|
confidence_score=RelationshipType.PASSIVE_DNS.default_confidence,
|
|
raw_data=raw_data,
|
|
discovery_method="virustotal_domain_resolution"
|
|
)
|
|
|
|
# Extract subdomains
|
|
subdomains = data.get('subdomains', [])
|
|
for subdomain in subdomains:
|
|
if subdomain != domain and _is_valid_domain(subdomain):
|
|
raw_data = {
|
|
'parent_domain': domain,
|
|
'subdomain': subdomain,
|
|
'source': 'virustotal_subdomain_discovery'
|
|
}
|
|
|
|
relationships.append((
|
|
domain,
|
|
subdomain,
|
|
RelationshipType.PASSIVE_DNS,
|
|
0.7, # Medium-high confidence for subdomains
|
|
raw_data
|
|
))
|
|
|
|
self.log_relationship_discovery(
|
|
source_node=domain,
|
|
target_node=subdomain,
|
|
relationship_type=RelationshipType.PASSIVE_DNS,
|
|
confidence_score=0.7,
|
|
raw_data=raw_data,
|
|
discovery_method="virustotal_subdomain_discovery"
|
|
)
|
|
|
|
except json.JSONDecodeError as e:
|
|
self.logger.logger.error(f"Failed to parse JSON response from VirusTotal: {e}")
|
|
except Exception as e:
|
|
self.logger.logger.error(f"Error querying VirusTotal domain report for {domain}: {e}")
|
|
|
|
return relationships
|
|
|
|
def _query_ip_report(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
|
|
"""Query VirusTotal IP report."""
|
|
relationships = []
|
|
|
|
try:
|
|
url = f"{self.base_url}/ip-address/report"
|
|
params = {
|
|
'apikey': self.api_key,
|
|
'ip': ip
|
|
}
|
|
|
|
response = self.make_request(url, method="GET", params=params, target_indicator=ip)
|
|
|
|
if not response or response.status_code != 200:
|
|
return []
|
|
|
|
data = response.json()
|
|
|
|
if data.get('response_code') != 1:
|
|
return []
|
|
|
|
# Extract resolved domains
|
|
resolutions = data.get('resolutions', [])
|
|
for resolution in resolutions:
|
|
hostname = resolution.get('hostname')
|
|
last_resolved = resolution.get('last_resolved')
|
|
|
|
if hostname and _is_valid_domain(hostname):
|
|
raw_data = {
|
|
'ip_address': ip,
|
|
'hostname': hostname,
|
|
'last_resolved': last_resolved,
|
|
'source': 'virustotal_ip_report'
|
|
}
|
|
|
|
relationships.append((
|
|
ip,
|
|
hostname,
|
|
RelationshipType.PASSIVE_DNS,
|
|
RelationshipType.PASSIVE_DNS.default_confidence,
|
|
raw_data
|
|
))
|
|
|
|
self.log_relationship_discovery(
|
|
source_node=ip,
|
|
target_node=hostname,
|
|
relationship_type=RelationshipType.PASSIVE_DNS,
|
|
confidence_score=RelationshipType.PASSIVE_DNS.default_confidence,
|
|
raw_data=raw_data,
|
|
discovery_method="virustotal_ip_resolution"
|
|
)
|
|
|
|
except json.JSONDecodeError as e:
|
|
self.logger.logger.error(f"Failed to parse JSON response from VirusTotal: {e}")
|
|
except Exception as e:
|
|
self.logger.logger.error(f"Error querying VirusTotal IP report for {ip}: {e}")
|
|
|
|
return relationships
|
|
|
|
def _query_passive_dns_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
|
|
"""Query VirusTotal passive DNS for domain."""
|
|
# Note: VirusTotal's passive DNS API might require a premium subscription
|
|
# This is a placeholder for the endpoint structure
|
|
return []
|
|
|
|
def _query_passive_dns_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
|
|
"""Query VirusTotal passive DNS for IP."""
|
|
# Note: VirusTotal's passive DNS API might require a premium subscription
|
|
# This is a placeholder for the endpoint structure
|
|
return []
|
|
|
|
def get_domain_reputation(self, domain: str) -> Dict[str, Any]:
|
|
"""
|
|
Get domain reputation information from VirusTotal.
|
|
|
|
Args:
|
|
domain: Domain to check reputation for
|
|
|
|
Returns:
|
|
Dictionary containing reputation data
|
|
"""
|
|
if not _is_valid_domain(domain) or not self.is_available():
|
|
return {}
|
|
|
|
try:
|
|
url = f"{self.base_url}/domain/report"
|
|
params = {
|
|
'apikey': self.api_key,
|
|
'domain': domain
|
|
}
|
|
|
|
response = self.make_request(url, method="GET", params=params, target_indicator=domain)
|
|
|
|
if response and response.status_code == 200:
|
|
data = response.json()
|
|
|
|
if data.get('response_code') == 1:
|
|
return {
|
|
'positives': data.get('positives', 0),
|
|
'total': data.get('total', 0),
|
|
'scan_date': data.get('scan_date', ''),
|
|
'permalink': data.get('permalink', ''),
|
|
'reputation_score': self._calculate_reputation_score(data)
|
|
}
|
|
|
|
except Exception as e:
|
|
self.logger.logger.error(f"Error getting VirusTotal reputation for domain {domain}: {e}")
|
|
|
|
return {}
|
|
|
|
def get_ip_reputation(self, ip: str) -> Dict[str, Any]:
|
|
"""
|
|
Get IP reputation information from VirusTotal.
|
|
|
|
Args:
|
|
ip: IP address to check reputation for
|
|
|
|
Returns:
|
|
Dictionary containing reputation data
|
|
"""
|
|
if not _is_valid_ip(ip) or not self.is_available():
|
|
return {}
|
|
|
|
try:
|
|
url = f"{self.base_url}/ip-address/report"
|
|
params = {
|
|
'apikey': self.api_key,
|
|
'ip': ip
|
|
}
|
|
|
|
response = self.make_request(url, method="GET", params=params, target_indicator=ip)
|
|
|
|
if response and response.status_code == 200:
|
|
data = response.json()
|
|
|
|
if data.get('response_code') == 1:
|
|
return {
|
|
'positives': data.get('positives', 0),
|
|
'total': data.get('total', 0),
|
|
'scan_date': data.get('scan_date', ''),
|
|
'permalink': data.get('permalink', ''),
|
|
'reputation_score': self._calculate_reputation_score(data)
|
|
}
|
|
|
|
except Exception as e:
|
|
self.logger.logger.error(f"Error getting VirusTotal reputation for IP {ip}: {e}")
|
|
|
|
return {}
|
|
|
|
def _calculate_reputation_score(self, data: Dict[str, Any]) -> float:
|
|
"""Calculate a normalized reputation score (0.0 to 1.0)."""
|
|
positives = data.get('positives', 0)
|
|
total = data.get('total', 1) # Avoid division by zero
|
|
|
|
if total == 0:
|
|
return 1.0 # No data means neutral
|
|
|
|
# Score is inverse of detection ratio (lower detection = higher reputation)
|
|
return max(0.0, 1.0 - (positives / total)) |