fetch_tx_details/fetch_tx_details.py
overcuriousity b96b876e98 initial
2025-09-08 10:37:35 +02:00

781 lines
32 KiB
Python

import argparse
import json
import math
import sys
import time
from collections import Counter, defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import requests
# --- Configuration ---
BASE_URL_V1 = "https://mempool.mikoshi.de/api/v1"
BASE_URL_DEFAULT = "https://mempool.mikoshi.de/api"
SESSION = requests.Session()
SESSION.headers.update({
'User-Agent': 'Bitcoin-Forensic-Analyzer/2.0'
})
# Rate limiting (even though not needed, good practice)
REQUEST_DELAY = 0.1
class TransactionAnalyzer:
def __init__(self):
self.mempool_stats = None
self.fee_recommendations = None
self._load_network_context()
def _load_network_context(self):
"""Load current network context for analysis."""
try:
# Get current mempool state
mempool_resp = SESSION.get(f"{BASE_URL_DEFAULT}/mempool", timeout=10)
if mempool_resp.status_code == 200:
self.mempool_stats = mempool_resp.json()
# Get current fee recommendations
fee_resp = SESSION.get(f"{BASE_URL_V1}/fees/recommended", timeout=10)
if fee_resp.status_code == 200:
self.fee_recommendations = fee_resp.json()
except requests.RequestException:
print(" - Warning: Could not load network context")
def analyze_round_numbers(self, value_sats: int) -> Tuple[float, str]:
"""Analyze if value appears to be a round number (indicates payment)."""
value_btc = value_sats / 100_000_000
# Very round BTC amounts (strong payment indicators)
round_btc_amounts = [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0]
for amount in round_btc_amounts:
if abs(value_btc - amount) < 0.000001:
return -0.4, f"Exact round amount: {amount} BTC"
# Round satoshi amounts
if value_sats >= 100000 and value_sats % 100000 == 0:
return -0.3, f"Round 100k sats: {value_sats}"
if value_sats >= 10000 and value_sats % 10000 == 0:
return -0.2, f"Round 10k sats: {value_sats}"
if value_sats >= 1000 and value_sats % 1000 == 0:
return -0.1, f"Round 1k sats: {value_sats}"
# Very precise amounts suggest change
value_str = str(value_sats)
if len(value_str) >= 8 and not value_str.endswith('0000'):
return 0.2, f"Precise amount: {value_sats} sats"
return 0.0, "No clear round number pattern"
def analyze_output_values(self, outputs: List[Dict]) -> List[Tuple[float, str]]:
"""Analyze relative value patterns across outputs."""
if len(outputs) < 2:
return [(0.0, "Single output - no relative analysis")] * len(outputs)
values = [out['value'] for out in outputs]
total_value = sum(values)
scores = []
for i, value in enumerate(values):
score = 0.0
reasons = []
percentage = (value / total_value) * 100
# Very small outputs often change (especially in consolidations)
if percentage < 5:
score += 0.15
reasons.append(f"Small output: {percentage:.1f}% of total")
# Very large outputs rarely change
elif percentage > 80:
score -= 0.25
reasons.append(f"Dominant output: {percentage:.1f}% of total")
# Check if smallest output (common change pattern)
if value == min(values) and len(values) > 2:
score += 0.1
reasons.append("Smallest output")
# Check if second smallest (also common for change)
sorted_values = sorted(values)
if len(values) > 2 and value == sorted_values[1]:
score += 0.05
reasons.append("Second smallest output")
reason_text = "; ".join(reasons) if reasons else "No relative value indicators"
scores.append((score, reason_text))
return scores
def analyze_position_patterns(self, outputs: List[Dict], input_count: int) -> List[Tuple[float, str]]:
"""Analyze positional and structural patterns."""
scores = []
output_count = len(outputs)
for i, output in enumerate(outputs):
score = 0.0
reasons = []
# Last position often change (but not always)
if i == output_count - 1 and output_count > 1:
score += 0.1
reasons.append("Last position")
# Two-output transactions - second often change
if output_count == 2 and i == 1:
score += 0.15
reasons.append("Second output in 2-output tx")
# Consolidation pattern (many inputs, few outputs)
if input_count >= 5 and output_count <= 2:
score += 0.2
reasons.append(f"Consolidation pattern: {input_count} inputs → {output_count} outputs")
# Fan-out pattern (few inputs, many outputs) - change usually first or last
if input_count <= 2 and output_count >= 5:
if i == 0 or i == output_count - 1:
score += 0.1
reasons.append("Fan-out pattern: position suggests change")
reason_text = "; ".join(reasons) if reasons else "No positional indicators"
scores.append((score, reason_text))
return scores
def analyze_address_advanced(self, address: str) -> Tuple[float, str]:
"""Enhanced address history analysis."""
if not address or address == "N/A":
return 0.0, "No address to analyze"
try:
time.sleep(REQUEST_DELAY)
# Get comprehensive address info
addr_resp = SESSION.get(f"{BASE_URL_DEFAULT}/address/{address}", timeout=15)
if addr_resp.status_code != 200:
return 0.0, "Could not fetch address info"
addr_info = addr_resp.json()
chain_stats = addr_info.get('chain_stats', {})
mempool_stats = addr_info.get('mempool_stats', {})
tx_count = chain_stats.get('tx_count', 0)
funded_count = chain_stats.get('funded_txo_count', 0)
spent_count = chain_stats.get('spent_txo_count', 0)
# Brand new addresses are very likely change
if tx_count == 0:
return 0.4, "Brand new address (0 transactions)"
if tx_count == 1:
return 0.35, "Address used only once before"
# Get transaction history for pattern analysis
time.sleep(REQUEST_DELAY)
txs_resp = SESSION.get(f"{BASE_URL_DEFAULT}/address/{address}/txs", timeout=15)
if txs_resp.status_code == 200:
txs = txs_resp.json()
# Analyze usage patterns
if len(txs) <= 3:
score = 0.25
reason = f"Low usage: {len(txs)} transactions"
elif len(txs) <= 5:
score = 0.1
reason = f"Moderate usage: {len(txs)} transactions"
else:
# Check if it's an exchange/service address pattern
if len(txs) > 100:
score = -0.3
reason = f"High activity address: {len(txs)} transactions (likely service)"
else:
score = -0.1
reason = f"Regular usage: {len(txs)} transactions"
# Check for rapid reuse (suggests not change)
if len(txs) >= 2:
recent_txs = [tx for tx in txs if tx.get('status', {}).get('confirmed', False)]
if len(recent_txs) >= 2:
time_diff = recent_txs[0].get('status', {}).get('block_time', 0) - recent_txs[1].get('status', {}).get('block_time', 0)
if abs(time_diff) < 3600: # Less than 1 hour between uses
score -= 0.15
reason += "; rapid reuse detected"
return score, reason
# Fallback to basic analysis
if tx_count <= 2:
return 0.25, f"Low transaction count: {tx_count}"
elif tx_count <= 10:
return 0.0, f"Moderate transaction count: {tx_count}"
else:
return -0.2, f"High transaction count: {tx_count}"
except requests.RequestException as e:
return 0.0, f"Network error analyzing address: {str(e)}"
def analyze_script_complexity(self, output: Dict) -> Tuple[float, str]:
"""Analyze script type and complexity."""
script_type = output.get('scriptpubkey_type', 'unknown')
script_hex = output.get('scriptpubkey', '')
# Standard single-sig types are common for change
if script_type in ['p2pkh', 'p2wpkh']:
return 0.1, f"Standard single-sig: {script_type}"
# Wrapped segwit also common for change
if script_type == 'p2sh':
return 0.05, "P2SH (possibly wrapped segwit)"
# Native segwit
if script_type in ['p2wsh', 'p2tr']:
return 0.0, f"Advanced script type: {script_type}"
# Multi-sig and complex scripts less likely to be change
if script_type == 'v0_p2wsh' or 'multisig' in script_type.lower():
return -0.2, f"Complex script: {script_type}"
# OP_RETURN and non-standard outputs definitely not change
if script_type in ['op_return', 'nulldata']:
return -1.0, "OP_RETURN or nulldata output"
return 0.0, f"Unknown script type: {script_type}"
def analyze_fee_context(self, base_data: Dict) -> Tuple[float, str]:
"""Analyze transaction in context of current fee environment."""
if not self.fee_recommendations:
return 0.0, "No fee context available"
fee = base_data.get('fee', 0)
weight = base_data.get('weight', 1)
fee_rate = fee / (weight / 4) if weight > 0 else 0
# Compare to recommended fees
fast_fee = self.fee_recommendations.get('fastestFee', 1)
hour_fee = self.fee_recommendations.get('hourFee', 1)
economy_fee = self.fee_recommendations.get('economyFee', 1)
if fee_rate > fast_fee * 2:
return 0.1, f"High fee rate: {fee_rate:.1f} sat/vB (suggests urgent payment)"
elif fee_rate < economy_fee * 0.5:
return 0.05, f"Very low fee rate: {fee_rate:.1f} sat/vB (suggests batching/consolidation)"
return 0.0, f"Normal fee rate: {fee_rate:.1f} sat/vB"
def calculate_change_probability(self, features: Dict[str, Tuple[float, str]]) -> Tuple[float, Dict[str, Any]]:
"""Calculate change probability using weighted features."""
# Feature weights based on empirical effectiveness
weights = {
'round_number': 1.0, # Strong indicator
'address_history': 0.8, # Very reliable
'relative_value': 0.6, # Good indicator
'position': 0.4, # Moderate indicator
'script_complexity': 0.3, # Weak but useful
'fee_context': 0.2, # Minor indicator
'address_type_reuse': 0.7 # Strong indicator
}
weighted_score = 0.0
total_weight = 0.0
details = {}
for feature_name, (score, reason) in features.items():
if feature_name in weights:
weight = weights[feature_name]
weighted_score += score * weight
total_weight += weight
details[feature_name] = {
'score': score,
'weight': weight,
'contribution': score * weight,
'reason': reason
}
# Normalize score
if total_weight > 0:
normalized_score = weighted_score / total_weight
else:
normalized_score = 0.0
# Convert to probability using sigmoid function
probability = 1 / (1 + math.exp(-normalized_score * 3)) # Scale factor of 3
return probability, details
def analyze_address_type_reuse(self, base_data: Dict, output_index: int) -> Tuple[float, str]:
"""Check if output address type matches input types."""
if not base_data or 'vin' not in base_data or 'vout' not in base_data:
return 0.0, "Insufficient data for address type analysis"
inputs = base_data.get('vin', [])
outputs = base_data.get('vout', [])
if output_index >= len(outputs):
return 0.0, "Invalid output index"
output = outputs[output_index]
output_type = output.get('scriptpubkey_type')
if not output_type:
return 0.0, "No output script type"
# Collect input types
input_types = []
for inp in inputs:
prevout = inp.get('prevout', {})
inp_type = prevout.get('scriptpubkey_type')
if inp_type:
input_types.append(inp_type)
if not input_types:
return 0.0, "No input script types available"
# Find dominant input type
type_counts = Counter(input_types)
dominant_type, dominant_count = type_counts.most_common(1)[0]
if output_type == dominant_type:
reuse_percentage = (dominant_count / len(input_types)) * 100
return 0.3, f"Address type '{output_type}' matches {reuse_percentage:.0f}% of inputs"
else:
return -0.1, f"Address type '{output_type}' differs from dominant input type '{dominant_type}'"
def fetch_comprehensive_details(txid: str, analyzer: TransactionAnalyzer) -> Optional[Dict]:
"""Fetch comprehensive transaction details from multiple endpoints."""
print(f"Fetching comprehensive data for {txid}...")
results = {}
# Primary endpoints
endpoints = {
'base': f"{BASE_URL_DEFAULT}/tx/{txid}",
'outspends': f"{BASE_URL_DEFAULT}/tx/{txid}/outspends",
'rbf': f"{BASE_URL_V1}/tx/{txid}/rbf",
'hex': f"{BASE_URL_DEFAULT}/tx/{txid}/hex",
'status': f"{BASE_URL_DEFAULT}/tx/{txid}/status"
}
for key, url in endpoints.items():
try:
time.sleep(REQUEST_DELAY)
response = SESSION.get(url, timeout=15)
if response.status_code == 200:
results[key] = response.text if key == 'hex' else response.json()
print(f" ✓ Fetched {key}")
else:
results[key] = None
print(f" - {key} not available (Status: {response.status_code})")
except requests.RequestException as e:
print(f" - Network error fetching {key}: {e}")
results[key] = None
if key == 'base':
return None
return results
def perform_comprehensive_change_analysis(base_data: Dict, analyzer: TransactionAnalyzer) -> Optional[List[Dict]]:
"""Perform comprehensive change address analysis using multiple heuristics."""
if not base_data or 'vout' not in base_data:
return None
outputs = base_data.get('vout', [])
inputs = base_data.get('vin', [])
print(f" - Analyzing {len(outputs)} outputs using advanced heuristics...")
analysis_results = []
# Run value analysis once for all outputs
value_scores = analyzer.analyze_output_values(outputs)
position_scores = analyzer.analyze_position_patterns(outputs, len(inputs))
for i, output in enumerate(outputs):
print(f" → Analyzing output {i}")
address = output.get('scriptpubkey_address', 'N/A')
value = output.get('value', 0)
# Skip non-standard outputs
if not address or address == 'N/A':
analysis_results.append({
'index': i,
'address': 'N/A',
'value': value,
'probability': 0.0,
'confidence': 'HIGH',
'reasoning': 'Non-standard output (e.g., OP_RETURN)',
'details': {}
})
continue
# Collect all features
features = {}
# Round number analysis
round_score, round_reason = analyzer.analyze_round_numbers(value)
features['round_number'] = (round_score, round_reason)
# Address history analysis
addr_score, addr_reason = analyzer.analyze_address_advanced(address)
features['address_history'] = (addr_score, addr_reason)
# Relative value analysis
rel_score, rel_reason = value_scores[i]
features['relative_value'] = (rel_score, rel_reason)
# Position analysis
pos_score, pos_reason = position_scores[i]
features['position'] = (pos_score, pos_reason)
# Script complexity analysis
script_score, script_reason = analyzer.analyze_script_complexity(output)
features['script_complexity'] = (script_score, script_reason)
# Fee context analysis
fee_score, fee_reason = analyzer.analyze_fee_context(base_data)
features['fee_context'] = (fee_score, fee_reason)
# Address type reuse analysis
type_score, type_reason = analyzer.analyze_address_type_reuse(base_data, i)
features['address_type_reuse'] = (type_score, type_reason)
# Calculate final probability
probability, feature_details = analyzer.calculate_change_probability(features)
# Determine confidence level
confidence = "MEDIUM"
if probability > 0.8 or probability < 0.2:
confidence = "HIGH"
elif probability > 0.6 or probability < 0.4:
confidence = "MEDIUM"
else:
confidence = "LOW"
analysis_results.append({
'index': i,
'address': address,
'value': value,
'probability': probability,
'confidence': confidence,
'reasoning': f"Change probability: {probability:.1%}",
'details': feature_details
})
return analysis_results
def format_comprehensive_report(all_data: Dict, change_analysis: Optional[List[Dict]], analyzer: TransactionAnalyzer) -> str:
"""Format comprehensive forensic report."""
base_data = all_data.get('base')
if not base_data:
return "Could not retrieve base transaction data.\n\n"
txid = base_data.get("txid", "N/A")
fee = base_data.get("fee", 0)
weight = base_data.get("weight", 1)
size = base_data.get("size", 0)
output = ["=" * 100, f"COMPREHENSIVE FORENSIC ANALYSIS: {txid}", "=" * 100]
# Transaction Overview
status = base_data.get("status", {})
if status.get("confirmed", False):
block_time = datetime.utcfromtimestamp(status.get("block_time", 0)).strftime('%Y-%m-%d %H:%M:%S UTC')
status_str = f"Confirmed in block {status.get('block_height', 'N/A')} at {block_time}"
else:
status_str = "Unconfirmed (in mempool)"
fee_rate = fee / (weight / 4) if weight > 0 else 0
output.extend([
"\n" + "" * 50 + " TRANSACTION OVERVIEW " + "" * 50,
f"Status: {status_str}",
f"Fee: {fee:,} sats ({fee/100000000:.8f} BTC)",
f"Size: {size:,} bytes | Weight: {weight:,} vB | Fee Rate: {fee_rate:.2f} sat/vB",
f"Version: {base_data.get('version', 'N/A')} | Locktime: {base_data.get('locktime', 'N/A')}"
])
# Network Context
if analyzer.fee_recommendations:
output.append(f"\nCurrent Network Fees - Fast: {analyzer.fee_recommendations.get('fastestFee', 'N/A')} | "
f"Hour: {analyzer.fee_recommendations.get('hourFee', 'N/A')} | "
f"Economy: {analyzer.fee_recommendations.get('economyFee', 'N/A')} sat/vB")
# Input Analysis
vin = base_data.get("vin", [])
output.append("\n" + "" * 50 + f" INPUTS ({len(vin)}) " + "" * 50)
if not vin:
output.append("No inputs found (coinbase transaction)")
else:
total_input_value = 0
for i, inp in enumerate(vin, 1):
prevout = inp.get("prevout", {})
value = prevout.get('value', 0)
total_input_value += value
script_type = prevout.get('scriptpubkey_type', 'unknown')
output.append(f" {i}. TXID: {inp.get('txid', 'N/A')[:16]}...")
output.append(f" Value: {value:,} sats | Address: {prevout.get('scriptpubkey_address', 'N/A')}")
output.append(f" Script Type: {script_type}")
# Output Analysis
vout = base_data.get("vout", [])
output.append("\n" + "" * 50 + f" OUTPUTS ({len(vout)}) " + "" * 50)
total_output_value = 0
for i, out in enumerate(vout, 1):
value = out.get('value', 0)
total_output_value += value
script_type = out.get('scriptpubkey_type', 'unknown')
output.append(f" {i}. Value: {value:,} sats ({value/100000000:.8f} BTC)")
output.append(f" Address: {out.get('scriptpubkey_address', 'N/A')}")
output.append(f" Script Type: {script_type}")
# Fee verification
calculated_fee = total_input_value - total_output_value
output.append(f"\nFee Verification: Calculated={calculated_fee:,} sats | Reported={fee:,} sats")
if abs(calculated_fee - fee) > 1:
output.append("⚠️ WARNING: Fee mismatch detected!")
# Change Address Analysis
output.append("\n" + "=" * 100)
output.append("ADVANCED CHANGE ADDRESS ANALYSIS")
output.append("=" * 100)
output.append("\nMethodology: Multi-heuristic probabilistic model analyzing:")
output.append("• Round number patterns (payments often use round amounts)")
output.append("• Address reuse and history (change addresses often new)")
output.append("• Relative output values (change often smaller/different)")
output.append("• Positional patterns (change position varies by wallet)")
output.append("• Script type consistency (wallets reuse address types)")
output.append("• Transaction context (fee rates, timing, structure)")
if change_analysis:
# Sort by probability for easy identification
sorted_analysis = sorted(change_analysis, key=lambda x: x['probability'], reverse=True)
output.append("\n" + "" * 80 + " RESULTS " + "" * 80)
for result in sorted_analysis:
prob = result['probability']
conf = result['confidence']
# Determine label
if prob > 0.7:
label = "🟢 LIKELY CHANGE"
elif prob > 0.5:
label = "🟡 POSSIBLE CHANGE"
elif prob < 0.3:
label = "🔴 LIKELY PAYMENT"
else:
label = "⚪ UNCERTAIN"
output.append(f"\nOutput {result['index']} - {result['address'][:20]}{'...' if len(result['address']) > 20 else ''}")
output.append(f"Value: {result['value']:,} sats | Probability: {prob:.1%} | Confidence: {conf}")
output.append(f"Assessment: {label}")
output.append("Detailed Analysis:")
for feature, details in result['details'].items():
score = details['score']
contribution = details['contribution']
reason = details['reason']
indicator = "+" if score > 0 else "-" if score < 0 else "="
output.append(f" {indicator} {feature.replace('_', ' ').title()}: {reason}")
output.append(f" Score: {score:+.2f} | Weight: {details['weight']:.1f} | Contribution: {contribution:+.2f}")
# Summary
most_likely_change = max(sorted_analysis, key=lambda x: x['probability'])
if most_likely_change['probability'] > 0.5:
output.append("\n" + "" * 80 + " SUMMARY " + "" * 80)
output.append(f"Most Likely Change: Output {most_likely_change['index']} "
f"({most_likely_change['probability']:.1%} probability)")
output.append(f"Address: {most_likely_change['address']}")
output.append(f"Value: {most_likely_change['value']:,} sats")
else:
output.append("\n" + "" * 80 + " SUMMARY " + "" * 80)
output.append("⚠️ No clear change address identified - all outputs show low change probability")
output.append("This may indicate: multiple payments, exchange transaction, or privacy technique")
else:
output.append("\n❌ Could not perform change address analysis due to insufficient data")
# Spending Status Analysis
outspends_data = all_data.get('outspends')
output.append("\n" + "" * 50 + " SPENDING STATUS " + "" * 50)
if outspends_data:
for i, spend_info in enumerate(outspends_data):
if spend_info and spend_info.get('spent'):
spend_txid = spend_info.get('txid', 'N/A')
spend_vin = spend_info.get('vin', 'N/A')
spend_status = spend_info.get('status', {})
if spend_status.get('confirmed'):
spend_height = spend_status.get('block_height', 'N/A')
output.append(f" Output {i}: ✅ Spent in TX {spend_txid} (input {spend_vin}) at block {spend_height}")
else:
output.append(f" Output {i}: 🟡 Spent in unconfirmed TX {spend_txid}")
else:
output.append(f" Output {i}: 💰 Unspent (UTXO)")
else:
output.append("Could not retrieve spending status information")
# RBF Analysis
rbf_data = all_data.get('rbf')
output.append("\n" + "" * 50 + " RBF HISTORY " + "" * 50)
if rbf_data:
replacements = rbf_data.get('replacements')
replaces = rbf_data.get('replaces', [])
if replaces:
output.append("🔄 This transaction REPLACED the following:")
for replaced_txid in replaces:
output.append(f"{replaced_txid}")
if replacements:
output.append("🔄 This transaction was REPLACED by:")
output.append(f"{replacements.get('tx', {}).get('txid', 'N/A')}")
if not replaces and not replacements:
output.append("No RBF activity detected")
else:
output.append("No RBF history available")
# Raw Transaction Data
hex_data = all_data.get('hex')
output.append("\n" + "" * 50 + " RAW TRANSACTION " + "" * 50)
if hex_data:
output.append(f"Raw Hex ({len(hex_data)} characters):")
output.append(hex_data)
else:
output.append("Raw hex data not available")
output.append("\n" + "=" * 100)
output.append(f"Report generated at: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC")
output.append("=" * 100 + "\n\n")
return "\n".join(output)
def main():
"""Main function with enhanced argument parsing and execution."""
parser = argparse.ArgumentParser(
description="Advanced Bitcoin Transaction Forensic Analyzer v2.0",
formatter_class=argparse.RawTextHelpFormatter,
epilog="""
Features:
• Multi-heuristic change address detection
• Comprehensive transaction analysis
• Network context awareness
• Probabilistic scoring system
• Detailed forensic reporting
Example:
python btc_forensic.py transactions.txt report.txt
"""
)
parser.add_argument("input_file",
help="Path to input file containing transaction IDs (one per line)")
parser.add_argument("output_file",
help="Path to output file for the forensic report")
parser.add_argument("--verbose", "-v", action="store_true",
help="Enable verbose output")
parser.add_argument("--delay", "-d", type=float, default=0.1,
help="Delay between API requests in seconds (default: 0.1)")
args = parser.parse_args()
# Update delay if specified
global REQUEST_DELAY
REQUEST_DELAY = args.delay
# Load transaction IDs
try:
with open(args.input_file, 'r', encoding='utf-8') as f:
txids = [line.strip() for line in f if line.strip() and not line.startswith('#')]
except FileNotFoundError:
print(f"❌ Error: Input file '{args.input_file}' not found.")
sys.exit(1)
except Exception as e:
print(f"❌ Error reading input file: {e}")
sys.exit(1)
if not txids:
print("❌ Error: No valid transaction IDs found in input file.")
sys.exit(1)
print(f"🔍 Starting comprehensive forensic analysis of {len(txids)} transactions...")
print("📊 Using enhanced heuristics with probabilistic scoring")
print(f"🌐 Target API: {BASE_URL_DEFAULT}")
# Initialize analyzer
analyzer = TransactionAnalyzer()
# Process transactions
with open(args.output_file, 'w', encoding='utf-8') as out_file:
successful = 0
failed = 0
for i, txid in enumerate(txids, 1):
print(f"\n[{i}/{len(txids)}] Processing: {txid}")
print("-" * 80)
try:
# Fetch comprehensive data
all_details = fetch_comprehensive_details(txid, analyzer)
if all_details and all_details.get('base'):
# Perform advanced analysis
change_analysis = perform_comprehensive_change_analysis(
all_details['base'], analyzer
)
# Generate report
formatted_report = format_comprehensive_report(
all_details, change_analysis, analyzer
)
out_file.write(formatted_report)
out_file.flush() # Ensure data is written
successful += 1
print("✅ Analysis completed successfully")
else:
error_msg = f"❌ Failed to fetch critical data for {txid}\n\n"
out_file.write(error_msg)
failed += 1
print("❌ Failed to fetch critical data")
except Exception as e:
error_msg = f"❌ Error processing {txid}: {str(e)}\n\n"
out_file.write(error_msg)
failed += 1
print(f"❌ Error: {str(e)}")
# Final summary
print("\n" + "=" * 80)
print("📋 ANALYSIS SUMMARY")
print("=" * 80)
print(f"✅ Successfully analyzed: {successful} transactions")
print(f"❌ Failed to analyze: {failed} transactions")
print(f"📄 Comprehensive report saved to: {args.output_file}")
print(f"🕒 Total processing time: {datetime.utcnow().strftime('%H:%M:%S')} UTC")
if successful > 0:
print("\n🎯 Advanced forensic analysis complete!")
print(" Report includes probabilistic change detection,")
print(" comprehensive transaction analysis, and detailed")
print(" heuristic breakdowns for enhanced investigation.")
if __name__ == "__main__":
main()