import requests
import argparse
import sys
import math
from datetime import datetime
from collections import Counter
from typing import Dict, List, Optional, Tuple, Any
import time

# --- Configuration ---
BASE_URL_V1 = "https://mempool.mikoshi.de/api/v1"
BASE_URL_DEFAULT = "https://mempool.mikoshi.de/api"
SESSION = requests.Session()
SESSION.headers.update({
    'User-Agent': 'Bitcoin-Forensic-Analyzer/2.0'
})

# Rate limiting: delay between API requests (not strictly required, but good
# practice; overridable via --delay)
REQUEST_DELAY = 0.1
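
# Optional hardening (a commented-out sketch, not part of the original script):
# mount automatic retries with exponential backoff on the shared session. The
# Retry parameters below are illustrative assumptions.
#
#   from requests.adapters import HTTPAdapter
#   from urllib3.util.retry import Retry
#
#   retries = Retry(total=3, backoff_factor=0.5,
#                   status_forcelist=(429, 502, 503, 504))
#   SESSION.mount("https://", HTTPAdapter(max_retries=retries))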


class TransactionAnalyzer:
    def __init__(self):
        self.mempool_stats = None
        self.fee_recommendations = None
        self._load_network_context()

    def _load_network_context(self):
        """Load current network context for analysis."""
        try:
            # Get current mempool state
            mempool_resp = SESSION.get(f"{BASE_URL_DEFAULT}/mempool", timeout=10)
            if mempool_resp.status_code == 200:
                self.mempool_stats = mempool_resp.json()

            # Get current fee recommendations
            fee_resp = SESSION.get(f"{BASE_URL_V1}/fees/recommended", timeout=10)
            if fee_resp.status_code == 200:
                self.fee_recommendations = fee_resp.json()

        except requests.RequestException:
            print(" - Warning: Could not load network context")

    def analyze_round_numbers(self, value_sats: int) -> Tuple[float, str]:
        """Analyze if value appears to be a round number (indicates payment)."""
        value_btc = value_sats / 100_000_000

        # Very round BTC amounts (strong payment indicators)
        round_btc_amounts = [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0]
        for amount in round_btc_amounts:
            if abs(value_btc - amount) < 0.000001:
                return -0.4, f"Exact round amount: {amount} BTC"

        # Round satoshi amounts
        if value_sats >= 100000 and value_sats % 100000 == 0:
            return -0.3, f"Round 100k sats: {value_sats}"
        if value_sats >= 10000 and value_sats % 10000 == 0:
            return -0.2, f"Round 10k sats: {value_sats}"
        if value_sats >= 1000 and value_sats % 1000 == 0:
            return -0.1, f"Round 1k sats: {value_sats}"

        # Very precise amounts suggest change
        value_str = str(value_sats)
        if len(value_str) >= 8 and not value_str.endswith('0000'):
            return 0.2, f"Precise amount: {value_sats} sats"

        return 0.0, "No clear round number pattern"
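
    # Illustrative examples (assuming the thresholds above):
    #   analyze_round_numbers(10_000_000) -> (-0.4, "Exact round amount: 0.1 BTC")
    #   analyze_round_numbers(12_345_678) -> (0.2, "Precise amount: 12345678 sats")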

    def analyze_output_values(self, outputs: List[Dict]) -> List[Tuple[float, str]]:
        """Analyze relative value patterns across outputs."""
        if len(outputs) < 2:
            return [(0.0, "Single output - no relative analysis")] * len(outputs)

        values = [out['value'] for out in outputs]
        total_value = sum(values)

        scores = []
        for i, value in enumerate(values):
            score = 0.0
            reasons = []

            percentage = (value / total_value) * 100

            # Very small outputs are often change (especially in consolidations)
            if percentage < 5:
                score += 0.15
                reasons.append(f"Small output: {percentage:.1f}% of total")

            # Very large outputs are rarely change
            elif percentage > 80:
                score -= 0.25
                reasons.append(f"Dominant output: {percentage:.1f}% of total")

            # Check if smallest output (common change pattern)
            if value == min(values) and len(values) > 2:
                score += 0.1
                reasons.append("Smallest output")

            # Check if second smallest (also common for change)
            sorted_values = sorted(values)
            if len(values) > 2 and value == sorted_values[1]:
                score += 0.05
                reasons.append("Second smallest output")

            reason_text = "; ".join(reasons) if reasons else "No relative value indicators"
            scores.append((score, reason_text))

        return scores
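
    # Illustrative example: for two outputs worth 960_000 and 40_000 sats, the
    # large one scores -0.25 ("Dominant output: 96.0% of total") and the small
    # one scores +0.15 ("Small output: 4.0% of total").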

    def analyze_position_patterns(self, outputs: List[Dict], input_count: int) -> List[Tuple[float, str]]:
        """Analyze positional and structural patterns."""
        scores = []
        output_count = len(outputs)

        for i, output in enumerate(outputs):
            score = 0.0
            reasons = []

            # The last position is often change (but not always)
            if i == output_count - 1 and output_count > 1:
                score += 0.1
                reasons.append("Last position")

            # Two-output transactions - the second output is often change
            if output_count == 2 and i == 1:
                score += 0.15
                reasons.append("Second output in 2-output tx")

            # Consolidation pattern (many inputs, few outputs)
            if input_count >= 5 and output_count <= 2:
                score += 0.2
                reasons.append(f"Consolidation pattern: {input_count} inputs → {output_count} outputs")

            # Fan-out pattern (few inputs, many outputs) - change is usually first or last
            if input_count <= 2 and output_count >= 5:
                if i == 0 or i == output_count - 1:
                    score += 0.1
                    reasons.append("Fan-out pattern: position suggests change")

            reason_text = "; ".join(reasons) if reasons else "No positional indicators"
            scores.append((score, reason_text))

        return scores
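
    # Illustrative example: in a 6-input, 2-output transaction, output 1
    # collects "Last position" (+0.1), "Second output in 2-output tx" (+0.15)
    # and the consolidation bonus (+0.2) for a total of +0.45, while output 0
    # receives only the consolidation bonus (+0.2).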

    def analyze_address_advanced(self, address: str) -> Tuple[float, str]:
        """Enhanced address history analysis."""
        if not address or address == "N/A":
            return 0.0, "No address to analyze"

        try:
            time.sleep(REQUEST_DELAY)

            # Get comprehensive address info
            addr_resp = SESSION.get(f"{BASE_URL_DEFAULT}/address/{address}", timeout=15)
            if addr_resp.status_code != 200:
                return 0.0, "Could not fetch address info"

            addr_info = addr_resp.json()
            chain_stats = addr_info.get('chain_stats', {})
            mempool_stats = addr_info.get('mempool_stats', {})

            tx_count = chain_stats.get('tx_count', 0)
            funded_count = chain_stats.get('funded_txo_count', 0)
            spent_count = chain_stats.get('spent_txo_count', 0)

            # Brand new addresses are very likely change
            if tx_count == 0:
                return 0.4, "Brand new address (0 transactions)"

            if tx_count == 1:
                return 0.35, "Address used only once before"

            # Get transaction history for pattern analysis
            time.sleep(REQUEST_DELAY)
            txs_resp = SESSION.get(f"{BASE_URL_DEFAULT}/address/{address}/txs", timeout=15)

            if txs_resp.status_code == 200:
                txs = txs_resp.json()

                # Analyze usage patterns
                if len(txs) <= 3:
                    score = 0.25
                    reason = f"Low usage: {len(txs)} transactions"
                elif len(txs) <= 5:
                    score = 0.1
                    reason = f"Moderate usage: {len(txs)} transactions"
                else:
                    # Check for an exchange/service address pattern. The /txs
                    # endpoint is paginated, so use the full chain-stats count
                    # rather than len(txs) for the high-activity check.
                    if tx_count > 100:
                        score = -0.3
                        reason = f"High activity address: {tx_count} transactions (likely service)"
                    else:
                        score = -0.1
                        reason = f"Regular usage: {len(txs)} transactions"

                # Check for rapid reuse (suggests not change)
                if len(txs) >= 2:
                    recent_txs = [tx for tx in txs if tx.get('status', {}).get('confirmed', False)]
                    if len(recent_txs) >= 2:
                        time_diff = recent_txs[0].get('status', {}).get('block_time', 0) - recent_txs[1].get('status', {}).get('block_time', 0)
                        if abs(time_diff) < 3600:  # Less than 1 hour between uses
                            score -= 0.15
                            reason += "; rapid reuse detected"

                return score, reason

            # Fallback to basic analysis when the history endpoint is unavailable
            if tx_count <= 2:
                return 0.25, f"Low transaction count: {tx_count}"
            elif tx_count <= 10:
                return 0.0, f"Moderate transaction count: {tx_count}"
            else:
                return -0.2, f"High transaction count: {tx_count}"

        except requests.RequestException as e:
            return 0.0, f"Network error analyzing address: {str(e)}"

    def analyze_script_complexity(self, output: Dict) -> Tuple[float, str]:
        """Analyze script type and complexity."""
        script_type = output.get('scriptpubkey_type', 'unknown')
        script_hex = output.get('scriptpubkey', '')

        # OP_RETURN and nulldata outputs are definitely not change
        if script_type in ['op_return', 'nulldata']:
            return -1.0, "OP_RETURN or nulldata output"

        # Standard single-sig types are common for change
        if script_type in ['p2pkh', 'p2wpkh', 'v0_p2wpkh']:
            return 0.1, f"Standard single-sig: {script_type}"

        # Wrapped segwit is also common for change
        if script_type == 'p2sh':
            return 0.05, "P2SH (possibly wrapped segwit)"

        # Taproot is neutral on its own
        if script_type in ['p2tr', 'v1_p2tr']:
            return 0.0, f"Advanced script type: {script_type}"

        # Multi-sig and other complex scripts are less likely to be change
        if script_type in ['p2wsh', 'v0_p2wsh'] or 'multisig' in script_type.lower():
            return -0.2, f"Complex script: {script_type}"

        return 0.0, f"Unknown script type: {script_type}"

    def analyze_fee_context(self, base_data: Dict) -> Tuple[float, str]:
        """Analyze transaction in context of current fee environment."""
        if not self.fee_recommendations:
            return 0.0, "No fee context available"

        fee = base_data.get('fee', 0)
        weight = base_data.get('weight', 1)
        fee_rate = fee / (weight / 4) if weight > 0 else 0  # weight / 4 = virtual size in vB

        # Compare to recommended fees
        fast_fee = self.fee_recommendations.get('fastestFee', 1)
        hour_fee = self.fee_recommendations.get('hourFee', 1)
        economy_fee = self.fee_recommendations.get('economyFee', 1)

        if fee_rate > fast_fee * 2:
            return 0.1, f"High fee rate: {fee_rate:.1f} sat/vB (suggests urgent payment)"
        elif fee_rate < economy_fee * 0.5:
            return 0.05, f"Very low fee rate: {fee_rate:.1f} sat/vB (suggests batching/consolidation)"

        return 0.0, f"Normal fee rate: {fee_rate:.1f} sat/vB"

    def calculate_change_probability(self, features: Dict[str, Tuple[float, str]]) -> Tuple[float, Dict[str, Any]]:
        """Calculate change probability using weighted features."""

        # Feature weights based on empirical effectiveness
        weights = {
            'round_number': 1.0,        # Strong indicator
            'address_history': 0.8,     # Very reliable
            'relative_value': 0.6,      # Good indicator
            'position': 0.4,            # Moderate indicator
            'script_complexity': 0.3,   # Weak but useful
            'fee_context': 0.2,         # Minor indicator
            'address_type_reuse': 0.7   # Strong indicator
        }

        weighted_score = 0.0
        total_weight = 0.0
        details = {}

        for feature_name, (score, reason) in features.items():
            if feature_name in weights:
                weight = weights[feature_name]
                weighted_score += score * weight
                total_weight += weight
                details[feature_name] = {
                    'score': score,
                    'weight': weight,
                    'contribution': score * weight,
                    'reason': reason
                }

        # Normalize score
        if total_weight > 0:
            normalized_score = weighted_score / total_weight
        else:
            normalized_score = 0.0

        # Convert to probability using a sigmoid function (scale factor of 3)
        probability = 1 / (1 + math.exp(-normalized_score * 3))

        return probability, details
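
    # Worked example of the sigmoid mapping (values rounded):
    #   normalized_score  0.00 -> probability 0.50 (neutral)
    #   normalized_score +0.33 -> probability ~0.73 (leaning change)
    #   normalized_score -0.40 -> probability ~0.23 (leaning payment)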

    def analyze_address_type_reuse(self, base_data: Dict, output_index: int) -> Tuple[float, str]:
        """Check if output address type matches input types."""
        if not base_data or 'vin' not in base_data or 'vout' not in base_data:
            return 0.0, "Insufficient data for address type analysis"

        inputs = base_data.get('vin', [])
        outputs = base_data.get('vout', [])

        if output_index >= len(outputs):
            return 0.0, "Invalid output index"

        output = outputs[output_index]
        output_type = output.get('scriptpubkey_type')

        if not output_type:
            return 0.0, "No output script type"

        # Collect input types
        input_types = []
        for inp in inputs:
            prevout = inp.get('prevout', {})
            inp_type = prevout.get('scriptpubkey_type')
            if inp_type:
                input_types.append(inp_type)

        if not input_types:
            return 0.0, "No input script types available"

        # Find the dominant input type
        type_counts = Counter(input_types)
        dominant_type, dominant_count = type_counts.most_common(1)[0]

        if output_type == dominant_type:
            reuse_percentage = (dominant_count / len(input_types)) * 100
            return 0.3, f"Address type '{output_type}' matches {reuse_percentage:.0f}% of inputs"
        else:
            return -0.1, f"Address type '{output_type}' differs from dominant input type '{dominant_type}'"


def fetch_comprehensive_details(txid: str, analyzer: TransactionAnalyzer) -> Optional[Dict]:
    """Fetch comprehensive transaction details from multiple endpoints."""
    print(f"Fetching comprehensive data for {txid}...")

    results = {}

    # Primary endpoints
    endpoints = {
        'base': f"{BASE_URL_DEFAULT}/tx/{txid}",
        'outspends': f"{BASE_URL_DEFAULT}/tx/{txid}/outspends",
        'rbf': f"{BASE_URL_V1}/tx/{txid}/rbf",
        'hex': f"{BASE_URL_DEFAULT}/tx/{txid}/hex",
        'status': f"{BASE_URL_DEFAULT}/tx/{txid}/status"
    }

    for key, url in endpoints.items():
        try:
            time.sleep(REQUEST_DELAY)
            response = SESSION.get(url, timeout=15)
            if response.status_code == 200:
                results[key] = response.text if key == 'hex' else response.json()
                print(f" ✓ Fetched {key}")
            else:
                results[key] = None
                print(f" - {key} not available (Status: {response.status_code})")
        except requests.RequestException as e:
            print(f" - Network error fetching {key}: {e}")
            results[key] = None
            if key == 'base':
                # Without the base transaction there is nothing to analyze
                return None

    return results
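
# Shape of the returned mapping (an illustrative sketch; keys mirror the
# endpoint names above, and any value may be None if its fetch failed):
#   {
#       'base':      {...},            # decoded transaction JSON
#       'outspends': [...],            # per-output spend status
#       'rbf':       {...},            # RBF history (often None)
#       'hex':       "02000000...",    # raw transaction hex string
#       'status':    {...},            # confirmation status
#   }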


def perform_comprehensive_change_analysis(base_data: Dict, analyzer: TransactionAnalyzer) -> Optional[List[Dict]]:
    """Perform comprehensive change address analysis using multiple heuristics."""
    if not base_data or 'vout' not in base_data:
        return None

    outputs = base_data.get('vout', [])
    inputs = base_data.get('vin', [])

    print(f" - Analyzing {len(outputs)} outputs using advanced heuristics...")

    analysis_results = []

    # Run value and position analysis once for all outputs
    value_scores = analyzer.analyze_output_values(outputs)
    position_scores = analyzer.analyze_position_patterns(outputs, len(inputs))

    for i, output in enumerate(outputs):
        print(f" → Analyzing output {i}")

        address = output.get('scriptpubkey_address', 'N/A')
        value = output.get('value', 0)

        # Skip non-standard outputs
        if not address or address == 'N/A':
            analysis_results.append({
                'index': i,
                'address': 'N/A',
                'value': value,
                'probability': 0.0,
                'confidence': 'HIGH',
                'reasoning': 'Non-standard output (e.g., OP_RETURN)',
                'details': {}
            })
            continue

        # Collect all features
        features = {}

        # Round number analysis
        round_score, round_reason = analyzer.analyze_round_numbers(value)
        features['round_number'] = (round_score, round_reason)

        # Address history analysis
        addr_score, addr_reason = analyzer.analyze_address_advanced(address)
        features['address_history'] = (addr_score, addr_reason)

        # Relative value analysis
        rel_score, rel_reason = value_scores[i]
        features['relative_value'] = (rel_score, rel_reason)

        # Position analysis
        pos_score, pos_reason = position_scores[i]
        features['position'] = (pos_score, pos_reason)

        # Script complexity analysis
        script_score, script_reason = analyzer.analyze_script_complexity(output)
        features['script_complexity'] = (script_score, script_reason)

        # Fee context analysis
        fee_score, fee_reason = analyzer.analyze_fee_context(base_data)
        features['fee_context'] = (fee_score, fee_reason)

        # Address type reuse analysis
        type_score, type_reason = analyzer.analyze_address_type_reuse(base_data, i)
        features['address_type_reuse'] = (type_score, type_reason)

        # Calculate final probability
        probability, feature_details = analyzer.calculate_change_probability(features)

        # Determine confidence level
        if probability > 0.8 or probability < 0.2:
            confidence = "HIGH"
        elif probability > 0.6 or probability < 0.4:
            confidence = "MEDIUM"
        else:
            confidence = "LOW"

        analysis_results.append({
            'index': i,
            'address': address,
            'value': value,
            'probability': probability,
            'confidence': confidence,
            'reasoning': f"Change probability: {probability:.1%}",
            'details': feature_details
        })

    return analysis_results
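
# Each element of the returned list is shaped like this (illustrative values):
#   {
#       'index': 1,
#       'address': 'bc1q...',
#       'value': 123456,
#       'probability': 0.73,
#       'confidence': 'MEDIUM',
#       'reasoning': 'Change probability: 73.0%',
#       'details': {'round_number': {'score': ..., 'weight': ..., ...}, ...}
#   }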


def format_comprehensive_report(all_data: Dict, change_analysis: Optional[List[Dict]], analyzer: TransactionAnalyzer) -> str:
    """Format comprehensive forensic report."""
    base_data = all_data.get('base')
    if not base_data:
        return "Could not retrieve base transaction data.\n\n"

    txid = base_data.get("txid", "N/A")
    fee = base_data.get("fee", 0)
    weight = base_data.get("weight", 1)
    size = base_data.get("size", 0)

    output = ["=" * 100, f"COMPREHENSIVE FORENSIC ANALYSIS: {txid}", "=" * 100]

    # Transaction Overview
    status = base_data.get("status", {})
    if status.get("confirmed", False):
        block_time = datetime.utcfromtimestamp(status.get("block_time", 0)).strftime('%Y-%m-%d %H:%M:%S UTC')
        status_str = f"Confirmed in block {status.get('block_height', 'N/A')} at {block_time}"
    else:
        status_str = "Unconfirmed (in mempool)"

    fee_rate = fee / (weight / 4) if weight > 0 else 0

    output.extend([
        "\n" + "─" * 50 + " TRANSACTION OVERVIEW " + "─" * 50,
        f"Status: {status_str}",
        f"Fee: {fee:,} sats ({fee/100000000:.8f} BTC)",
        f"Size: {size:,} bytes | Weight: {weight:,} WU | Fee Rate: {fee_rate:.2f} sat/vB",
        f"Version: {base_data.get('version', 'N/A')} | Locktime: {base_data.get('locktime', 'N/A')}"
    ])

    # Network Context
    if analyzer.fee_recommendations:
        output.append(f"\nCurrent Network Fees - Fast: {analyzer.fee_recommendations.get('fastestFee', 'N/A')} | "
                      f"Hour: {analyzer.fee_recommendations.get('hourFee', 'N/A')} | "
                      f"Economy: {analyzer.fee_recommendations.get('economyFee', 'N/A')} sat/vB")

    # Input Analysis
    vin = base_data.get("vin", [])
    output.append("\n" + "─" * 50 + f" INPUTS ({len(vin)}) " + "─" * 50)

    total_input_value = 0
    if not vin:
        output.append("No inputs found")
    else:
        for i, inp in enumerate(vin, 1):
            prevout = inp.get("prevout", {})
            value = prevout.get('value', 0)
            total_input_value += value
            script_type = prevout.get('scriptpubkey_type', 'unknown')

            output.append(f" {i}. TXID: {inp.get('txid', 'N/A')[:16]}...")
            output.append(f" Value: {value:,} sats | Address: {prevout.get('scriptpubkey_address', 'N/A')}")
            output.append(f" Script Type: {script_type}")

    # Output Analysis
    vout = base_data.get("vout", [])
    output.append("\n" + "─" * 50 + f" OUTPUTS ({len(vout)}) " + "─" * 50)

    total_output_value = 0
    for i, out in enumerate(vout, 1):
        value = out.get('value', 0)
        total_output_value += value
        script_type = out.get('scriptpubkey_type', 'unknown')

        output.append(f" {i}. Value: {value:,} sats ({value/100000000:.8f} BTC)")
        output.append(f" Address: {out.get('scriptpubkey_address', 'N/A')}")
        output.append(f" Script Type: {script_type}")

    # Fee verification
    calculated_fee = total_input_value - total_output_value
    output.append(f"\nFee Verification: Calculated={calculated_fee:,} sats | Reported={fee:,} sats")
    if abs(calculated_fee - fee) > 1:
        output.append("⚠️ WARNING: Fee mismatch detected!")

    # Change Address Analysis
    output.append("\n" + "=" * 100)
    output.append("ADVANCED CHANGE ADDRESS ANALYSIS")
    output.append("=" * 100)
    output.append("\nMethodology: Multi-heuristic probabilistic model analyzing:")
    output.append("• Round number patterns (payments often use round amounts)")
    output.append("• Address reuse and history (change addresses are often new)")
    output.append("• Relative output values (change is often smaller/different)")
    output.append("• Positional patterns (change position varies by wallet)")
    output.append("• Script type consistency (wallets reuse address types)")
    output.append("• Transaction context (fee rates, timing, structure)")

    if change_analysis:
        # Sort by probability for easy identification
        sorted_analysis = sorted(change_analysis, key=lambda x: x['probability'], reverse=True)

        output.append("\n" + "─" * 80 + " RESULTS " + "─" * 80)

        for result in sorted_analysis:
            prob = result['probability']
            conf = result['confidence']

            # Determine label
            if prob > 0.7:
                label = "🟢 LIKELY CHANGE"
            elif prob > 0.5:
                label = "🟡 POSSIBLE CHANGE"
            elif prob < 0.3:
                label = "🔴 LIKELY PAYMENT"
            else:
                label = "⚪ UNCERTAIN"

            output.append(f"\nOutput {result['index']} - {result['address'][:20]}{'...' if len(result['address']) > 20 else ''}")
            output.append(f"Value: {result['value']:,} sats | Probability: {prob:.1%} | Confidence: {conf}")
            output.append(f"Assessment: {label}")
            output.append("Detailed Analysis:")

            for feature, details in result['details'].items():
                score = details['score']
                contribution = details['contribution']
                reason = details['reason']

                indicator = "+" if score > 0 else "-" if score < 0 else "="
                output.append(f" {indicator} {feature.replace('_', ' ').title()}: {reason}")
                output.append(f" Score: {score:+.2f} | Weight: {details['weight']:.1f} | Contribution: {contribution:+.2f}")

        # Summary (the list is sorted, so the first entry has the highest probability)
        most_likely_change = sorted_analysis[0]
        output.append("\n" + "─" * 80 + " SUMMARY " + "─" * 80)
        if most_likely_change['probability'] > 0.5:
            output.append(f"Most Likely Change: Output {most_likely_change['index']} "
                          f"({most_likely_change['probability']:.1%} probability)")
            output.append(f"Address: {most_likely_change['address']}")
            output.append(f"Value: {most_likely_change['value']:,} sats")
        else:
            output.append("⚠️ No clear change address identified - all outputs show low change probability")
            output.append("This may indicate: multiple payments, an exchange transaction, or a privacy technique")

    else:
        output.append("\n❌ Could not perform change address analysis due to insufficient data")

    # Spending Status Analysis
    outspends_data = all_data.get('outspends')
    output.append("\n" + "─" * 50 + " SPENDING STATUS " + "─" * 50)

    if outspends_data:
        for i, spend_info in enumerate(outspends_data):
            if spend_info and spend_info.get('spent'):
                spend_txid = spend_info.get('txid', 'N/A')
                spend_vin = spend_info.get('vin', 'N/A')
                spend_status = spend_info.get('status', {})

                if spend_status.get('confirmed'):
                    spend_height = spend_status.get('block_height', 'N/A')
                    output.append(f" Output {i}: ✅ Spent in TX {spend_txid} (input {spend_vin}) at block {spend_height}")
                else:
                    output.append(f" Output {i}: 🟡 Spent in unconfirmed TX {spend_txid}")
            else:
                output.append(f" Output {i}: 💰 Unspent (UTXO)")
    else:
        output.append("Could not retrieve spending status information")

    # RBF Analysis
    rbf_data = all_data.get('rbf')
    output.append("\n" + "─" * 50 + " RBF HISTORY " + "─" * 50)

    if rbf_data:
        replacements = rbf_data.get('replacements')
        replaces = rbf_data.get('replaces', [])

        if replaces:
            output.append("🔄 This transaction REPLACED the following:")
            for replaced_txid in replaces:
                output.append(f" ← {replaced_txid}")

        if replacements:
            output.append("🔄 This transaction was REPLACED by:")
            output.append(f" → {replacements.get('tx', {}).get('txid', 'N/A')}")

        if not replaces and not replacements:
            output.append("No RBF activity detected")
    else:
        output.append("No RBF history available")

    # Raw Transaction Data
    hex_data = all_data.get('hex')
    output.append("\n" + "─" * 50 + " RAW TRANSACTION " + "─" * 50)
    if hex_data:
        output.append(f"Raw Hex ({len(hex_data)} characters):")
        output.append(hex_data)
    else:
        output.append("Raw hex data not available")

    output.append("\n" + "=" * 100)
    output.append(f"Report generated at: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC")
    output.append("=" * 100 + "\n\n")

    return "\n".join(output)


def main():
    """Main function with enhanced argument parsing and execution."""
    parser = argparse.ArgumentParser(
        description="Advanced Bitcoin Transaction Forensic Analyzer v2.0",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog="""
Features:
• Multi-heuristic change address detection
• Comprehensive transaction analysis
• Network context awareness
• Probabilistic scoring system
• Detailed forensic reporting

Example:
  python btc_forensic.py transactions.txt report.txt
"""
    )

    parser.add_argument("input_file",
                        help="Path to input file containing transaction IDs (one per line)")
    parser.add_argument("output_file",
                        help="Path to output file for the forensic report")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Enable verbose output")
    parser.add_argument("--delay", "-d", type=float, default=0.1,
                        help="Delay between API requests in seconds (default: 0.1)")

    args = parser.parse_args()

    # Update the request delay if specified
    global REQUEST_DELAY
    REQUEST_DELAY = args.delay

    # Load transaction IDs (blank lines and '#' comment lines are skipped)
    try:
        with open(args.input_file, 'r', encoding='utf-8') as f:
            txids = [line.strip() for line in f
                     if line.strip() and not line.strip().startswith('#')]
    except FileNotFoundError:
        print(f"❌ Error: Input file '{args.input_file}' not found.")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Error reading input file: {e}")
        sys.exit(1)

    if not txids:
        print("❌ Error: No valid transaction IDs found in input file.")
        sys.exit(1)

    print(f"🔍 Starting comprehensive forensic analysis of {len(txids)} transactions...")
    print("📊 Using enhanced heuristics with probabilistic scoring")
    print(f"🌐 Target API: {BASE_URL_DEFAULT}")

    # Initialize analyzer
    analyzer = TransactionAnalyzer()

    # Process transactions
    with open(args.output_file, 'w', encoding='utf-8') as out_file:
        successful = 0
        failed = 0

        for i, txid in enumerate(txids, 1):
            print(f"\n[{i}/{len(txids)}] Processing: {txid}")
            print("-" * 80)

            try:
                # Fetch comprehensive data
                all_details = fetch_comprehensive_details(txid, analyzer)

                if all_details and all_details.get('base'):
                    # Perform advanced analysis
                    change_analysis = perform_comprehensive_change_analysis(
                        all_details['base'], analyzer
                    )

                    # Generate report
                    formatted_report = format_comprehensive_report(
                        all_details, change_analysis, analyzer
                    )

                    out_file.write(formatted_report)
                    out_file.flush()  # Ensure data is written incrementally

                    successful += 1
                    print("✅ Analysis completed successfully")

                else:
                    error_msg = f"❌ Failed to fetch critical data for {txid}\n\n"
                    out_file.write(error_msg)
                    failed += 1
                    print("❌ Failed to fetch critical data")

            except Exception as e:
                error_msg = f"❌ Error processing {txid}: {str(e)}\n\n"
                out_file.write(error_msg)
                failed += 1
                print(f"❌ Error: {str(e)}")

    # Final summary
    print("\n" + "=" * 80)
    print("📋 ANALYSIS SUMMARY")
    print("=" * 80)
    print(f"✅ Successfully analyzed: {successful} transactions")
    print(f"❌ Failed to analyze: {failed} transactions")
    print(f"📄 Comprehensive report saved to: {args.output_file}")
    print(f"🕒 Finished at: {datetime.utcnow().strftime('%H:%M:%S')} UTC")

    if successful > 0:
        print("\n🎯 Advanced forensic analysis complete!")
        print(" Report includes probabilistic change detection,")
        print(" comprehensive transaction analysis, and detailed")
        print(" heuristic breakdowns for enhanced investigation.")


if __name__ == "__main__":
    main()