diff --git a/providers/crtsh_provider.py b/providers/crtsh_provider.py
index d34ba3a..bfa2c51 100644
--- a/providers/crtsh_provider.py
+++ b/providers/crtsh_provider.py
@@ -121,62 +121,6 @@ class CrtShProvider(BaseProvider):
 
         return certificates
 
-    def _create_cache_file(self, cache_file_path: Path, domain: str, certificates: List[Dict[str, Any]]) -> None:
-        """Create new cache file with certificates."""
-        try:
-            cache_data = {
-                "domain": domain,
-                "first_cached": datetime.now(timezone.utc).isoformat(),
-                "last_upstream_query": datetime.now(timezone.utc).isoformat(),
-                "upstream_query_count": 1,
-                "certificates": certificates
-            }
-
-            cache_file_path.parent.mkdir(parents=True, exist_ok=True)
-            with open(cache_file_path, 'w') as f:
-                json.dump(cache_data, f, separators=(',', ':'))
-
-            self.logger.logger.info(f"Created cache file for {domain} with {len(certificates)} certificates")
-
-        except Exception as e:
-            self.logger.logger.warning(f"Failed to create cache file for {domain}: {e}")
-
-    def _append_to_cache(self, cache_file_path: Path, new_certificates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-        """Append new certificates to existing cache and return all certificates."""
-        try:
-            # Load existing cache
-            with open(cache_file_path, 'r') as f:
-                cache_data = json.load(f)
-
-            # Track existing certificate IDs to avoid duplicates
-            existing_ids = {cert.get('id') for cert in cache_data.get('certificates', [])}
-
-            # Add only new certificates
-            added_count = 0
-            for cert in new_certificates:
-                cert_id = cert.get('id')
-                if cert_id and cert_id not in existing_ids:
-                    cache_data['certificates'].append(cert)
-                    existing_ids.add(cert_id)
-                    added_count += 1
-
-            # Update metadata
-            cache_data['last_upstream_query'] = datetime.now(timezone.utc).isoformat()
-            cache_data['upstream_query_count'] = cache_data.get('upstream_query_count', 0) + 1
-
-            # Write updated cache
-            with open(cache_file_path, 'w') as f:
-                json.dump(cache_data, f, separators=(',', ':'))
-
-            total_certs = len(cache_data['certificates'])
-            self.logger.logger.info(f"Appended {added_count} new certificates to cache. Total: {total_certs}")
-
-            return cache_data['certificates']
-
-        except Exception as e:
-            self.logger.logger.warning(f"Failed to append to cache: {e}")
-            return new_certificates # Fallback to new certificates only
-
     def _parse_issuer_organization(self, issuer_dn: str) -> str:
         """
         Parse the issuer Distinguished Name to extract just the organization name.
@@ -332,71 +276,87 @@ class CrtShProvider(BaseProvider):
         if not _is_valid_domain(domain):
             return []
 
-        # Check for cancellation before starting
         if self._stop_event and self._stop_event.is_set():
-            print(f"CrtSh query cancelled before start for domain: {domain}")
             return []
-
-        # === CACHING LOGIC ===
+
         cache_file = self._get_cache_file_path(domain)
         cache_status = self._get_cache_status(cache_file)
 
-        certificates = []
-
+        processed_certificates = []
+
         try:
             if cache_status == "fresh":
-                # Use cached data
-                certificates = self._load_cached_certificates(cache_file)
-                self.logger.logger.info(f"Using cached data for {domain} ({len(certificates)} certificates)")
-
-            elif cache_status == "not_found":
-                # Fresh query, create new cache
-                certificates = self._query_crtsh_api(domain)
-                if certificates: # Only cache if we got results
-                    self._create_cache_file(cache_file, domain, certificates)
-                    self.logger.logger.info(f"Cached fresh data for {domain} ({len(certificates)} certificates)")
-                else:
-                    self.logger.logger.info(f"No certificates found for {domain}, not caching")
-
-            elif cache_status == "stale":
-                # Append query, update existing cache
-                try:
-                    new_certificates = self._query_crtsh_api(domain)
-                    if new_certificates:
-                        certificates = self._append_to_cache(cache_file, new_certificates)
-                        self.logger.logger.info(f"Refreshed and appended cache for {domain}")
-                    else:
-                        # Use existing cache if API returns no results
-                        certificates = self._load_cached_certificates(cache_file)
-                        self.logger.logger.info(f"API returned no new results, using existing cache for {domain}")
-                except requests.exceptions.RequestException:
-                    # If API call fails for stale cache, use cached data and re-raise for retry logic
-                    certificates = self._load_cached_certificates(cache_file)
-                    if certificates:
-                        self.logger.logger.warning(f"API call failed for {domain}, using stale cache data ({len(certificates)} certificates)")
-                        # Don't re-raise here, just use cached data
-                    else:
-                        # No cached data and API failed - re-raise for retry
-                        raise
+                processed_certificates = self._load_cached_certificates(cache_file)
+                self.logger.logger.info(f"Using cached processed data for {domain} ({len(processed_certificates)} certificates)")
+            else: # "stale" or "not_found"
+                raw_certificates = self._query_crtsh_api(domain)
+
+                if self._stop_event and self._stop_event.is_set():
+                    return []
+
+                # Process raw data into the application's expected format
+                current_processed_certs = [self._extract_certificate_metadata(cert) for cert in raw_certificates]
+
+                if cache_status == "stale":
+                    # Append new processed certs to existing ones
+                    processed_certificates = self._append_to_cache(cache_file, current_processed_certs)
+                    self.logger.logger.info(f"Refreshed and appended cache for {domain}")
+                else: # "not_found"
+                    # Create a new cache file with the processed certs, even if empty
+                    self._create_cache_file(cache_file, domain, current_processed_certs)
+                    processed_certificates = current_processed_certs
+                    self.logger.logger.info(f"Cached fresh data for {domain} ({len(processed_certificates)} certificates)")
+
+
         except requests.exceptions.RequestException as e:
-            # Network/API errors should be re-raised so core logic can retry
             self.logger.logger.error(f"API query failed for {domain}: {e}")
-            raise e
-        except json.JSONDecodeError as e:
-            # JSON parsing errors should also be raised for retry
-            self.logger.logger.error(f"Failed to parse JSON response from crt.sh for {domain}: {e}")
-            raise e
-
-        # Check for cancellation after cache operations
-        if self._stop_event and self._stop_event.is_set():
-            print(f"CrtSh query cancelled after cache operations for domain: {domain}")
+            if cache_status != "not_found":
+                processed_certificates = self._load_cached_certificates(cache_file)
+                self.logger.logger.warning(f"Using stale cache for {domain} due to API failure.")
+            else:
+                raise e # Re-raise if there's no cache to fall back on
+
+        if not processed_certificates:
             return []
 
-        if not certificates:
-            return []
-
-        return self._process_certificates_to_relationships(domain, certificates)
+        return self._process_certificates_to_relationships(domain, processed_certificates)
+
+    def _create_cache_file(self, cache_file_path: Path, domain: str, processed_certificates: List[Dict[str, Any]]) -> None:
+        """Create new cache file with processed certificates."""
+        try:
+            cache_data = {
+                "domain": domain,
+                "last_upstream_query": datetime.now(timezone.utc).isoformat(),
+                "certificates": processed_certificates # Store processed data
+            }
+            cache_file_path.parent.mkdir(parents=True, exist_ok=True)
+            with open(cache_file_path, 'w') as f:
+                json.dump(cache_data, f, separators=(',', ':'))
+        except Exception as e:
+            self.logger.logger.warning(f"Failed to create cache file for {domain}: {e}")
+
+    def _append_to_cache(self, cache_file_path: Path, new_processed_certificates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """Append new processed certificates to existing cache and return all certificates."""
+        try:
+            with open(cache_file_path, 'r') as f:
+                cache_data = json.load(f)
+
+            existing_ids = {cert.get('certificate_id') for cert in cache_data.get('certificates', [])}
+
+            for cert in new_processed_certificates:
+                if cert.get('certificate_id') not in existing_ids:
+                    cache_data['certificates'].append(cert)
+
+            cache_data['last_upstream_query'] = datetime.now(timezone.utc).isoformat()
+
+            with open(cache_file_path, 'w') as f:
+                json.dump(cache_data, f, separators=(',', ':'))
+
+            return cache_data['certificates']
+        except Exception as e:
+            self.logger.logger.warning(f"Failed to append to cache: {e}")
+            return new_processed_certificates
 
     def _process_certificates_to_relationships(self, domain: str, certificates: List[Dict[str, Any]]) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
         """
diff --git a/providers/shodan_provider.py b/providers/shodan_provider.py
index 2c84beb..30c48f5 100644
--- a/providers/shodan_provider.py
+++ b/providers/shodan_provider.py
@@ -85,23 +85,6 @@ class ShodanProvider(BaseProvider):
         except (json.JSONDecodeError, ValueError, KeyError):
             return "stale"
 
-    def _load_from_cache(self, cache_file_path: Path) -> Dict[str, Any]:
-        """Load Shodan data from a cache file."""
-        try:
-            with open(cache_file_path, 'r') as f:
-                return json.load(f)
-        except (json.JSONDecodeError, FileNotFoundError):
-            return {}
-
-    def _save_to_cache(self, cache_file_path: Path, data: Dict[str, Any]) -> None:
-        """Save Shodan data to a cache file."""
-        try:
-            data['last_upstream_query'] = datetime.now(timezone.utc).isoformat()
-            with open(cache_file_path, 'w') as f:
-                json.dump(data, f, separators=(',', ':'))
-        except Exception as e:
-            self.logger.logger.warning(f"Failed to save Shodan cache for {cache_file_path.name}: {e}")
-
     def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
         """
         Domain queries are no longer supported for the Shodan provider.
@@ -110,7 +93,7 @@ class ShodanProvider(BaseProvider):
 
     def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
         """
-        Query Shodan for information about an IP address, with caching.
+        Query Shodan for information about an IP address, with caching of processed relationships.
         """
         if not _is_valid_ip(ip) or not self.is_available():
             return []
@@ -118,12 +101,12 @@ class ShodanProvider(BaseProvider):
         cache_file = self._get_cache_file_path(ip)
         cache_status = self._get_cache_status(cache_file)
 
-        data = {}
+        relationships = []
 
         try:
             if cache_status == "fresh":
-                data = self._load_from_cache(cache_file)
-                self.logger.logger.info(f"Using cached Shodan data for {ip}")
+                relationships = self._load_from_cache(cache_file)
+                self.logger.logger.info(f"Using cached Shodan relationships for {ip}")
             else: # "stale" or "not_found"
                 url = f"{self.base_url}/shodan/host/{ip}"
                 params = {'key': self.api_key}
@@ -131,20 +114,41 @@ class ShodanProvider(BaseProvider):
 
                 if response and response.status_code == 200:
                     data = response.json()
-                    self._save_to_cache(cache_file, data)
+                    # Process the data into relationships BEFORE caching
+                    relationships = self._process_shodan_data(ip, data)
+                    self._save_to_cache(cache_file, relationships) # Save the processed relationships
                 elif cache_status == "stale":
                     # If API fails on a stale cache, use the old data
-                    data = self._load_from_cache(cache_file)
+                    relationships = self._load_from_cache(cache_file)
 
         except requests.exceptions.RequestException as e:
             self.logger.logger.error(f"Shodan API query failed for {ip}: {e}")
             if cache_status == "stale":
-                data = self._load_from_cache(cache_file)
+                relationships = self._load_from_cache(cache_file)
 
-        if not data:
+        return relationships
+
+    def _load_from_cache(self, cache_file_path: Path) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
+        """Load processed Shodan relationships from a cache file."""
+        try:
+            with open(cache_file_path, 'r') as f:
+                cache_content = json.load(f)
+            # The entire file content is the list of relationships
+            return cache_content.get("relationships", [])
+        except (json.JSONDecodeError, FileNotFoundError, KeyError):
             return []
 
-        return self._process_shodan_data(ip, data)
+    def _save_to_cache(self, cache_file_path: Path, relationships: List[Tuple[str, str, str, float, Dict[str, Any]]]) -> None:
+        """Save processed Shodan relationships to a cache file."""
+        try:
+            cache_data = {
+                "last_upstream_query": datetime.now(timezone.utc).isoformat(),
+                "relationships": relationships
+            }
+            with open(cache_file_path, 'w') as f:
+                json.dump(cache_data, f, separators=(',', ':'))
+        except Exception as e:
+            self.logger.logger.warning(f"Failed to save Shodan cache for {cache_file_path.name}: {e}")
 
     def _process_shodan_data(self, ip: str, data: Dict[str, Any]) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
         """