modularize, shodan qs

This commit is contained in:
overcuriousity
2025-09-13 17:14:16 +02:00
parent 2925512a4d
commit 930fdca500
10 changed files with 275 additions and 147 deletions

View File

@@ -16,4 +16,4 @@ __all__ = [
'ShodanProvider'
]
__version__ = "1.0.0-phase2"
__version__ = "0.0.0-rc"

View File

@@ -126,6 +126,21 @@ class BaseProvider(ABC):
"""Return the provider name."""
pass
@abstractmethod
def get_display_name(self) -> str:
"""Return the provider display name for the UI."""
pass
@abstractmethod
def requires_api_key(self) -> bool:
"""Return True if the provider requires an API key."""
pass
@abstractmethod
def get_eligibility(self) -> Dict[str, bool]:
"""Return a dictionary indicating if the provider can query domains and/or IPs."""
pass
@abstractmethod
def is_available(self) -> bool:
"""Check if the provider is available and properly configured."""

View File

@@ -36,6 +36,18 @@ class CrtShProvider(BaseProvider):
"""Return the provider name."""
return "crtsh"
def get_display_name(self) -> str:
"""Return the provider display name for the UI."""
return "crt.sh"
def requires_api_key(self) -> bool:
"""Return True if the provider requires an API key."""
return False
def get_eligibility(self) -> Dict[str, bool]:
"""Return a dictionary indicating if the provider can query domains and/or IPs."""
return {'domains': True, 'ips': False}
def is_available(self) -> bool:
"""
Check if the provider is configured to be used.

View File

@@ -33,6 +33,18 @@ class DNSProvider(BaseProvider):
"""Return the provider name."""
return "dns"
def get_display_name(self) -> str:
"""Return the provider display name for the UI."""
return "DNS"
def requires_api_key(self) -> bool:
"""Return True if the provider requires an API key."""
return False
def get_eligibility(self) -> Dict[str, bool]:
"""Return a dictionary indicating if the provider can query domains and/or IPs."""
return {'domains': True, 'ips': True}
def is_available(self) -> bool:
"""DNS is always available - no API key required."""
return True

View File

@@ -15,7 +15,7 @@ class ShodanProvider(BaseProvider):
Provider for querying Shodan API for IP address and hostname information.
Now uses session-specific API keys.
"""
def __init__(self, session_config=None):
"""Initialize Shodan provider with session-specific configuration."""
super().__init__(
@@ -26,32 +26,43 @@ class ShodanProvider(BaseProvider):
)
self.base_url = "https://api.shodan.io"
self.api_key = self.config.get_api_key('shodan')
def is_available(self) -> bool:
"""Check if Shodan provider is available (has valid API key in this session)."""
return self.api_key is not None and len(self.api_key.strip()) > 0
def get_name(self) -> str:
"""Return the provider name."""
return "shodan"
def get_display_name(self) -> str:
"""Return the provider display name for the UI."""
return "shodan"
def requires_api_key(self) -> bool:
"""Return True if the provider requires an API key."""
return True
def get_eligibility(self) -> Dict[str, bool]:
"""Return a dictionary indicating if the provider can query domains and/or IPs."""
return {'domains': True, 'ips': True}
def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
"""
Query Shodan for information about a domain.
Uses Shodan's hostname search to find associated IPs.
Args:
domain: Domain to investigate
Returns:
List of relationships discovered from Shodan data
"""
if not _is_valid_domain(domain) or not self.is_available():
return []
relationships = []
try:
# Search for hostname in Shodan
search_query = f"hostname:{domain}"
@@ -61,22 +72,22 @@ class ShodanProvider(BaseProvider):
'query': search_query,
'minify': True # Get minimal data to reduce bandwidth
}
response = self.make_request(url, method="GET", params=params, target_indicator=domain)
if not response or response.status_code != 200:
return []
data = response.json()
if 'matches' not in data:
return []
# Process search results
for match in data['matches']:
ip_address = match.get('ip_str')
hostnames = match.get('hostnames', [])
if ip_address and domain in hostnames:
raw_data = {
'ip_address': ip_address,
@@ -88,7 +99,7 @@ class ShodanProvider(BaseProvider):
'ports': match.get('ports', []),
'last_update': match.get('last_update', '')
}
relationships.append((
domain,
ip_address,
@@ -96,7 +107,7 @@ class ShodanProvider(BaseProvider):
RelationshipType.A_RECORD.default_confidence,
raw_data
))
self.log_relationship_discovery(
source_node=domain,
target_node=ip_address,
@@ -105,7 +116,7 @@ class ShodanProvider(BaseProvider):
raw_data=raw_data,
discovery_method="shodan_hostname_search"
)
# Also create relationships to other hostnames on the same IP
for hostname in hostnames:
if hostname != domain and _is_valid_domain(hostname):
@@ -114,7 +125,7 @@ class ShodanProvider(BaseProvider):
'all_hostnames': hostnames,
'discovery_context': 'shared_hosting'
}
relationships.append((
domain,
hostname,
@@ -122,7 +133,7 @@ class ShodanProvider(BaseProvider):
0.6, # Lower confidence for shared hosting
hostname_raw_data
))
self.log_relationship_discovery(
source_node=domain,
target_node=hostname,
@@ -131,39 +142,39 @@ class ShodanProvider(BaseProvider):
raw_data=hostname_raw_data,
discovery_method="shodan_shared_hosting"
)
except json.JSONDecodeError as e:
self.logger.logger.error(f"Failed to parse JSON response from Shodan: {e}")
return relationships
def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
"""
Query Shodan for information about an IP address.
Args:
ip: IP address to investigate
Returns:
List of relationships discovered from Shodan IP data
"""
if not _is_valid_ip(ip) or not self.is_available():
return []
relationships = []
try:
# Query Shodan host information
url = f"{self.base_url}/shodan/host/{ip}"
params = {'key': self.api_key}
response = self.make_request(url, method="GET", params=params, target_indicator=ip)
if not response or response.status_code != 200:
return []
data = response.json()
# Extract hostname relationships
hostnames = data.get('hostnames', [])
for hostname in hostnames:
@@ -180,7 +191,7 @@ class ShodanProvider(BaseProvider):
'last_update': data.get('last_update', ''),
'os': data.get('os', '')
}
relationships.append((
ip,
hostname,
@@ -188,7 +199,7 @@ class ShodanProvider(BaseProvider):
RelationshipType.A_RECORD.default_confidence,
raw_data
))
self.log_relationship_discovery(
source_node=ip,
target_node=hostname,
@@ -197,19 +208,25 @@ class ShodanProvider(BaseProvider):
raw_data=raw_data,
discovery_method="shodan_host_lookup"
)
# Extract ASN relationship if available
asn = data.get('asn')
if asn:
asn_name = f"AS{asn}"
# Ensure the ASN starts with "AS"
if isinstance(asn, str) and asn.startswith('AS'):
asn_name = asn
asn_number = asn[2:]
else:
asn_name = f"AS{asn}"
asn_number = str(asn)
asn_raw_data = {
'ip_address': ip,
'asn': asn,
'asn': asn_number,
'isp': data.get('isp', ''),
'org': data.get('org', '')
}
relationships.append((
ip,
asn_name,
@@ -217,7 +234,7 @@ class ShodanProvider(BaseProvider):
RelationshipType.ASN_MEMBERSHIP.default_confidence,
asn_raw_data
))
self.log_relationship_discovery(
source_node=ip,
target_node=asn_name,
@@ -226,25 +243,25 @@ class ShodanProvider(BaseProvider):
raw_data=asn_raw_data,
discovery_method="shodan_asn_lookup"
)
except json.JSONDecodeError as e:
self.logger.logger.error(f"Failed to parse JSON response from Shodan: {e}")
return relationships
def search_by_organization(self, org_name: str) -> List[Dict[str, Any]]:
"""
Search Shodan for hosts belonging to a specific organization.
Args:
org_name: Organization name to search for
Returns:
List of host information dictionaries
"""
if not self.is_available():
return []
try:
search_query = f"org:\"{org_name}\""
url = f"{self.base_url}/shodan/host/search"
@@ -253,42 +270,42 @@ class ShodanProvider(BaseProvider):
'query': search_query,
'minify': True
}
response = self.make_request(url, method="GET", params=params, target_indicator=org_name)
if response and response.status_code == 200:
data = response.json()
return data.get('matches', [])
except Exception as e:
self.logger.logger.error(f"Error searching Shodan by organization {org_name}: {e}")
return []
def get_host_services(self, ip: str) -> List[Dict[str, Any]]:
"""
Get service information for a specific IP address.
Args:
ip: IP address to query
Returns:
List of service information dictionaries
"""
if not _is_valid_ip(ip) or not self.is_available():
return []
try:
url = f"{self.base_url}/shodan/host/{ip}"
params = {'key': self.api_key}
response = self.make_request(url, method="GET", params=params, target_indicator=ip)
if response and response.status_code == 200:
data = response.json()
return data.get('data', []) # Service banners
except Exception as e:
self.logger.logger.error(f"Error getting Shodan services for IP {ip}: {e}")
return []