dnscope/src/virustotal_client.py
overcuriousity 0c9cf00a3b progress
2025-09-09 14:54:02 +02:00

214 lines
9.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# File: src/virustotal_client.py
"""VirusTotal API integration."""
import requests
import time
import logging
from datetime import datetime
from typing import Optional
from .data_structures import VirusTotalResult
from .config import Config
# Module-level logger, named after this module through __name__; the
# VirusTotal client below sends all of its log messages through it.
logger = logging.getLogger(__name__)
class VirusTotalClient:
    """Client for the IP and domain report endpoints of the VirusTotal v2 API.

    Each lookup is throttled through ``_rate_limit``, performed with a single
    ``requests.get`` call and condensed into a ``VirusTotalResult``. HTTP,
    network and response-parsing failures are logged and reported as ``None``
    rather than raised.
    """

    BASE_URL = "https://www.virustotal.com/vtapi/v2"

    # Category labels that count as one additional detection for a domain.
    # Kept as a tuple so membership tests also work for unhashable entries.
    _FLAGGED_CATEGORIES = ('malicious', 'suspicious', 'phishing', 'malware')

    def __init__(self, api_key: str, config: "Config"):
        """Store the credentials and configuration used by every lookup.

        Args:
            api_key: VirusTotal API key, sent as the ``apikey`` query parameter.
            config: Settings object; ``VIRUSTOTAL_RATE_LIMIT`` and
                ``HTTP_TIMEOUT`` are read from it.
        """
        self.api_key = api_key
        self.config = config
        self.last_request = 0
        # Only a short suffix is ever logged; a key of four characters or
        # fewer is masked entirely so it never reaches the log in full.
        key_hint = api_key[-4:] if len(api_key) > 4 else '****'
        logger.info(f"🛡️ VirusTotal client initialized with API key ending in: ...{key_hint}")

    def _rate_limit(self):
        """Sleep as needed so requests are at least ``1 / rate`` seconds apart.

        ``config.VIRUSTOTAL_RATE_LIMIT`` is used as requests per second. A
        missing, zero or negative rate disables throttling instead of raising
        ``ZeroDivisionError``. ``last_request`` is always refreshed.
        """
        rate = self.config.VIRUSTOTAL_RATE_LIMIT
        if rate and rate > 0:
            min_interval = 1.0 / rate
            sleep_time = min_interval - (time.time() - self.last_request)
            if sleep_time > 0:
                logger.debug(f"⏸️ VirusTotal rate limiting: sleeping for {sleep_time:.2f}s")
                time.sleep(sleep_time)
        self.last_request = time.time()

    def lookup_ip(self, ip: str) -> Optional["VirusTotalResult"]:
        """Lookup IP address reputation.

        Args:
            ip: IP address to query.

        Returns:
            A ``VirusTotalResult`` where ``total`` is the number of
            ``detected_urls`` entries and ``positives`` the number of those
            with a positive count above zero, or ``None`` when the address is
            unknown or the request failed.
        """
        return self._lookup("ip-address", "ip", ip, "IP", self._build_ip_result)

    def lookup_domain(self, domain: str) -> Optional["VirusTotalResult"]:
        """Lookup domain reputation.

        Args:
            domain: Domain name to query.

        Returns:
            A ``VirusTotalResult`` counted like ``lookup_ip``, plus one extra
            positive when the domain carries a flagged category; ``total`` is
            at least 1 and never below ``positives``. ``None`` when the domain
            is unknown or the request failed.
        """
        return self._lookup("domain", "domain", domain, "domain", self._build_domain_result)

    def _lookup(self, endpoint, param_name, resource, label, build_result):
        """Run one throttled report query and translate the response.

        Args:
            endpoint: Path segment placed before ``/report`` in the URL.
            param_name: Query parameter name that carries ``resource``.
            resource: The IP address or domain being looked up.
            label: Word used for the resource kind in log messages.
            build_result: Callable ``(resource, data)`` that turns a found
                report into a ``VirusTotalResult``.

        Returns:
            The built result, or ``None`` for not-found, non-200 responses and
            any error raised while requesting or parsing.
        """
        self._rate_limit()
        logger.debug(f"🔍 Querying VirusTotal for {label}: {resource}")
        try:
            response = requests.get(
                f"{self.BASE_URL}/{endpoint}/report",
                params={'apikey': self.api_key, param_name: resource},
                timeout=self.config.HTTP_TIMEOUT,
                headers={'User-Agent': 'DNS-Recon-Tool/1.0'},
            )
            logger.debug(f"📡 VirusTotal API response for {label} {resource}: {response.status_code}")
            if response.status_code != 200:
                self._log_http_error(response, label, resource)
                return None

            data = response.json()
            logger.debug(f"VirusTotal {label} response data keys: {data.keys()}")
            response_code = data.get('response_code')
            if response_code == 1:
                result = build_result(resource, data)
                logger.info(f"✅ VirusTotal result for {label} {resource}: {result.positives}/{result.total} detections")
                return result
            if response_code == 0:
                title = label[:1].upper() + label[1:]
                logger.debug(f" {title} {resource} not found in VirusTotal database")
            else:
                logger.debug(f"VirusTotal returned response_code: {response_code}")
            return None
        except requests.exceptions.Timeout:
            logger.warning(f"⏱️ VirusTotal query timeout for {label} {resource}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"🌐 VirusTotal network error for {label} {resource}: {e}")
            return None
        except Exception as e:
            # Deliberate catch-all: a malformed report must not abort the
            # caller, so it is logged and reported as "no result".
            logger.error(f"❌ Unexpected error querying VirusTotal for {label} {resource}: {e}")
            return None

    @staticmethod
    def _log_http_error(response, label, resource):
        """Log a non-200 response at the severity matching its status code."""
        status = response.status_code
        if status == 204:
            logger.warning("⚠️ VirusTotal API rate limit exceeded")
        elif status == 403:
            logger.error("❌ VirusTotal API key is invalid or lacks permissions")
        else:
            logger.warning(f"⚠️ VirusTotal API error for {label} {resource}: HTTP {status}")
            try:
                logger.debug(f"VirusTotal error details: {response.json()}")
            except ValueError:
                # The error body is not JSON; the status line above suffices.
                pass

    @staticmethod
    def _count_detected_urls(detected_urls) -> tuple:
        """Return ``(positives, total)`` for a ``detected_urls`` list.

        ``total`` is the number of entries and ``positives`` the number whose
        ``positives`` field is above zero. ``None`` counts as an empty list.
        """
        entries = detected_urls or []
        flagged = sum(1 for entry in entries if entry.get('positives', 0) > 0)
        return flagged, len(entries)

    @classmethod
    def _domain_counts(cls, data) -> tuple:
        """Return ``(positives, total)`` for a domain report.

        A flagged category adds one positive. ``total`` is then raised to at
        least 1 and at least ``positives``, so the ratio never exceeds 100%.
        """
        positives, total = cls._count_detected_urls(data.get('detected_urls'))
        categories = data.get('categories') or []
        if any(cat in cls._FLAGGED_CATEGORIES for cat in categories):
            positives += 1
        return positives, max(total, positives, 1)

    @staticmethod
    def _parse_scan_date(raw) -> datetime:
        """Parse a report's ``scan_date``, falling back to the current time.

        ISO 8601 (with a trailing ``Z`` read as UTC) is tried first, then
        ``YYYY-MM-DD HH:MM:SS``. An empty or unparseable value yields
        ``datetime.now()``.
        """
        if raw:
            try:
                return datetime.fromisoformat(raw.replace('Z', '+00:00'))
            except ValueError:
                pass
            try:
                return datetime.strptime(raw, '%Y-%m-%d %H:%M:%S')
            except ValueError:
                logger.debug(f"Could not parse scan_date: {raw}")
        return datetime.now()

    def _build_ip_result(self, ip, data):
        """Build the ``VirusTotalResult`` for a found IP address report."""
        positives, total = self._count_detected_urls(data.get('detected_urls'))
        return VirusTotalResult(
            resource=ip,
            positives=positives,
            total=total,
            scan_date=self._parse_scan_date(data.get('scan_date')),
            permalink=data.get('permalink', f'https://www.virustotal.com/gui/ip-address/{ip}'),
        )

    def _build_domain_result(self, domain, data):
        """Build the ``VirusTotalResult`` for a found domain report."""
        positives, total = self._domain_counts(data)
        return VirusTotalResult(
            resource=domain,
            positives=positives,
            total=total,
            scan_date=self._parse_scan_date(data.get('scan_date')),
            permalink=data.get('permalink', f'https://www.virustotal.com/gui/domain/{domain}'),
        )