# dnsrecon/providers/shodan_provider.py

import json
import os
from pathlib import Path
from typing import List, Dict, Any, Tuple
from datetime import datetime, timezone
import requests

from .base_provider import BaseProvider
from utils.helpers import _is_valid_ip, _is_valid_domain


class ShodanProvider(BaseProvider):
    """
    Provider for querying the Shodan API for IP address information.

    Uses a session-specific API key, answers IP queries only (domain queries
    return nothing), and keeps one JSON cache file per queried IP under
    ``cache/shodan``.
    """

    def __init__(self, name=None, session_config=None):
        """
        Initialize the Shodan provider with session-specific configuration.

        Args:
            name: Accepted for interface compatibility; the provider always
                registers itself with the base class as ``"shodan"``.
            session_config: Passed through unchanged to ``BaseProvider``.
        """
        super().__init__(
            name="shodan",
            rate_limit=60,
            timeout=30,
            session_config=session_config
        )
        self.base_url = "https://api.shodan.io"
        self.api_key = self.config.get_api_key('shodan')

        # The cache directory is relative to the current working directory.
        self.cache_dir = Path('cache') / 'shodan'
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def is_available(self) -> bool:
        """Return True when a non-blank API key is configured for this session."""
        return isinstance(self.api_key, str) and bool(self.api_key.strip())

    def get_name(self) -> str:
        """Return the provider name."""
        return "shodan"

    def get_display_name(self) -> str:
        """Return the provider display name for the UI."""
        return "Shodan"

    def requires_api_key(self) -> bool:
        """Return True because this provider requires an API key."""
        return True

    def get_eligibility(self) -> Dict[str, bool]:
        """Return which target kinds this provider can query (IPs only)."""
        return {'domains': False, 'ips': True}

    def _get_cache_file_path(self, ip: str) -> Path:
        """
        Build the cache file path for an IP address.

        Dots (IPv4) and colons (IPv6) are replaced with underscores so the
        address can be used as a file name.
        """
        safe_ip = ip.replace('.', '_').replace(':', '_')
        return self.cache_dir / f"{safe_ip}.json"

    def _get_cache_status(self, cache_file_path: Path) -> str:
        """
        Classify the cache entry stored at ``cache_file_path``.

        Returns:
            ``'not_found'`` when the file does not exist, ``'fresh'`` when its
            ``last_upstream_query`` timestamp is younger than
            ``self.config.cache_timeout_hours``, otherwise ``'stale'``.
            Unreadable, malformed, or timestamp-less files are ``'stale'``.
        """
        if not cache_file_path.exists():
            return "not_found"

        try:
            with open(cache_file_path, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            last_query_str = cache_data.get("last_upstream_query")
            if not last_query_str:
                return "stale"

            # fromisoformat() on older Python versions rejects a trailing 'Z'.
            last_query = datetime.fromisoformat(
                last_query_str.replace('Z', '+00:00')
            )
        except (json.JSONDecodeError, ValueError, KeyError,
                TypeError, AttributeError, OSError):
            # Covers: corrupt JSON, undecodable bytes, a non-object payload,
            # a non-string timestamp, and a file that vanished or cannot be
            # read after the exists() check above.
            return "stale"

        # A timestamp without an offset is naive; subtracting it from an
        # aware "now" would raise TypeError, so interpret it as UTC.
        if last_query.tzinfo is None:
            last_query = last_query.replace(tzinfo=timezone.utc)

        age = datetime.now(timezone.utc) - last_query
        hours_since_query = age.total_seconds() / 3600

        # Read outside the try block so a configuration problem is not
        # silently reported as a stale cache.
        if hours_since_query < self.config.cache_timeout_hours:
            return "fresh"
        return "stale"

    def _load_from_cache(self, cache_file_path: Path) -> Dict[str, Any]:
        """
        Load Shodan data from a cache file.

        Returns:
            The cached dictionary, or ``{}`` when the file is missing,
            unreadable, not valid JSON, or does not contain a JSON object.
        """
        try:
            with open(cache_file_path, 'r', encoding='utf-8') as f:
                cached = json.load(f)
        except (json.JSONDecodeError, ValueError, OSError):
            return {}
        return cached if isinstance(cached, dict) else {}

    def _save_to_cache(self, cache_file_path: Path, data: Dict[str, Any]) -> None:
        """
        Save Shodan data to a cache file.

        Note: ``data`` is stamped in place with a ``last_upstream_query``
        UTC timestamp before it is written, so the caller's dictionary gains
        that key too. Saving is best-effort: any failure is logged as a
        warning and otherwise ignored.
        """
        try:
            data['last_upstream_query'] = datetime.now(timezone.utc).isoformat()
            with open(cache_file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, separators=(',', ':'))
        except Exception as e:
            self.logger.logger.warning(f"Failed to save Shodan cache for {cache_file_path.name}: {e}")

    def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Domain queries are no longer supported for the Shodan provider.

        Returns:
            Always an empty list.
        """
        return []

    def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Query Shodan for information about an IP address, with caching.

        A fresh cache entry is used directly. Otherwise the Shodan host
        endpoint is called; a successful JSON-object response is cached and
        used, and on any failure a stale cache entry (if one exists) is used
        instead.

        Args:
            ip: The IP address to look up.

        Returns:
            A list of ``(source, target, relationship_type, confidence,
            raw_data)`` tuples, or an empty list when the IP is invalid, the
            provider has no API key, or no data could be obtained.
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return []

        cache_file = self._get_cache_file_path(ip)
        cache_status = self._get_cache_status(cache_file)

        data = {}

        try:
            if cache_status == "fresh":
                data = self._load_from_cache(cache_file)
                self.logger.logger.info(f"Using cached Shodan data for {ip}")
            else:  # "stale" or "not_found"
                url = f"{self.base_url}/shodan/host/{ip}"
                params = {'key': self.api_key}
                response = self.make_request(url, method="GET", params=params, target_indicator=ip)

                payload = None
                if response and response.status_code == 200:
                    payload = response.json()

                if isinstance(payload, dict):
                    data = payload
                    self._save_to_cache(cache_file, data)
                elif cache_status == "stale":
                    # The API failed or returned an unusable body; reuse the
                    # old data rather than returning nothing.
                    data = self._load_from_cache(cache_file)

        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures from response.json()
            # on requests versions that do not raise a RequestException.
            self.logger.logger.error(f"Shodan API query failed for {ip}: {e}")
            if cache_status == "stale":
                data = self._load_from_cache(cache_file)

        if not data or not isinstance(data, dict):
            return []

        return self._process_shodan_data(ip, data)

    def _process_shodan_data(self, ip: str, data: Dict[str, Any]) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Process Shodan data to extract relationships.

        Produces one ``a_record`` relationship (confidence 0.8) per valid
        hostname in ``data['hostnames']`` and one ``asn_membership``
        relationship (confidence 0.7) when ``data['asn']`` is set. Each
        relationship is also reported through ``log_relationship_discovery``.

        Args:
            ip: The IP address the data belongs to (relationship source).
            data: The Shodan host payload; attached as raw data to every
                relationship.

        Returns:
            The list of extracted relationship tuples.
        """
        relationships = []

        # "hostnames" may be absent or an explicit null; treat both (and any
        # non-list value) as "no hostnames" instead of failing on iteration.
        hostnames = data.get('hostnames') or []
        if not isinstance(hostnames, (list, tuple)):
            hostnames = []

        for hostname in hostnames:
            if not _is_valid_domain(hostname):
                continue
            relationships.append((
                ip,
                hostname,
                'a_record',
                0.8,
                data
            ))
            self.log_relationship_discovery(
                source_node=ip,
                target_node=hostname,
                relationship_type='a_record',
                confidence_score=0.8,
                raw_data=data,
                discovery_method="shodan_host_lookup"
            )

        # Extract ASN relationship
        asn = data.get('asn')
        if asn:
            # Normalize to a single upper-case "AS" prefix whether the value
            # arrives as "AS15169", "as15169", or a bare number.
            asn_text = str(asn).strip()
            if asn_text.upper().startswith('AS'):
                asn_text = asn_text[2:]
            asn_name = f"AS{asn_text}"

            relationships.append((
                ip,
                asn_name,
                'asn_membership',
                0.7,
                data
            ))
            self.log_relationship_discovery(
                source_node=ip,
                target_node=asn_name,
                relationship_type='asn_membership',
                confidence_score=0.7,
                raw_data=data,
                discovery_method="shodan_asn_lookup"
            )

        return relationships