# dnsrecon/providers/shodan_provider.py
import json
from pathlib import Path
from typing import Dict, Any
from datetime import datetime, timezone

import requests

from .base_provider import BaseProvider
from core.provider_result import ProviderResult
from utils.helpers import _is_valid_ip, _is_valid_domain


class ShodanProvider(BaseProvider):
    """
    Provider for querying the Shodan API for IP address information.

    Returns standardized ProviderResult objects and caches processed
    results on disk (cache/shodan/<ip>.json) so repeated lookups within
    the configured cache window avoid upstream API calls.
    """

    def __init__(self, name=None, session_config=None):
        """Initialize Shodan provider with session-specific configuration.

        Args:
            name: Ignored; the provider is always registered as "shodan".
            session_config: Per-session configuration passed to BaseProvider.
        """
        super().__init__(
            name="shodan",
            rate_limit=60,
            timeout=30,
            session_config=session_config,
        )
        self.base_url = "https://api.shodan.io"
        self.api_key = self.config.get_api_key('shodan')

        # Ensure the on-disk cache directory exists up front so later
        # reads/writes never have to handle a missing directory.
        self.cache_dir = Path('cache') / 'shodan'
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def is_available(self) -> bool:
        """Check if Shodan provider is available (has a non-blank API key in this session)."""
        return self.api_key is not None and len(self.api_key.strip()) > 0

    def get_name(self) -> str:
        """Return the provider name."""
        return "shodan"

    def get_display_name(self) -> str:
        """Return the provider display name for the UI."""
        return "Shodan"

    def requires_api_key(self) -> bool:
        """Return True if the provider requires an API key."""
        return True

    def get_eligibility(self) -> Dict[str, bool]:
        """Return a dictionary indicating if the provider can query domains and/or IPs."""
        # Shodan host lookups are IP-only; domain queries are not supported.
        return {'domains': False, 'ips': True}

    def _get_cache_file_path(self, ip: str) -> Path:
        """Generate the cache file path for an IP address.

        Dots (IPv4) and colons (IPv6) are replaced with underscores to
        produce a filesystem-safe filename.
        """
        safe_ip = ip.replace('.', '_').replace(':', '_')
        return self.cache_dir / f"{safe_ip}.json"

    def _get_cache_status(self, cache_file_path: Path) -> str:
        """
        Check cache status for an IP.

        Returns:
            'not_found', 'fresh', or 'stale'
        """
        if not cache_file_path.exists():
            return "not_found"
        try:
            with open(cache_file_path, 'r') as f:
                cache_data = json.load(f)

            last_query_str = cache_data.get("last_upstream_query")
            if not last_query_str:
                # No timestamp recorded: treat as stale so it gets refreshed.
                return "stale"

            # Normalize trailing 'Z' (UTC designator) for fromisoformat().
            last_query = datetime.fromisoformat(last_query_str.replace('Z', '+00:00'))
            hours_since_query = (datetime.now(timezone.utc) - last_query).total_seconds() / 3600

            cache_timeout = self.config.cache_timeout_hours
            if hours_since_query < cache_timeout:
                return "fresh"
            else:
                return "stale"
        except (json.JSONDecodeError, ValueError, KeyError):
            # Corrupt or unreadable cache entries are treated as stale.
            return "stale"

    def query_domain(self, domain: str) -> ProviderResult:
        """
        Domain queries are no longer supported for the Shodan provider.

        Args:
            domain: Domain to investigate

        Returns:
            Empty ProviderResult
        """
        return ProviderResult()

    def query_ip(self, ip: str) -> ProviderResult:
        """
        Query Shodan for information about an IP address, with caching of processed data.

        Fresh cache entries are served directly; stale or missing entries
        trigger an upstream API call. If the API call fails and a stale
        cache entry exists, the stale data is returned as a best-effort
        fallback.

        Args:
            ip: IP address to investigate

        Returns:
            ProviderResult containing discovered relationships and attributes
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return ProviderResult()

        cache_file = self._get_cache_file_path(ip)
        cache_status = self._get_cache_status(cache_file)

        result = ProviderResult()

        try:
            if cache_status == "fresh":
                result = self._load_from_cache(cache_file)
                self.logger.logger.info(f"Using cached Shodan data for {ip}")
            else:  # "stale" or "not_found"
                url = f"{self.base_url}/shodan/host/{ip}"
                params = {'key': self.api_key}
                response = self.make_request(url, method="GET", params=params, target_indicator=ip)

                if response and response.status_code == 200:
                    data = response.json()
                    # Process the data into ProviderResult BEFORE caching
                    result = self._process_shodan_data(ip, data)
                    self._save_to_cache(cache_file, result, data)  # Save both result and raw data
                elif cache_status == "stale":
                    # If API fails on a stale cache, use the old data
                    result = self._load_from_cache(cache_file)
        # ValueError also covers json.JSONDecodeError raised by
        # response.json() on a malformed body (older requests versions
        # raise it outside the RequestException hierarchy), so a bad
        # payload still falls back to stale cache instead of crashing.
        except (requests.exceptions.RequestException, ValueError) as e:
            self.logger.logger.error(f"Shodan API query failed for {ip}: {e}")
            if cache_status == "stale":
                result = self._load_from_cache(cache_file)

        return result

    def _load_from_cache(self, cache_file_path: Path) -> ProviderResult:
        """Load processed Shodan data from a cache file.

        Reconstructs a ProviderResult from the serialized relationships
        and attributes; returns an empty result if the cache entry is
        missing, unreadable, or structurally invalid.
        """
        try:
            with open(cache_file_path, 'r') as f:
                cache_content = json.load(f)

            result = ProviderResult()

            # Reconstruct relationships
            for rel_data in cache_content.get("relationships", []):
                result.add_relationship(
                    source_node=rel_data["source_node"],
                    target_node=rel_data["target_node"],
                    relationship_type=rel_data["relationship_type"],
                    provider=rel_data["provider"],
                    confidence=rel_data["confidence"],
                    raw_data=rel_data.get("raw_data", {}),
                )

            # Reconstruct attributes
            for attr_data in cache_content.get("attributes", []):
                result.add_attribute(
                    target_node=attr_data["target_node"],
                    name=attr_data["name"],
                    value=attr_data["value"],
                    attr_type=attr_data["type"],
                    provider=attr_data["provider"],
                    confidence=attr_data["confidence"],
                    metadata=attr_data.get("metadata", {}),
                )

            return result
        except (json.JSONDecodeError, FileNotFoundError, KeyError):
            return ProviderResult()

    def _save_to_cache(self, cache_file_path: Path, result: ProviderResult,
                       raw_data: Dict[str, Any]) -> None:
        """Save processed Shodan data to a cache file.

        Stores the upstream query timestamp, the raw API response
        (preserved for forensic purposes), and the processed
        relationships/attributes. Failures are logged but never raised —
        caching is best-effort.
        """
        try:
            cache_data = {
                "last_upstream_query": datetime.now(timezone.utc).isoformat(),
                "raw_data": raw_data,  # Preserve original for forensic purposes
                "relationships": [
                    {
                        "source_node": rel.source_node,
                        "target_node": rel.target_node,
                        "relationship_type": rel.relationship_type,
                        "confidence": rel.confidence,
                        "provider": rel.provider,
                        "raw_data": rel.raw_data,
                    } for rel in result.relationships
                ],
                "attributes": [
                    {
                        "target_node": attr.target_node,
                        "name": attr.name,
                        "value": attr.value,
                        "type": attr.type,
                        "provider": attr.provider,
                        "confidence": attr.confidence,
                        "metadata": attr.metadata,
                    } for attr in result.attributes
                ],
            }
            with open(cache_file_path, 'w') as f:
                # default=str deliberately stringifies non-JSON types
                # (e.g. datetimes) present in raw Shodan data.
                json.dump(cache_data, f, separators=(',', ':'), default=str)
        except Exception as e:
            self.logger.logger.warning(f"Failed to save Shodan cache for {cache_file_path.name}: {e}")

    def _process_shodan_data(self, ip: str, data: Dict[str, Any]) -> ProviderResult:
        """
        UPDATED: Process Shodan data with raw attribute names and values.

        Extracts: an ISP relationship (from 'org'/'asn'), hostname
        relationships ('hostnames'), open-port attributes ('ports'), and
        all remaining scalar fields as raw-named attributes on the IP.
        """
        result = ProviderResult()

        # Extract ISP information for relationship creation
        isp_name = data.get('org')
        asn_value = data.get('asn')
        if isp_name and asn_value:
            result.add_relationship(
                source_node=ip,
                target_node=isp_name,
                relationship_type='ip_to_isp',
                provider=self.name,
                confidence=0.9,
                raw_data={'asn': asn_value},
            )
            result.add_attribute(
                target_node=isp_name,
                name='asn',
                value=asn_value,
                attr_type='isp_info',
                provider=self.name,
                confidence=0.9,
            )

        for key, value in data.items():
            if key == 'hostnames':
                for hostname in value:
                    if _is_valid_domain(hostname):
                        result.add_relationship(
                            source_node=ip,
                            target_node=hostname,
                            relationship_type='shodan_a_record',
                            provider=self.name,
                            confidence=0.8,
                            raw_data=data,
                        )
                        self.log_relationship_discovery(
                            source_node=ip,
                            target_node=hostname,
                            relationship_type='shodan_a_record',
                            confidence_score=0.8,
                            raw_data=data,
                            discovery_method="shodan_host_lookup",
                        )
            elif key == 'ports':
                for port in value:
                    result.add_attribute(
                        target_node=ip,
                        name='shodan_open_port',
                        value=port,
                        attr_type='shodan_network_info',
                        provider=self.name,
                        confidence=0.9,
                    )
            elif isinstance(value, (str, int, float, bool)):
                # UPDATED: Keep raw Shodan field names (no "shodan_" prefix).
                # (isinstance already excludes None, so no extra check needed.)
                result.add_attribute(
                    target_node=ip,
                    name=key,  # Raw field name from Shodan API
                    value=value,
                    attr_type='shodan_info',
                    provider=self.name,
                    confidence=0.9,
                )

        return result