# dnsrecon/providers/shodan_provider.py
import json
|
|
from typing import List, Dict, Any, Tuple
|
|
from .base_provider import BaseProvider
|
|
from utils.helpers import _is_valid_ip, _is_valid_domain
|
|
|
|
|
|
class ShodanProvider(BaseProvider):
    """
    Provider for querying the Shodan API for IP address and hostname information.

    The API key is read from the session-specific configuration, so every
    session uses its own Shodan credentials.
    """

    def __init__(self, name=None, session_config=None):
        """
        Initialize the Shodan provider with session-specific configuration.

        Args:
            name: Accepted but not used; the provider always registers
                itself under the name "shodan".
            session_config: Session configuration forwarded to BaseProvider.
        """
        super().__init__(
            name="shodan",
            rate_limit=60,
            timeout=30,
            session_config=session_config
        )
        self.base_url = "https://api.shodan.io"
        # self.config is presumably populated by BaseProvider.__init__ from
        # session_config -- confirm against the base class.
        self.api_key = self.config.get_api_key('shodan')

    def is_available(self) -> bool:
        """Check if the provider is usable, i.e. a non-blank API key is set for this session."""
        return bool(self.api_key and self.api_key.strip())

    def get_name(self) -> str:
        """Return the provider name."""
        return "shodan"

    def get_display_name(self) -> str:
        """Return the provider display name for the UI."""
        return "shodan"

    def requires_api_key(self) -> bool:
        """Return True if the provider requires an API key."""
        return True

    def get_eligibility(self) -> Dict[str, bool]:
        """Return a dictionary indicating if the provider can query domains and/or IPs."""
        return {'domains': True, 'ips': True}

    def _shodan_get_json(self, url: str, params: Dict[str, Any], target_indicator: str):
        """
        Issue a GET request and decode the JSON body.

        Args:
            url: Full endpoint URL.
            params: Query-string parameters (including the API key).
            target_indicator: Value passed through to make_request.

        Returns:
            The decoded JSON object as a dict, or None when there is no
            usable response (falsy response, non-200 status, or a payload
            that is not a JSON object).

        Raises:
            ValueError: If the response body is not valid JSON.
        """
        response = self.make_request(url, method="GET", params=params, target_indicator=target_indicator)
        if not response or response.status_code != 200:
            return None

        payload = response.json()
        # Callers use dict access only; anything else is treated as "no data".
        return payload if isinstance(payload, dict) else None

    def _add_shodan_relationship(self, relationships: List[Tuple[str, str, str, float, Dict[str, Any]]],
                                 source: str, target: str, relationship_type: str,
                                 confidence: float, raw_data: Dict[str, Any],
                                 discovery_method: str) -> None:
        """Append one relationship tuple to *relationships* and log its discovery."""
        relationships.append((source, target, relationship_type, confidence, raw_data))
        self.log_relationship_discovery(
            source_node=source,
            target_node=target,
            relationship_type=relationship_type,
            confidence_score=confidence,
            raw_data=raw_data,
            discovery_method=discovery_method
        )

    def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Query Shodan for information about a domain.

        Uses Shodan's hostname search to find associated IPs, and links the
        domain to any other valid hostnames reported on those IPs.

        Args:
            domain: Domain to investigate

        Returns:
            List of (source, target, relationship_type, confidence, raw_data)
            tuples. Empty when the domain is invalid, the provider is
            unavailable, the request fails, or the response is not valid JSON.
        """
        if not _is_valid_domain(domain) or not self.is_available():
            return []

        url = f"{self.base_url}/shodan/host/search"
        params = {
            'key': self.api_key,
            'query': f"hostname:{domain}",
            'minify': True  # Get minimal data to reduce bandwidth
        }

        # Only the fetch/parse step can raise a JSON error, so keep the try
        # narrow. ValueError is the base class of json.JSONDecodeError and
        # also covers other JSON decoders a response object may use.
        try:
            data = self._shodan_get_json(url, params, domain)
        except ValueError as e:
            self.logger.logger.error(f"Failed to parse JSON response from Shodan: {e}")
            return []

        if data is None:
            return []

        relationships = []
        # Several matches can refer to the same IP; remember what was already
        # emitted so the same relationship is not reported (and logged) twice.
        seen = set()

        for match in data.get('matches') or []:
            if not isinstance(match, dict):
                continue

            ip_address = match.get('ip_str')
            # `or` also guards against explicit nulls in the payload.
            hostnames = match.get('hostnames') or []
            if not ip_address or domain not in hostnames:
                continue

            if (ip_address, 'a_record') not in seen:
                seen.add((ip_address, 'a_record'))
                location = match.get('location') or {}
                # NOTE(review): 'ports' and 'last_update' may not be present
                # on search matches -- confirm against the Shodan API docs.
                raw_data = {
                    'ip_address': ip_address,
                    'hostnames': hostnames,
                    'country': location.get('country_name', ''),
                    'city': location.get('city', ''),
                    'isp': match.get('isp', ''),
                    'org': match.get('org', ''),
                    'ports': match.get('ports', []),
                    'last_update': match.get('last_update', '')
                }
                self._add_shodan_relationship(
                    relationships, domain, ip_address,
                    'a_record',  # Domain resolves to IP
                    0.8, raw_data, "shodan_hostname_search"
                )

            # Also create relationships to other hostnames on the same IP
            for hostname in hostnames:
                if hostname == domain or not _is_valid_domain(hostname):
                    continue
                if (ip_address, hostname) in seen:
                    continue
                seen.add((ip_address, hostname))

                hostname_raw_data = {
                    'shared_ip': ip_address,
                    'all_hostnames': hostnames,
                    'discovery_context': 'shared_hosting'
                }
                self._add_shodan_relationship(
                    relationships, domain, hostname,
                    'passive_dns',  # Shared hosting relationship
                    0.6,  # Lower confidence for shared hosting
                    hostname_raw_data, "shodan_shared_hosting"
                )

        return relationships

    def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Query Shodan for information about an IP address.

        Emits one relationship per valid hostname reported for the IP, plus
        an ASN membership relationship when an ASN is present.

        Args:
            ip: IP address to investigate

        Returns:
            List of (source, target, relationship_type, confidence, raw_data)
            tuples. Empty when the IP is invalid, the provider is unavailable,
            the request fails, or the response is not valid JSON.
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return []

        url = f"{self.base_url}/shodan/host/{ip}"
        params = {'key': self.api_key}

        try:
            data = self._shodan_get_json(url, params, ip)
        except ValueError as e:
            self.logger.logger.error(f"Failed to parse JSON response from Shodan: {e}")
            return []

        if data is None:
            return []

        relationships = []

        # Extract hostname relationships
        for hostname in data.get('hostnames') or []:
            if not _is_valid_domain(hostname):
                continue

            raw_data = {
                'ip_address': ip,
                'hostname': hostname,
                'country': data.get('country_name', ''),
                'city': data.get('city', ''),
                'isp': data.get('isp', ''),
                'org': data.get('org', ''),
                'asn': data.get('asn', ''),
                'ports': data.get('ports', []),
                'last_update': data.get('last_update', ''),
                'os': data.get('os', '')
            }
            self._add_shodan_relationship(
                relationships, ip, hostname,
                'a_record',  # IP resolves to hostname
                0.8, raw_data, "shodan_host_lookup"
            )

        # Extract ASN relationship if available
        asn = data.get('asn')
        if asn:
            # Normalise to a bare number plus an "AS"-prefixed node name,
            # whether the value arrives as "AS1234", "as1234" or 1234.
            asn_text = str(asn).strip()
            if asn_text.upper().startswith('AS'):
                asn_number = asn_text[2:]
            else:
                asn_number = asn_text
            asn_name = f"AS{asn_number}"

            asn_raw_data = {
                'ip_address': ip,
                'asn': asn_number,
                'isp': data.get('isp', ''),
                'org': data.get('org', '')
            }
            self._add_shodan_relationship(
                relationships, ip, asn_name,
                'asn_membership', 0.7, asn_raw_data, "shodan_asn_lookup"
            )

        return relationships

    def search_by_organization(self, org_name: str) -> List[Dict[str, Any]]:
        """
        Search Shodan for hosts belonging to a specific organization.

        This is a best-effort lookup: any error is logged and results in an
        empty list rather than an exception.

        Args:
            org_name: Organization name to search for

        Returns:
            List of host information dictionaries (the 'matches' entries of
            the search response), or an empty list.
        """
        if not self.is_available():
            return []

        url = f"{self.base_url}/shodan/host/search"
        params = {
            'key': self.api_key,
            'query': f'org:"{org_name}"',
            'minify': True
        }

        # Broad catch is deliberate: callers expect a list, never an exception.
        try:
            data = self._shodan_get_json(url, params, org_name)
        except Exception as e:
            self.logger.logger.error(f"Error searching Shodan by organization {org_name}: {e}")
            return []

        if data is None:
            return []
        return data.get('matches') or []

    def get_host_services(self, ip: str) -> List[Dict[str, Any]]:
        """
        Get service information for a specific IP address.

        This is a best-effort lookup: any error is logged and results in an
        empty list rather than an exception.

        Args:
            ip: IP address to query

        Returns:
            List of service information dictionaries (the 'data' entries of
            the host response), or an empty list.
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return []

        url = f"{self.base_url}/shodan/host/{ip}"
        params = {'key': self.api_key}

        # Broad catch is deliberate: callers expect a list, never an exception.
        try:
            data = self._shodan_get_json(url, params, ip)
        except Exception as e:
            self.logger.logger.error(f"Error getting Shodan services for IP {ip}: {e}")
            return []

        if data is None:
            return []
        return data.get('data') or []  # Service banners