dnsrecon/src/shodan_client.py
overcuriousity 0c9cf00a3b progress
2025-09-09 14:54:02 +02:00

166 lines
6.2 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# File: src/shodan_client.py
"""Shodan API integration."""
import requests
import time
import logging
from typing import Optional, Dict, Any, List
from .data_structures import ShodanResult
from .config import Config
# Module-level logger, named after this module via __name__; every log
# message emitted by ShodanClient goes through it.
logger = logging.getLogger(__name__)
class ShodanClient:
    """Shodan API client.

    Wraps the two Shodan REST endpoints used here (host lookup and host
    search) and spaces requests out according to
    ``config.SHODAN_RATE_LIMIT``.  Both public methods are best-effort:
    HTTP and network failures are logged and reported as ``None`` / ``[]``
    instead of being raised.
    """

    BASE_URL = "https://api.shodan.io"
    USER_AGENT = 'DNS-Recon-Tool/1.0'
    # Maximum number of banner characters kept per service.
    BANNER_MAX_LENGTH = 200

    def __init__(self, api_key: str, config: Config):
        """Store credentials and configuration.

        Args:
            api_key: Shodan API key, sent as the ``key`` query parameter.
            config: Settings object; ``SHODAN_RATE_LIMIT`` and
                ``HTTP_TIMEOUT`` are read from it.
        """
        self.api_key = api_key
        self.config = config
        # time.time() of the most recent request; 0 means "none yet".
        self.last_request = 0
        logger.info(f"🕵️ Shodan client initialized with API key ending in: ...{self._key_hint(api_key)}")

    @staticmethod
    def _key_hint(api_key: Optional[str]) -> str:
        """Return the last four characters of *api_key* for log output.

        Keys of four characters or fewer (or a missing key) are fully
        masked, so a short key is never written to the log in its entirety.
        """
        if api_key and len(api_key) > 4:
            return api_key[-4:]
        return "****"

    def _redact(self, text: Any) -> str:
        """Return ``str(text)`` with the API key replaced by ``***``.

        Exception messages from ``requests`` can embed the request URL,
        and the URL carries the key as a query parameter; this keeps the
        key out of the log.
        """
        message = str(text)
        if self.api_key:
            message = message.replace(self.api_key, "***")
        return message

    def _rate_limit(self):
        """Apply rate limiting for Shodan.

        Sleeps just long enough that consecutive requests are at least
        ``1 / config.SHODAN_RATE_LIMIT`` seconds apart, then records the
        current time in ``last_request``.  A missing, zero or negative
        limit is treated as "no limiting" instead of dividing by zero.
        """
        rate = self.config.SHODAN_RATE_LIMIT
        if not rate or rate <= 0:
            self.last_request = time.time()
            return

        min_interval = 1.0 / rate
        time_since_last = time.time() - self.last_request
        if time_since_last < min_interval:
            sleep_time = min_interval - time_since_last
            logger.debug(f"⏸️ Shodan rate limiting: sleeping for {sleep_time:.2f}s")
            time.sleep(sleep_time)
        self.last_request = time.time()

    def _get(self, path: str, params: Dict[str, Any]) -> requests.Response:
        """Send a GET to ``BASE_URL + path`` with the shared timeout and User-Agent."""
        return requests.get(
            f"{self.BASE_URL}{path}",
            params=params,
            timeout=self.config.HTTP_TIMEOUT,
            headers={'User-Agent': self.USER_AGENT},
        )

    @staticmethod
    def _parse_services(data: Dict[str, Any]) -> tuple:
        """Extract ports and per-port service details from a host response.

        Args:
            data: Decoded JSON body of a host lookup; its ``data`` entry is
                read as a list of service dicts.

        Returns:
            ``(ports, services)`` where ``ports`` is a sorted list of the
            distinct ports and ``services`` maps ``str(port)`` to a dict
            with ``product``, ``version`` and ``banner`` keys.  When a port
            appears more than once, the last entry wins.
        """
        ports = set()
        services = {}
        # "or []" also covers an explicit JSON null for the "data" key.
        for service in data.get('data') or []:
            port = service.get('port')
            if not port:
                continue
            ports.add(port)
            banner = service.get('data') or ''
            services[str(port)] = {
                'product': service.get('product', ''),
                'version': service.get('version', ''),
                'banner': banner.strip()[:ShodanClient.BANNER_MAX_LENGTH],
            }
        return sorted(ports), services

    def lookup_ip(self, ip: str) -> Optional[ShodanResult]:
        """Lookup IP address information.

        Args:
            ip: Address to query; it is placed in the request path.

        Returns:
            A ``ShodanResult`` on HTTP 200, otherwise ``None`` (unknown IP,
            invalid key, rate limit hit, other HTTP status, timeout,
            network error or any unexpected exception).  Nothing is raised
            for these cases; each one is logged.
        """
        self._rate_limit()
        logger.debug(f"🔍 Querying Shodan for IP: {ip}")
        try:
            response = self._get(f"/shodan/host/{ip}", {'key': self.api_key})
            status = response.status_code
            logger.debug(f"📡 Shodan API response for {ip}: {status}")

            if status == 200:
                data = response.json()
                ports, services = self._parse_services(data)
                result = ShodanResult(
                    ip=ip,
                    ports=ports,
                    services=services,
                    organization=data.get('org'),
                    country=data.get('country_name'),
                )
                logger.info(f"✅ Shodan result for {ip}: {len(result.ports)} ports, org: {result.organization}")
                return result

            if status == 404:
                logger.debug(f" IP {ip} not found in Shodan database")
            elif status == 401:
                logger.error("❌ Shodan API key is invalid or expired")
            elif status == 429:
                logger.warning("⚠️ Shodan API rate limit exceeded")
            else:
                logger.warning(f"⚠️ Shodan API error for {ip}: HTTP {status}")
                # The error body is optional diagnostics; a non-JSON body
                # is expected here and must not mask the HTTP error above.
                try:
                    error_data = response.json()
                    logger.debug(f"Shodan error details: {error_data}")
                except ValueError:
                    logger.debug("Shodan error response body was not valid JSON")
            return None
        except requests.exceptions.Timeout:
            logger.warning(f"⏱️ Shodan query timeout for {ip}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"🌐 Shodan network error for {ip}: {self._redact(e)}")
            return None
        except Exception as e:
            # Best-effort boundary: a failed lookup must not abort the caller.
            logger.error(f"❌ Unexpected error querying Shodan for {ip}: {self._redact(e)}")
            return None

    def search_domain(self, domain: str) -> List[str]:
        """Search for IPs associated with a domain.

        Sends the query ``hostname:<domain>`` with ``limit`` set to 100.

        Args:
            domain: Hostname to search for.

        Returns:
            The distinct ``ip_str`` values of the returned matches, in
            first-seen order; an empty list on any HTTP error, timeout,
            network error or unexpected exception (each is logged).
        """
        self._rate_limit()
        logger.debug(f"🔍 Searching Shodan for domain: {domain}")
        try:
            params = {
                'key': self.api_key,
                'query': f'hostname:{domain}',
                'limit': 100,
            }
            response = self._get("/shodan/host/search", params)
            status = response.status_code
            logger.debug(f"📡 Shodan search response for {domain}: {status}")

            if status == 200:
                matches = response.json().get('matches') or []
                candidates = (match.get('ip_str') for match in matches)
                # dict.fromkeys de-duplicates while keeping a stable order.
                unique_ips = list(dict.fromkeys(ip for ip in candidates if ip))
                logger.info(f"🔍 Shodan search for {domain} found {len(unique_ips)} unique IPs")
                return unique_ips

            if status == 401:
                logger.error("❌ Shodan API key is invalid for search")
            elif status == 429:
                logger.warning("⚠️ Shodan search rate limit exceeded")
            else:
                logger.warning(f"⚠️ Shodan search error for {domain}: HTTP {status}")
            return []
        except requests.exceptions.Timeout:
            logger.warning(f"⏱️ Shodan search timeout for {domain}")
            return []
        except requests.exceptions.RequestException as e:
            logger.error(f"🌐 Shodan search network error for {domain}: {self._redact(e)}")
            return []
        except Exception as e:
            # Best-effort boundary: a failed search must not abort the caller.
            logger.error(f"❌ Unexpected error searching Shodan for {domain}: {self._redact(e)}")
            return []