"""
Shodan provider for DNSRecon.
Discovers IP relationships and infrastructure context through Shodan API.
"""

import json
from typing import Any, Dict, List, Optional, Set, Tuple

from .base_provider import BaseProvider
from utils.helpers import _is_valid_ip, _is_valid_domain

# Shape of one discovered relationship:
# (source_node, target_node, relationship_type, confidence, raw_data)
_Relationship = Tuple[str, str, str, float, Dict[str, Any]]


class ShodanProvider(BaseProvider):
    """
    Provider for querying Shodan API for IP address and hostname information.

    The API key is read from the session configuration at construction time,
    so each session uses its own key.
    """

    def __init__(self, name=None, session_config=None):
        """
        Initialize Shodan provider with session-specific configuration.

        Args:
            name: Accepted for signature compatibility only; the provider
                always registers itself as "shodan".
            session_config: Session configuration forwarded to BaseProvider.
        """
        super().__init__(
            name="shodan",
            rate_limit=60,
            timeout=30,
            session_config=session_config
        )
        self.base_url = "https://api.shodan.io"
        self.api_key = self.config.get_api_key('shodan')

    def is_available(self) -> bool:
        """Check if Shodan provider is available (has a non-blank API key in this session)."""
        return bool(self.api_key and self.api_key.strip())

    def get_name(self) -> str:
        """Return the provider name."""
        return "shodan"

    def get_display_name(self) -> str:
        """Return the provider display name for the UI."""
        return "shodan"

    def requires_api_key(self) -> bool:
        """Return True if the provider requires an API key."""
        return True

    def get_eligibility(self) -> Dict[str, bool]:
        """Return a dictionary indicating if the provider can query domains and/or IPs."""
        return {'domains': True, 'ips': True}

    def _parse_json(self, response) -> Optional[Dict[str, Any]]:
        """
        Decode a response body as a JSON object.

        Returns:
            The decoded dictionary, or None when the body is not valid JSON
            (the failure is logged) or is valid JSON but not an object.
        """
        try:
            data = response.json()
        except (json.JSONDecodeError, ValueError) as e:
            # ValueError is the common base of every JSON decode error that
            # response.json() can raise across Requests versions/backends;
            # json.JSONDecodeError alone misses the simplejson-backed variant.
            self.logger.logger.error(f"Failed to parse JSON response from Shodan: {e}")
            return None
        return data if isinstance(data, dict) else None

    def _add_relationship(
        self,
        relationships: List[_Relationship],
        seen: Set[Tuple[str, str, str]],
        source: str,
        target: str,
        relationship_type: str,
        confidence: float,
        raw_data: Dict[str, Any],
        discovery_method: str,
    ) -> None:
        """
        Append one relationship and log it, skipping exact repeats.

        A (source, target, relationship_type) triple is recorded only the
        first time it is seen within a single query, so repeated matches for
        the same host do not produce duplicate tuples or duplicate log lines.
        """
        key = (source, target, relationship_type)
        if key in seen:
            return
        seen.add(key)

        relationships.append((source, target, relationship_type, confidence, raw_data))
        self.log_relationship_discovery(
            source_node=source,
            target_node=target,
            relationship_type=relationship_type,
            confidence_score=confidence,
            raw_data=raw_data,
            discovery_method=discovery_method
        )

    def query_domain(self, domain: str) -> List[_Relationship]:
        """
        Query Shodan for information about a domain.
        Uses Shodan's hostname search to find associated IPs.

        Args:
            domain: Domain to investigate

        Returns:
            List of relationships discovered from Shodan data: an 'a_record'
            edge (confidence 0.8) from the domain to each IP whose hostname
            list contains it, and a 'passive_dns' edge (confidence 0.6) from
            the domain to every other valid hostname on those IPs. Empty when
            the domain is invalid, no API key is set, the request fails or
            the body cannot be parsed.
        """
        if not _is_valid_domain(domain) or not self.is_available():
            return []

        url = f"{self.base_url}/shodan/host/search"
        params = {
            'key': self.api_key,
            'query': f"hostname:{domain}",
            'minify': True  # Get minimal data to reduce bandwidth
        }
        response = self.make_request(url, method="GET", params=params, target_indicator=domain)
        if not response or response.status_code != 200:
            return []

        data = self._parse_json(response)
        if data is None:
            return []

        relationships: List[_Relationship] = []
        seen: Set[Tuple[str, str, str]] = set()
        # DNS names are case-insensitive, so compare on a lowered form while
        # still reporting the domain exactly as the caller supplied it.
        wanted = domain.lower()

        # `or []` / `or {}` guard against keys that are present but null.
        for match in data.get('matches') or []:
            if not isinstance(match, dict):
                continue

            ip_address = match.get('ip_str')
            hostnames = match.get('hostnames') or []
            known = {h.lower() for h in hostnames if isinstance(h, str)}
            if not ip_address or not isinstance(ip_address, str) or wanted not in known:
                continue

            location = match.get('location') or {}
            raw_data = {
                'ip_address': ip_address,
                'hostnames': hostnames,
                'country': location.get('country_name', ''),
                'city': location.get('city', ''),
                'isp': match.get('isp', ''),
                'org': match.get('org', ''),
                'ports': match.get('ports', []),
                'last_update': match.get('last_update', '')
            }
            # Domain resolves to IP
            self._add_relationship(
                relationships, seen, domain, ip_address, 'a_record', 0.8,
                raw_data, "shodan_hostname_search"
            )

            # Also create relationships to other hostnames on the same IP
            for hostname in hostnames:
                if not isinstance(hostname, str) or hostname.lower() == wanted:
                    continue
                if not _is_valid_domain(hostname):
                    continue
                hostname_raw_data = {
                    'shared_ip': ip_address,
                    'all_hostnames': hostnames,
                    'discovery_context': 'shared_hosting'
                }
                # Lower confidence for shared hosting
                self._add_relationship(
                    relationships, seen, domain, hostname, 'passive_dns', 0.6,
                    hostname_raw_data, "shodan_shared_hosting"
                )

        return relationships

    def query_ip(self, ip: str) -> List[_Relationship]:
        """
        Query Shodan for information about an IP address.

        Args:
            ip: IP address to investigate

        Returns:
            List of relationships discovered from Shodan IP data: an
            'a_record' edge (confidence 0.8) from the IP to each valid
            hostname, plus an 'asn_membership' edge (confidence 0.7) to the
            ASN when one is reported. Empty when the IP is invalid, no API
            key is set, the request fails or the body cannot be parsed.
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return []

        url = f"{self.base_url}/shodan/host/{ip}"
        params = {'key': self.api_key}
        response = self.make_request(url, method="GET", params=params, target_indicator=ip)
        if not response or response.status_code != 200:
            return []

        data = self._parse_json(response)
        if data is None:
            return []

        relationships: List[_Relationship] = []
        seen: Set[Tuple[str, str, str]] = set()

        # Extract hostname relationships (`or []` tolerates a null value)
        for hostname in data.get('hostnames') or []:
            if not _is_valid_domain(hostname):
                continue
            raw_data = {
                'ip_address': ip,
                'hostname': hostname,
                'country': data.get('country_name', ''),
                'city': data.get('city', ''),
                'isp': data.get('isp', ''),
                'org': data.get('org', ''),
                'asn': data.get('asn', ''),
                'ports': data.get('ports', []),
                'last_update': data.get('last_update', ''),
                'os': data.get('os', '')
            }
            self._add_relationship(
                relationships, seen, ip, hostname, 'a_record', 0.8,
                raw_data, "shodan_host_lookup"
            )

        # Extract ASN relationship if available
        asn = data.get('asn')
        if asn:
            # Ensure the node name starts with "AS" while raw_data keeps the
            # bare number.
            if isinstance(asn, str) and asn.startswith('AS'):
                asn_name = asn
                asn_number = asn[2:]
            else:
                asn_name = f"AS{asn}"
                asn_number = str(asn)

            asn_raw_data = {
                'ip_address': ip,
                'asn': asn_number,
                'isp': data.get('isp', ''),
                'org': data.get('org', '')
            }
            self._add_relationship(
                relationships, seen, ip, asn_name, 'asn_membership', 0.7,
                asn_raw_data, "shodan_asn_lookup"
            )

        return relationships

    def search_by_organization(self, org_name: str) -> List[Dict[str, Any]]:
        """
        Search Shodan for hosts belonging to a specific organization.

        Args:
            org_name: Organization name to search for

        Returns:
            List of host information dictionaries (the 'matches' entries of
            the search response). Empty when no API key is set, the name is
            blank, the request fails, or any error occurs (errors are logged,
            not raised).
        """
        # A blank name would only issue a meaningless `org:""` query.
        if not self.is_available() or not org_name or not str(org_name).strip():
            return []

        try:
            # NOTE(review): org_name is embedded verbatim; a name containing
            # a double quote would end the quoted filter early. Confirm how
            # Shodan expects quotes to be escaped before sanitizing here.
            search_query = f"org:\"{org_name}\""
            url = f"{self.base_url}/shodan/host/search"
            params = {
                'key': self.api_key,
                'query': search_query,
                'minify': True
            }

            response = self.make_request(url, method="GET", params=params, target_indicator=org_name)
            if response and response.status_code == 200:
                data = response.json()
                # `or []` keeps the documented list return even for a null value.
                return data.get('matches') or []
        except Exception as e:
            # Deliberate best-effort: this helper never raises to its caller.
            self.logger.logger.error(f"Error searching Shodan by organization {org_name}: {e}")

        return []

    def get_host_services(self, ip: str) -> List[Dict[str, Any]]:
        """
        Get service information for a specific IP address.

        Args:
            ip: IP address to query

        Returns:
            List of service information dictionaries (the 'data' entries of
            the host response). Empty when the IP is invalid, no API key is
            set, the request fails, or any error occurs (errors are logged,
            not raised).
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return []

        try:
            url = f"{self.base_url}/shodan/host/{ip}"
            params = {'key': self.api_key}

            response = self.make_request(url, method="GET", params=params, target_indicator=ip)
            if response and response.status_code == 200:
                data = response.json()
                # Service banners; `or []` keeps the list return for a null value.
                return data.get('data') or []
        except Exception as e:
            # Deliberate best-effort: this helper never raises to its caller.
            self.logger.logger.error(f"Error getting Shodan services for IP {ip}: {e}")

        return []