"""
Shodan provider for DNSRecon.
Discovers IP relationships and infrastructure context through the Shodan API.
"""

import json
 | 
						|
from typing import List, Dict, Any, Tuple
 | 
						|
from .base_provider import BaseProvider
 | 
						|
from utils.helpers import _is_valid_ip, _is_valid_domain
 | 
						|
 | 
						|
 | 
						|
class ShodanProvider(BaseProvider):
    """
    Provider for querying the Shodan API for IP address and hostname information.

    The API key is read from the session-specific configuration, so each
    session can use its own key.

    Relationships are returned as tuples of
    ``(source, target, relationship_type, confidence, raw_data)``.
    """

    def __init__(self, session_config=None):
        """
        Initialize the Shodan provider with session-specific configuration.

        Args:
            session_config: Session configuration forwarded to BaseProvider;
                the Shodan API key is looked up through ``self.config``.
        """
        super().__init__(
            name="shodan",
            rate_limit=60,
            timeout=30,
            session_config=session_config
        )
        self.base_url = "https://api.shodan.io"
        self.api_key = self.config.get_api_key('shodan')

    def is_available(self) -> bool:
        """Return True if a non-blank Shodan API key is configured for this session."""
        return bool(self.api_key and self.api_key.strip())

    def get_name(self) -> str:
        """Return the provider name."""
        return "shodan"

    def get_display_name(self) -> str:
        """Return the provider display name for the UI."""
        return "shodan"

    def requires_api_key(self) -> bool:
        """Return True if the provider requires an API key."""
        return True

    def get_eligibility(self) -> Dict[str, bool]:
        """Return a dictionary indicating if the provider can query domains and/or IPs."""
        return {'domains': True, 'ips': True}

    def _decode_json(self, response) -> Any:
        """
        Decode the JSON body of a response, logging and returning None on failure.

        ValueError is caught rather than json.JSONDecodeError because the
        latter is a subclass of the former, and alternative JSON backends
        raise their own ValueError subclasses.
        """
        try:
            return response.json()
        except ValueError as e:
            self.logger.logger.error(f"Failed to parse JSON response from Shodan: {e}")
            return None

    def _add_relationship(self, relationships: List[Tuple[str, str, str, float, Dict[str, Any]]],
                          source: str, target: str, relationship_type: str,
                          confidence: float, raw_data: Dict[str, Any],
                          discovery_method: str) -> None:
        """Append one relationship tuple to ``relationships`` and log its discovery."""
        relationships.append((source, target, relationship_type, confidence, raw_data))
        self.log_relationship_discovery(
            source_node=source,
            target_node=target,
            relationship_type=relationship_type,
            confidence_score=confidence,
            raw_data=raw_data,
            discovery_method=discovery_method
        )

    def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Query Shodan for information about a domain.

        Uses Shodan's hostname search to find associated IPs. For every match
        whose hostname list contains ``domain`` exactly, an ``a_record``
        relationship (domain -> IP) is emitted, plus a ``passive_dns``
        relationship (domain -> hostname) for each other valid hostname
        listed on the same match.

        Args:
            domain: Domain to investigate

        Returns:
            List of relationships discovered from Shodan data. Empty when the
            domain is invalid, no API key is configured, the request fails,
            or the response cannot be parsed.
        """
        if not _is_valid_domain(domain) or not self.is_available():
            return []

        relationships = []

        # Search for hostname in Shodan
        url = f"{self.base_url}/shodan/host/search"
        params = {
            'key': self.api_key,
            'query': f"hostname:{domain}",
            'minify': True  # Get minimal data to reduce bandwidth
        }

        response = self.make_request(url, method="GET", params=params, target_indicator=domain)

        if not response or response.status_code != 200:
            return []

        data = self._decode_json(response)
        if not isinstance(data, dict):
            return relationships

        # The same IP/hostname can show up in several matches; remember what
        # was already emitted so each edge is reported once per query.
        seen = set()

        # "or []" / "or {}" also cover keys that are present but null.
        for match in data.get('matches') or []:
            ip_address = match.get('ip_str')
            hostnames = match.get('hostnames') or []

            if not ip_address or domain not in hostnames:
                continue

            if (ip_address, 'a_record') not in seen:
                seen.add((ip_address, 'a_record'))
                location = match.get('location') or {}
                raw_data = {
                    'ip_address': ip_address,
                    'hostnames': hostnames,
                    'country': location.get('country_name', ''),
                    'city': location.get('city', ''),
                    'isp': match.get('isp', ''),
                    'org': match.get('org', ''),
                    'ports': match.get('ports', []),
                    'last_update': match.get('last_update', '')
                }
                self._add_relationship(
                    relationships,
                    domain,
                    ip_address,
                    'a_record',  # Domain resolves to IP
                    0.8,
                    raw_data,
                    "shodan_hostname_search"
                )

            # Also create relationships to other hostnames on the same IP
            for hostname in hostnames:
                if hostname == domain or not _is_valid_domain(hostname):
                    continue
                if (hostname, 'passive_dns') in seen:
                    continue
                seen.add((hostname, 'passive_dns'))

                hostname_raw_data = {
                    'shared_ip': ip_address,
                    'all_hostnames': hostnames,
                    'discovery_context': 'shared_hosting'
                }
                self._add_relationship(
                    relationships,
                    domain,
                    hostname,
                    'passive_dns',  # Shared hosting relationship
                    0.6,  # Lower confidence for shared hosting
                    hostname_raw_data,
                    "shodan_shared_hosting"
                )

        return relationships

    def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Query Shodan for information about an IP address.

        Emits an ``a_record`` relationship (IP -> hostname) for every valid
        hostname Shodan lists for the host, and an ``asn_membership``
        relationship (IP -> "AS<number>") when the host record carries an ASN.

        Args:
            ip: IP address to investigate

        Returns:
            List of relationships discovered from Shodan IP data. Empty when
            the IP is invalid, no API key is configured, the request fails,
            or the response cannot be parsed.
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return []

        relationships = []

        # Query Shodan host information
        url = f"{self.base_url}/shodan/host/{ip}"
        params = {'key': self.api_key}

        response = self.make_request(url, method="GET", params=params, target_indicator=ip)

        if not response or response.status_code != 200:
            return []

        data = self._decode_json(response)
        if not isinstance(data, dict):
            return relationships

        # Extract hostname relationships ("or []" also covers a null value)
        for hostname in data.get('hostnames') or []:
            if not _is_valid_domain(hostname):
                continue

            raw_data = {
                'ip_address': ip,
                'hostname': hostname,
                'country': data.get('country_name', ''),
                'city': data.get('city', ''),
                'isp': data.get('isp', ''),
                'org': data.get('org', ''),
                'asn': data.get('asn', ''),
                'ports': data.get('ports', []),
                'last_update': data.get('last_update', ''),
                'os': data.get('os', '')
            }
            # NOTE(review): the edge goes IP -> hostname but is typed
            # 'a_record'; kept as-is because consumers may key on this type.
            self._add_relationship(
                relationships,
                ip,
                hostname,
                'a_record',
                0.8,
                raw_data,
                "shodan_host_lookup"
            )

        # Extract ASN relationship if available
        asn = data.get('asn')
        if asn:
            # Normalize so the node name always carries the "AS" prefix and
            # the raw data carries the value without the prefix.
            if isinstance(asn, str) and asn.startswith('AS'):
                asn_name = asn
                asn_number = asn[2:]
            else:
                asn_name = f"AS{asn}"
                asn_number = str(asn)

            asn_raw_data = {
                'ip_address': ip,
                'asn': asn_number,
                'isp': data.get('isp', ''),
                'org': data.get('org', '')
            }
            self._add_relationship(
                relationships,
                ip,
                asn_name,
                'asn_membership',
                0.7,
                asn_raw_data,
                "shodan_asn_lookup"
            )

        return relationships

    def search_by_organization(self, org_name: str) -> List[Dict[str, Any]]:
        """
        Search Shodan for hosts belonging to a specific organization.

        Args:
            org_name: Organization name to search for

        Returns:
            List of host information dictionaries (the ``matches`` entries of
            the search response). Empty when the name is blank, no API key is
            configured, the request fails, or any error occurs.
        """
        # A blank name would only produce the meaningless query org:"".
        if not org_name or not org_name.strip() or not self.is_available():
            return []

        try:
            search_query = f"org:\"{org_name}\""
            url = f"{self.base_url}/shodan/host/search"
            params = {
                'key': self.api_key,
                'query': search_query,
                'minify': True
            }

            response = self.make_request(url, method="GET", params=params, target_indicator=org_name)

            if response and response.status_code == 200:
                data = response.json()
                # "or []" keeps the declared list return type when the key is null.
                return data.get('matches') or []

        except Exception as e:
            # Best-effort lookup: log and fall through to the empty result.
            self.logger.logger.error(f"Error searching Shodan by organization {org_name}: {e}")

        return []

    def get_host_services(self, ip: str) -> List[Dict[str, Any]]:
        """
        Get service information for a specific IP address.

        Args:
            ip: IP address to query

        Returns:
            List of service information dictionaries (the ``data`` entries of
            the host response). Empty when the IP is invalid, no API key is
            configured, the request fails, or any error occurs.
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return []

        try:
            url = f"{self.base_url}/shodan/host/{ip}"
            params = {'key': self.api_key}

            response = self.make_request(url, method="GET", params=params, target_indicator=ip)

            if response and response.status_code == 200:
                data = response.json()
                # Service banners; "or []" keeps the list return type when the key is null.
                return data.get('data') or []

        except Exception as e:
            # Best-effort lookup: log and fall through to the empty result.
            self.logger.logger.error(f"Error getting Shodan services for IP {ip}: {e}")

        return []