""" Shodan provider for DNSRecon. Discovers IP relationships and infrastructure context through Shodan API. """ import json from typing import List, Dict, Any, Tuple, Optional from urllib.parse import quote from .base_provider import BaseProvider from core.graph_manager import RelationshipType from config import config class ShodanProvider(BaseProvider): """ Provider for querying Shodan API for IP address and hostname information. Requires valid API key and respects Shodan's rate limits. """ def __init__(self): """Initialize Shodan provider with appropriate rate limiting.""" super().__init__( name="shodan", rate_limit=60, # Shodan API has various rate limits depending on plan timeout=30 ) self.base_url = "https://api.shodan.io" self.api_key = config.get_api_key('shodan') def get_name(self) -> str: """Return the provider name.""" return "shodan" def is_available(self) -> bool: """ Check if Shodan provider is available (has valid API key). """ return self.api_key is not None and len(self.api_key.strip()) > 0 def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]: """ Query Shodan for information about a domain. Uses Shodan's hostname search to find associated IPs. Args: domain: Domain to investigate Returns: List of relationships discovered from Shodan data """ if not self._is_valid_domain(domain) or not self.is_available(): return [] relationships = [] try: # Search for hostname in Shodan search_query = f"hostname:{domain}" url = f"{self.base_url}/shodan/host/search" params = { 'key': self.api_key, 'query': search_query, 'minify': True # Get minimal data to reduce bandwidth } response = self.make_request(url, method="GET", params=params, target_indicator=domain) if not response or response.status_code != 200: return [] data = response.json() if 'matches' not in data: return [] # Process search results for match in data['matches']: ip_address = match.get('ip_str') hostnames = match.get('hostnames', []) if ip_address and domain in hostnames: raw_data = { 'ip_address': ip_address, 'hostnames': hostnames, 'country': match.get('location', {}).get('country_name', ''), 'city': match.get('location', {}).get('city', ''), 'isp': match.get('isp', ''), 'org': match.get('org', ''), 'ports': match.get('ports', []), 'last_update': match.get('last_update', '') } relationships.append(( domain, ip_address, RelationshipType.A_RECORD, # Domain resolves to IP RelationshipType.A_RECORD.default_confidence, raw_data )) self.log_relationship_discovery( source_node=domain, target_node=ip_address, relationship_type=RelationshipType.A_RECORD, confidence_score=RelationshipType.A_RECORD.default_confidence, raw_data=raw_data, discovery_method="shodan_hostname_search" ) # Also create relationships to other hostnames on the same IP for hostname in hostnames: if hostname != domain and self._is_valid_domain(hostname): hostname_raw_data = { 'shared_ip': ip_address, 'all_hostnames': hostnames, 'discovery_context': 'shared_hosting' } relationships.append(( domain, hostname, RelationshipType.PASSIVE_DNS, # Shared hosting relationship 0.6, # Lower confidence for shared hosting hostname_raw_data )) self.log_relationship_discovery( source_node=domain, target_node=hostname, relationship_type=RelationshipType.PASSIVE_DNS, confidence_score=0.6, raw_data=hostname_raw_data, discovery_method="shodan_shared_hosting" ) except json.JSONDecodeError as e: self.logger.logger.error(f"Failed to parse JSON response from Shodan: {e}") except Exception as e: self.logger.logger.error(f"Error querying Shodan for domain {domain}: {e}") return relationships def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]: """ Query Shodan for information about an IP address. Args: ip: IP address to investigate Returns: List of relationships discovered from Shodan IP data """ if not self._is_valid_ip(ip) or not self.is_available(): return [] relationships = [] try: # Query Shodan host information url = f"{self.base_url}/shodan/host/{ip}" params = {'key': self.api_key} response = self.make_request(url, method="GET", params=params, target_indicator=ip) if not response or response.status_code != 200: return [] data = response.json() # Extract hostname relationships hostnames = data.get('hostnames', []) for hostname in hostnames: if self._is_valid_domain(hostname): raw_data = { 'ip_address': ip, 'hostname': hostname, 'country': data.get('country_name', ''), 'city': data.get('city', ''), 'isp': data.get('isp', ''), 'org': data.get('org', ''), 'asn': data.get('asn', ''), 'ports': data.get('ports', []), 'last_update': data.get('last_update', ''), 'os': data.get('os', '') } relationships.append(( ip, hostname, RelationshipType.A_RECORD, # IP resolves to hostname RelationshipType.A_RECORD.default_confidence, raw_data )) self.log_relationship_discovery( source_node=ip, target_node=hostname, relationship_type=RelationshipType.A_RECORD, confidence_score=RelationshipType.A_RECORD.default_confidence, raw_data=raw_data, discovery_method="shodan_host_lookup" ) # Extract ASN relationship if available asn = data.get('asn') if asn: asn_name = f"AS{asn}" asn_raw_data = { 'ip_address': ip, 'asn': asn, 'isp': data.get('isp', ''), 'org': data.get('org', '') } relationships.append(( ip, asn_name, RelationshipType.ASN_MEMBERSHIP, RelationshipType.ASN_MEMBERSHIP.default_confidence, asn_raw_data )) self.log_relationship_discovery( source_node=ip, target_node=asn_name, relationship_type=RelationshipType.ASN_MEMBERSHIP, confidence_score=RelationshipType.ASN_MEMBERSHIP.default_confidence, raw_data=asn_raw_data, discovery_method="shodan_asn_lookup" ) except json.JSONDecodeError as e: self.logger.logger.error(f"Failed to parse JSON response from Shodan: {e}") except Exception as e: self.logger.logger.error(f"Error querying Shodan for IP {ip}: {e}") return relationships def search_by_organization(self, org_name: str) -> List[Dict[str, Any]]: """ Search Shodan for hosts belonging to a specific organization. Args: org_name: Organization name to search for Returns: List of host information dictionaries """ if not self.is_available(): return [] try: search_query = f"org:\"{org_name}\"" url = f"{self.base_url}/shodan/host/search" params = { 'key': self.api_key, 'query': search_query, 'minify': True } response = self.make_request(url, method="GET", params=params, target_indicator=org_name) if response and response.status_code == 200: data = response.json() return data.get('matches', []) except Exception as e: self.logger.logger.error(f"Error searching Shodan by organization {org_name}: {e}") return [] def get_host_services(self, ip: str) -> List[Dict[str, Any]]: """ Get service information for a specific IP address. Args: ip: IP address to query Returns: List of service information dictionaries """ if not self._is_valid_ip(ip) or not self.is_available(): return [] try: url = f"{self.base_url}/shodan/host/{ip}" params = {'key': self.api_key} response = self.make_request(url, method="GET", params=params, target_indicator=ip) if response and response.status_code == 200: data = response.json() return data.get('data', []) # Service banners except Exception as e: self.logger.logger.error(f"Error getting Shodan services for IP {ip}: {e}") return []