fix large entity

This commit is contained in:
overcuriousity
2025-09-13 15:38:05 +02:00
parent 53baf2e291
commit 612f414d2a
11 changed files with 45 additions and 385 deletions

View File

@@ -7,15 +7,13 @@ from .base_provider import BaseProvider, RateLimiter
from .crtsh_provider import CrtShProvider
from .dns_provider import DNSProvider
from .shodan_provider import ShodanProvider
from .virustotal_provider import VirusTotalProvider
__all__ = [
'BaseProvider',
'RateLimiter',
'CrtShProvider',
'DNSProvider',
'ShodanProvider',
'VirusTotalProvider'
'ShodanProvider'
]
__version__ = "1.0.0-phase2"

View File

@@ -1,333 +0,0 @@
"""
VirusTotal provider for DNSRecon.
Discovers domain relationships through passive DNS and URL analysis.
"""
import json
from typing import List, Dict, Any, Tuple
from .base_provider import BaseProvider
from utils.helpers import _is_valid_ip, _is_valid_domain
from core.graph_manager import RelationshipType
class VirusTotalProvider(BaseProvider):
"""
Provider for querying VirusTotal API for passive DNS and domain reputation data.
Now uses session-specific API keys and rate limits.
"""
def __init__(self, session_config=None):
"""Initialize VirusTotal provider with session-specific configuration."""
super().__init__(
name="virustotal",
rate_limit=4, # Free tier: 4 requests per minute
timeout=30,
session_config=session_config
)
self.base_url = "https://www.virustotal.com/vtapi/v2"
self.api_key = self.config.get_api_key('virustotal')
def is_available(self) -> bool:
"""Check if VirusTotal provider is available (has valid API key in this session)."""
return self.api_key is not None and len(self.api_key.strip()) > 0
def get_name(self) -> str:
"""Return the provider name."""
return "virustotal"
def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
"""
Query VirusTotal for domain information including passive DNS.
Args:
domain: Domain to investigate
Returns:
List of relationships discovered from VirusTotal data
"""
if not _is_valid_domain(domain) or not self.is_available():
return []
relationships = []
# Query domain report
domain_relationships = self._query_domain_report(domain)
relationships.extend(domain_relationships)
# Query passive DNS for the domain
passive_dns_relationships = self._query_passive_dns_domain(domain)
relationships.extend(passive_dns_relationships)
return relationships
def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
"""
Query VirusTotal for IP address information including passive DNS.
Args:
ip: IP address to investigate
Returns:
List of relationships discovered from VirusTotal IP data
"""
if not _is_valid_ip(ip) or not self.is_available():
return []
relationships = []
# Query IP report
ip_relationships = self._query_ip_report(ip)
relationships.extend(ip_relationships)
# Query passive DNS for the IP
passive_dns_relationships = self._query_passive_dns_ip(ip)
relationships.extend(passive_dns_relationships)
return relationships
def _query_domain_report(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
"""Query VirusTotal domain report."""
relationships = []
try:
url = f"{self.base_url}/domain/report"
params = {
'apikey': self.api_key,
'domain': domain,
'allinfo': 1 # Get comprehensive information
}
response = self.make_request(url, method="GET", params=params, target_indicator=domain)
if not response or response.status_code != 200:
return []
data = response.json()
if data.get('response_code') != 1:
return []
# Extract resolved IPs
resolutions = data.get('resolutions', [])
for resolution in resolutions:
ip_address = resolution.get('ip_address')
last_resolved = resolution.get('last_resolved')
if ip_address and _is_valid_ip(ip_address):
raw_data = {
'domain': domain,
'ip_address': ip_address,
'last_resolved': last_resolved,
'source': 'virustotal_domain_report'
}
relationships.append((
domain,
ip_address,
RelationshipType.PASSIVE_DNS,
RelationshipType.PASSIVE_DNS.default_confidence,
raw_data
))
self.log_relationship_discovery(
source_node=domain,
target_node=ip_address,
relationship_type=RelationshipType.PASSIVE_DNS,
confidence_score=RelationshipType.PASSIVE_DNS.default_confidence,
raw_data=raw_data,
discovery_method="virustotal_domain_resolution"
)
# Extract subdomains
subdomains = data.get('subdomains', [])
for subdomain in subdomains:
if subdomain != domain and _is_valid_domain(subdomain):
raw_data = {
'parent_domain': domain,
'subdomain': subdomain,
'source': 'virustotal_subdomain_discovery'
}
relationships.append((
domain,
subdomain,
RelationshipType.PASSIVE_DNS,
0.7, # Medium-high confidence for subdomains
raw_data
))
self.log_relationship_discovery(
source_node=domain,
target_node=subdomain,
relationship_type=RelationshipType.PASSIVE_DNS,
confidence_score=0.7,
raw_data=raw_data,
discovery_method="virustotal_subdomain_discovery"
)
except json.JSONDecodeError as e:
self.logger.logger.error(f"Failed to parse JSON response from VirusTotal: {e}")
except Exception as e:
self.logger.logger.error(f"Error querying VirusTotal domain report for {domain}: {e}")
return relationships
def _query_ip_report(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
"""Query VirusTotal IP report."""
relationships = []
try:
url = f"{self.base_url}/ip-address/report"
params = {
'apikey': self.api_key,
'ip': ip
}
response = self.make_request(url, method="GET", params=params, target_indicator=ip)
if not response or response.status_code != 200:
return []
data = response.json()
if data.get('response_code') != 1:
return []
# Extract resolved domains
resolutions = data.get('resolutions', [])
for resolution in resolutions:
hostname = resolution.get('hostname')
last_resolved = resolution.get('last_resolved')
if hostname and _is_valid_domain(hostname):
raw_data = {
'ip_address': ip,
'hostname': hostname,
'last_resolved': last_resolved,
'source': 'virustotal_ip_report'
}
relationships.append((
ip,
hostname,
RelationshipType.PASSIVE_DNS,
RelationshipType.PASSIVE_DNS.default_confidence,
raw_data
))
self.log_relationship_discovery(
source_node=ip,
target_node=hostname,
relationship_type=RelationshipType.PASSIVE_DNS,
confidence_score=RelationshipType.PASSIVE_DNS.default_confidence,
raw_data=raw_data,
discovery_method="virustotal_ip_resolution"
)
except json.JSONDecodeError as e:
self.logger.logger.error(f"Failed to parse JSON response from VirusTotal: {e}")
except Exception as e:
self.logger.logger.error(f"Error querying VirusTotal IP report for {ip}: {e}")
return relationships
def _query_passive_dns_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
"""Query VirusTotal passive DNS for domain."""
# Note: VirusTotal's passive DNS API might require a premium subscription
# This is a placeholder for the endpoint structure
return []
def _query_passive_dns_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
"""Query VirusTotal passive DNS for IP."""
# Note: VirusTotal's passive DNS API might require a premium subscription
# This is a placeholder for the endpoint structure
return []
def get_domain_reputation(self, domain: str) -> Dict[str, Any]:
"""
Get domain reputation information from VirusTotal.
Args:
domain: Domain to check reputation for
Returns:
Dictionary containing reputation data
"""
if not _is_valid_domain(domain) or not self.is_available():
return {}
try:
url = f"{self.base_url}/domain/report"
params = {
'apikey': self.api_key,
'domain': domain
}
response = self.make_request(url, method="GET", params=params, target_indicator=domain)
if response and response.status_code == 200:
data = response.json()
if data.get('response_code') == 1:
return {
'positives': data.get('positives', 0),
'total': data.get('total', 0),
'scan_date': data.get('scan_date', ''),
'permalink': data.get('permalink', ''),
'reputation_score': self._calculate_reputation_score(data)
}
except Exception as e:
self.logger.logger.error(f"Error getting VirusTotal reputation for domain {domain}: {e}")
return {}
def get_ip_reputation(self, ip: str) -> Dict[str, Any]:
"""
Get IP reputation information from VirusTotal.
Args:
ip: IP address to check reputation for
Returns:
Dictionary containing reputation data
"""
if not _is_valid_ip(ip) or not self.is_available():
return {}
try:
url = f"{self.base_url}/ip-address/report"
params = {
'apikey': self.api_key,
'ip': ip
}
response = self.make_request(url, method="GET", params=params, target_indicator=ip)
if response and response.status_code == 200:
data = response.json()
if data.get('response_code') == 1:
return {
'positives': data.get('positives', 0),
'total': data.get('total', 0),
'scan_date': data.get('scan_date', ''),
'permalink': data.get('permalink', ''),
'reputation_score': self._calculate_reputation_score(data)
}
except Exception as e:
self.logger.logger.error(f"Error getting VirusTotal reputation for IP {ip}: {e}")
return {}
def _calculate_reputation_score(self, data: Dict[str, Any]) -> float:
"""Calculate a normalized reputation score (0.0 to 1.0)."""
positives = data.get('positives', 0)
total = data.get('total', 1) # Avoid division by zero
if total == 0:
return 1.0 # No data means neutral
# Score is inverse of detection ratio (lower detection = higher reputation)
return max(0.0, 1.0 - (positives / total))