This commit is contained in:
overcuriousity 2025-09-08 10:37:35 +02:00
parent e005f0104e
commit b96b876e98
2 changed files with 988 additions and 86 deletions

README.md Normal file

@@ -0,0 +1,231 @@
# Bitcoin Transaction Forensic Analyzer v2.0
A comprehensive Python tool for forensic analysis of Bitcoin transactions with advanced change address detection using probabilistic heuristics.
## Overview
This tool performs in-depth forensic analysis of Bitcoin transactions by:
- Fetching comprehensive transaction data from mempool APIs
- Applying multiple heuristics to identify likely change addresses
- Generating detailed forensic reports with probability assessments
- Providing network context and spending status analysis
## Features
### Core Analysis
- **Multi-heuristic change detection**: Combines seven distinct analytical approaches
- **Probabilistic scoring**: Uses weighted feature analysis instead of an arbitrary point system
- **Comprehensive data collection**: Fetches from multiple API endpoints
- **Network context awareness**: Incorporates current fee environment and mempool state
### Heuristics Implemented
1. **Round Number Analysis**: Distinguishes round payment amounts from precise change amounts (sketched after this list)
2. **Address History Analysis**: Examines address reuse patterns and transaction frequency
3. **Relative Value Analysis**: Compares output values and percentages
4. **Positional Pattern Analysis**: Considers wallet-specific change placement strategies
5. **Script Type Analysis**: Checks for address type consistency across inputs/outputs
6. **Fee Context Analysis**: Evaluates transaction urgency and batching patterns
7. **Address Type Reuse**: Determines if outputs match input address types
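For orientation, the round-number heuristic can be sketched in a few lines. This is a simplified sketch with illustrative thresholds, not the shipped implementation, which checks more amounts and patterns:

```python
# Simplified round-number heuristic (illustrative thresholds).
# Negative scores suggest a payment; positive scores suggest change.
def round_number_score(value_sats: int) -> float:
    value_btc = value_sats / 100_000_000
    # Exact round BTC amounts are strong payment indicators
    if any(abs(value_btc - amt) < 1e-6 for amt in (0.01, 0.1, 1.0, 10.0)):
        return -0.4
    # Round satoshi amounts are weaker payment indicators
    if value_sats >= 10_000 and value_sats % 10_000 == 0:
        return -0.2
    # Precise, "ragged" amounts look like change
    if value_sats >= 10_000_000 and value_sats % 10_000 != 0:
        return 0.2
    return 0.0

print(round_number_score(100_000_000))  # exactly 1 BTC -> -0.4 (payment-like)
print(round_number_score(13_370_421))   # precise amount -> 0.2 (change-like)
```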
### Output Features
- Detailed probability assessments with confidence levels
- Comprehensive transaction breakdown
- Spending status tracking
- RBF (Replace-by-Fee) history analysis
- Raw transaction data preservation
- Feature contribution breakdown for each heuristic
## Requirements
### System Requirements
- Python 3.7 or higher
- Internet connection for API access
- ~50MB available disk space
### Python Dependencies
```
requests>=2.25.0
```
### API Access
- Requires access to a mempool.space-compatible API
- Default configuration uses `mempool.mikoshi.de`
- No API key required for basic usage
## Installation
1. **Clone or download** the script file
2. **Install dependencies**:
```bash
pip install requests
```
3. **Verify setup**:
```bash
python btc_forensic.py --help
```
## Usage
### Basic Usage
```bash
python btc_forensic.py input_file.txt output_report.txt
```
### Command Line Options
```bash
python btc_forensic.py [options] input_file output_file
Required Arguments:
input_file Path to text file containing transaction IDs (one per line)
output_file Path where forensic report will be saved
Optional Arguments:
-h, --help Show help message
-v, --verbose Enable verbose output during processing
-d DELAY, --delay Delay between API requests in seconds (default: 0.1)
```
### Input File Format
Create a text file with one transaction ID per line:
```
15e10745f15593a899cef391191bdd3d7c12412cc4696b7bcb669d0feadc8521
dba43fd04b7ae3df8e5b596f2e7fab247c58629d622e3a5213f03a5a09684430
# Comments starting with # are ignored
c4e53c2e37f4fac759fdb0d8380e4d49e6c7211233ae276a44ce7074a1d6d168
```
### Example Usage
```bash
# Basic analysis
python btc_forensic.py transactions.txt forensic_report.txt
# Verbose output with custom delay
python btc_forensic.py -v -d 0.2 suspicious_txs.txt detailed_report.txt
```
## Understanding the Output
### Change Address Analysis
The script assigns each output a change probability (see the classification sketch after the confidence levels):
- **Probability > 70%**: Likely change address
- **Probability 50-70%**: Possible change address
- **Probability 30-50%**: Uncertain classification
- **Probability < 30%**: Likely payment address
### Confidence Levels
- **HIGH**: Strong evidence supporting the assessment
- **MEDIUM**: Moderate evidence, some uncertainty
- **LOW**: Weak evidence, high uncertainty
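A minimal sketch of how these bands combine into the report's labels, mirroring the thresholds above (the function name is illustrative):

```python
def classify(probability):
    """Map a change probability to assessment and confidence labels."""
    if probability > 0.7:
        label = "LIKELY CHANGE"
    elif probability > 0.5:
        label = "POSSIBLE CHANGE"
    elif probability < 0.3:
        label = "LIKELY PAYMENT"
    else:
        label = "UNCERTAIN"
    # Confidence grows as the probability moves away from 50%
    if probability > 0.8 or probability < 0.2:
        confidence = "HIGH"
    elif probability > 0.6 or probability < 0.4:
        confidence = "MEDIUM"
    else:
        confidence = "LOW"
    return label, confidence

print(classify(0.85))  # ('LIKELY CHANGE', 'HIGH')
print(classify(0.45))  # ('UNCERTAIN', 'LOW')
```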
### Feature Breakdown
Each output analysis includes detailed reasoning showing:
- Individual heuristic scores and contributions
- Weighted feature analysis
- Specific evidence found (round numbers, address history, etc.)
## Technical Details
### API Endpoints Used
- `/api/tx/{txid}` - Basic transaction data
- `/api/tx/{txid}/outspends` - Spending status
- `/api/v1/tx/{txid}/rbf` - RBF history
- `/api/tx/{txid}/hex` - Raw transaction data
- `/api/address/{address}` - Address statistics
- `/api/address/{address}/txs` - Address transaction history
- `/api/mempool` - Current mempool state
- `/api/v1/fees/recommended` - Fee recommendations
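Any of these endpoints can be queried directly with `requests`; a minimal sketch using the example TXID from the input-file section:

```python
import requests

BASE = "https://mempool.mikoshi.de/api"  # any mempool.space-compatible instance
txid = "15e10745f15593a899cef391191bdd3d7c12412cc4696b7bcb669d0feadc8521"

# Basic transaction data; the other endpoints follow the same pattern
resp = requests.get(f"{BASE}/tx/{txid}", timeout=15)
resp.raise_for_status()
tx = resp.json()
print(tx["fee"], len(tx["vin"]), len(tx["vout"]))  # fee in sats, input/output counts
```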
### Scoring Algorithm
The probabilistic model uses weighted feature combination (a worked example follows this list):
1. Each heuristic produces a score (-1.0 to +1.0)
2. Scores are weighted by empirical effectiveness
3. Combined score is normalized using sigmoid function
4. Result is converted to probability percentage
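As a worked example of steps 3 and 4: with the script's scale factor of 3, a normalized score of 0.30 maps to roughly a 71% change probability:

```python
import math

def to_probability(normalized_score, scale=3.0):
    """Squash a weighted score in [-1, 1] into a (0, 1) probability."""
    return 1 / (1 + math.exp(-normalized_score * scale))

print(f"{to_probability(0.30):.1%}")   # ~71.1% -> likely change
print(f"{to_probability(-0.30):.1%}")  # ~28.9% -> likely payment
print(f"{to_probability(0.0):.1%}")    # 50.0% -> uncertain
```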
### Rate Limiting
- Default 100ms delay between requests
- Configurable via `--delay` parameter
- Designed to be respectful of API resources
## Limitations and Considerations
### Analytical Limitations
- **Heuristic-based**: Not 100% accurate, provides probability estimates
- **Privacy techniques**: May be less effective against advanced privacy wallets
- **Exchange transactions**: Complex batching patterns may confuse analysis
- **Network conditions**: Accuracy may vary based on fee environment
### Technical Limitations
- **API dependency**: Requires reliable internet connection
- **Rate limits**: May need adjustment for different API providers
- **Memory usage**: Large transaction sets may require significant RAM
### Legal and Ethical Considerations
- **Intended for legitimate forensic analysis only**
- **Compliance**: Ensure usage complies with local laws and regulations
- **Privacy**: Be mindful of privacy implications when analyzing transactions
- **Data handling**: Secure storage and handling of forensic reports required
## Troubleshooting
### Common Issues
**"Network error fetching data"**
- Check internet connection
- Verify API endpoint is accessible
- Try increasing delay with `--delay` parameter
**"No valid transaction IDs found"**
- Verify input file format (one TXID per line)
- Check for proper 64-character hex transaction IDs
- Ensure file encoding is UTF-8
**"Could not retrieve base transaction data"**
- Transaction may not exist or be too recent
- API may be temporarily unavailable
- Try with a known valid transaction ID
**High memory usage**
- Process smaller batches of transactions
- Monitor system resources during execution
- Consider running analysis in segments
### Performance Optimization
- Use appropriate `--delay` setting for your network
- Process transactions in smaller batches for large datasets (see the sketch below)
- Monitor API response times and adjust accordingly
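The script processes its input file in a single pass, so batching means splitting the TXID list beforehand. A minimal sketch, assuming your TXIDs live in `transactions.txt` (file names are examples):

```python
# Split a large TXID list into chunks of 50 and write one input file per chunk,
# so each batch can be analyzed in a separate run of the script.
def chunked(items, size):
    for start in range(0, len(items), size):
        yield items[start:start + size]

with open("transactions.txt", "r", encoding="utf-8") as f:
    txids = [line.strip() for line in f if line.strip() and not line.startswith("#")]

for n, batch in enumerate(chunked(txids, 50), 1):
    with open(f"batch_{n:03d}.txt", "w", encoding="utf-8") as out:
        out.write("\n".join(batch) + "\n")
```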
## Version History
### v2.0 (Current)
- Advanced probabilistic change detection
- Multi-heuristic analysis framework
- Comprehensive forensic reporting
- Network context awareness
- Enhanced API utilization
### v1.0 (Legacy)
- Basic change detection using simple heuristics
- Limited API endpoint usage
- Basic reporting functionality
## Support and Contributing
### Reporting Issues
When reporting issues, please include:
- Python version and operating system
- Complete error messages
- Sample transaction IDs (if not sensitive)
- Command line arguments used
### Best Practices
- Test with known transactions before production use
- Validate results against other analytical tools
- Keep transaction ID lists organized and documented
- Regularly update the script for new features
## Disclaimer
This tool is provided for educational and legitimate forensic analysis purposes only. Users are responsible for ensuring compliance with applicable laws and regulations. The probabilistic nature of the analysis means results should be considered as investigative leads rather than definitive conclusions.
Accuracy of change address detection varies based on wallet software, transaction patterns, and other factors. Always corroborate findings with additional analytical techniques and evidence.


@@ -1,109 +1,780 @@
import requests
import argparse
import sys
import json
import math
from datetime import datetime
from collections import Counter, defaultdict
from typing import Dict, List, Optional, Tuple, Any
import time

# --- Configuration ---
BASE_URL_V1 = "https://mempool.mikoshi.de/api/v1"
BASE_URL_DEFAULT = "https://mempool.mikoshi.de/api"
SESSION = requests.Session()
SESSION.headers.update({
    'User-Agent': 'Bitcoin-Forensic-Analyzer/2.0'
})

# Rate limiting (not strictly required, but polite to public API instances)
REQUEST_DELAY = 0.1


class TransactionAnalyzer:
    def __init__(self):
        self.mempool_stats = None
        self.fee_recommendations = None
        self._load_network_context()

    def _load_network_context(self):
        """Load current network context for analysis."""
        try:
            # Get current mempool state
            mempool_resp = SESSION.get(f"{BASE_URL_DEFAULT}/mempool", timeout=10)
            if mempool_resp.status_code == 200:
                self.mempool_stats = mempool_resp.json()

            # Get current fee recommendations
            fee_resp = SESSION.get(f"{BASE_URL_V1}/fees/recommended", timeout=10)
            if fee_resp.status_code == 200:
                self.fee_recommendations = fee_resp.json()
        except requests.RequestException:
            print("  - Warning: Could not load network context")

    def analyze_round_numbers(self, value_sats: int) -> Tuple[float, str]:
        """Analyze if value appears to be a round number (indicates payment)."""
        value_btc = value_sats / 100_000_000

        # Very round BTC amounts (strong payment indicators)
        round_btc_amounts = [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0]
        for amount in round_btc_amounts:
            if abs(value_btc - amount) < 0.000001:
                return -0.4, f"Exact round amount: {amount} BTC"

        # Round satoshi amounts
        if value_sats >= 100000 and value_sats % 100000 == 0:
            return -0.3, f"Round 100k sats: {value_sats}"
        if value_sats >= 10000 and value_sats % 10000 == 0:
            return -0.2, f"Round 10k sats: {value_sats}"
        if value_sats >= 1000 and value_sats % 1000 == 0:
            return -0.1, f"Round 1k sats: {value_sats}"

        # Very precise amounts suggest change
        value_str = str(value_sats)
        if len(value_str) >= 8 and not value_str.endswith('0000'):
            return 0.2, f"Precise amount: {value_sats} sats"

        return 0.0, "No clear round number pattern"

    def analyze_output_values(self, outputs: List[Dict]) -> List[Tuple[float, str]]:
        """Analyze relative value patterns across outputs."""
        if len(outputs) < 2:
            return [(0.0, "Single output - no relative analysis")] * len(outputs)

        values = [out['value'] for out in outputs]
        total_value = sum(values)
        scores = []

        for i, value in enumerate(values):
            score = 0.0
            reasons = []
            percentage = (value / total_value) * 100

            # Very small outputs are often change (especially in consolidations)
            if percentage < 5:
                score += 0.15
                reasons.append(f"Small output: {percentage:.1f}% of total")
            # Very large outputs are rarely change
            elif percentage > 80:
                score -= 0.25
                reasons.append(f"Dominant output: {percentage:.1f}% of total")

            # Check if smallest output (common change pattern)
            if value == min(values) and len(values) > 2:
                score += 0.1
                reasons.append("Smallest output")

            # Check if second smallest (also common for change)
            sorted_values = sorted(values)
            if len(values) > 2 and value == sorted_values[1]:
                score += 0.05
                reasons.append("Second smallest output")

            reason_text = "; ".join(reasons) if reasons else "No relative value indicators"
            scores.append((score, reason_text))

        return scores

    def analyze_position_patterns(self, outputs: List[Dict], input_count: int) -> List[Tuple[float, str]]:
        """Analyze positional and structural patterns."""
        scores = []
        output_count = len(outputs)

        for i, output in enumerate(outputs):
            score = 0.0
            reasons = []

            # Last position is often change (but not always)
            if i == output_count - 1 and output_count > 1:
                score += 0.1
                reasons.append("Last position")

            # Two-output transactions - second is often change
            if output_count == 2 and i == 1:
                score += 0.15
                reasons.append("Second output in 2-output tx")

            # Consolidation pattern (many inputs, few outputs)
            if input_count >= 5 and output_count <= 2:
                score += 0.2
                reasons.append(f"Consolidation pattern: {input_count} inputs → {output_count} outputs")

            # Fan-out pattern (few inputs, many outputs) - change usually first or last
            if input_count <= 2 and output_count >= 5:
                if i == 0 or i == output_count - 1:
                    score += 0.1
                    reasons.append("Fan-out pattern: position suggests change")

            reason_text = "; ".join(reasons) if reasons else "No positional indicators"
            scores.append((score, reason_text))

        return scores

    def analyze_address_advanced(self, address: str) -> Tuple[float, str]:
        """Enhanced address history analysis."""
        if not address or address == "N/A":
            return 0.0, "No address to analyze"

        try:
            time.sleep(REQUEST_DELAY)
            # Get comprehensive address info
            addr_resp = SESSION.get(f"{BASE_URL_DEFAULT}/address/{address}", timeout=15)
            if addr_resp.status_code != 200:
                return 0.0, "Could not fetch address info"

            addr_info = addr_resp.json()
            chain_stats = addr_info.get('chain_stats', {})
            mempool_stats = addr_info.get('mempool_stats', {})

            tx_count = chain_stats.get('tx_count', 0)
            funded_count = chain_stats.get('funded_txo_count', 0)
            spent_count = chain_stats.get('spent_txo_count', 0)

            # Brand new addresses are very likely change
            if tx_count == 0:
                return 0.4, "Brand new address (0 transactions)"
            if tx_count == 1:
                return 0.35, "Address used only once before"

            # Get transaction history for pattern analysis
            time.sleep(REQUEST_DELAY)
            txs_resp = SESSION.get(f"{BASE_URL_DEFAULT}/address/{address}/txs", timeout=15)
            if txs_resp.status_code == 200:
                txs = txs_resp.json()

                # Analyze usage patterns
                if len(txs) <= 3:
                    score = 0.25
                    reason = f"Low usage: {len(txs)} transactions"
                elif len(txs) <= 5:
                    score = 0.1
                    reason = f"Moderate usage: {len(txs)} transactions"
                else:
                    # Check if it's an exchange/service address pattern
                    if len(txs) > 100:
                        score = -0.3
                        reason = f"High activity address: {len(txs)} transactions (likely service)"
                    else:
                        score = -0.1
                        reason = f"Regular usage: {len(txs)} transactions"

                # Check for rapid reuse (suggests not change)
                if len(txs) >= 2:
                    recent_txs = [tx for tx in txs if tx.get('status', {}).get('confirmed', False)]
                    if len(recent_txs) >= 2:
                        time_diff = recent_txs[0].get('status', {}).get('block_time', 0) - recent_txs[1].get('status', {}).get('block_time', 0)
                        if abs(time_diff) < 3600:  # Less than 1 hour between uses
                            score -= 0.15
                            reason += "; rapid reuse detected"

                return score, reason

            # Fallback to basic analysis
            if tx_count <= 2:
                return 0.25, f"Low transaction count: {tx_count}"
            elif tx_count <= 10:
                return 0.0, f"Moderate transaction count: {tx_count}"
            else:
                return -0.2, f"High transaction count: {tx_count}"

        except requests.RequestException as e:
            return 0.0, f"Network error analyzing address: {str(e)}"

    def analyze_script_complexity(self, output: Dict) -> Tuple[float, str]:
        """Analyze script type and complexity."""
        script_type = output.get('scriptpubkey_type', 'unknown')
        script_hex = output.get('scriptpubkey', '')

        # Standard single-sig types are common for change
        if script_type in ['p2pkh', 'p2wpkh']:
            return 0.1, f"Standard single-sig: {script_type}"

        # Wrapped segwit is also common for change
        if script_type == 'p2sh':
            return 0.05, "P2SH (possibly wrapped segwit)"

        # Native segwit
        if script_type in ['p2wsh', 'p2tr']:
            return 0.0, f"Advanced script type: {script_type}"

        # Multi-sig and complex scripts are less likely to be change
        if script_type == 'v0_p2wsh' or 'multisig' in script_type.lower():
            return -0.2, f"Complex script: {script_type}"

        # OP_RETURN and non-standard outputs are definitely not change
        if script_type in ['op_return', 'nulldata']:
            return -1.0, "OP_RETURN or nulldata output"

        return 0.0, f"Unknown script type: {script_type}"

    def analyze_fee_context(self, base_data: Dict) -> Tuple[float, str]:
        """Analyze transaction in context of current fee environment."""
        if not self.fee_recommendations:
            return 0.0, "No fee context available"

        fee = base_data.get('fee', 0)
        weight = base_data.get('weight', 1)
        fee_rate = fee / (weight / 4) if weight > 0 else 0

        # Compare to recommended fees
        fast_fee = self.fee_recommendations.get('fastestFee', 1)
        hour_fee = self.fee_recommendations.get('hourFee', 1)
        economy_fee = self.fee_recommendations.get('economyFee', 1)

        if fee_rate > fast_fee * 2:
            return 0.1, f"High fee rate: {fee_rate:.1f} sat/vB (suggests urgent payment)"
        elif fee_rate < economy_fee * 0.5:
            return 0.05, f"Very low fee rate: {fee_rate:.1f} sat/vB (suggests batching/consolidation)"

        return 0.0, f"Normal fee rate: {fee_rate:.1f} sat/vB"

    def calculate_change_probability(self, features: Dict[str, Tuple[float, str]]) -> Tuple[float, Dict[str, Any]]:
        """Calculate change probability using weighted features."""
        # Feature weights based on empirical effectiveness
        weights = {
            'round_number': 1.0,       # Strong indicator
            'address_history': 0.8,    # Very reliable
            'relative_value': 0.6,     # Good indicator
            'position': 0.4,           # Moderate indicator
            'script_complexity': 0.3,  # Weak but useful
            'fee_context': 0.2,        # Minor indicator
            'address_type_reuse': 0.7  # Strong indicator
        }

        weighted_score = 0.0
        total_weight = 0.0
        details = {}

        for feature_name, (score, reason) in features.items():
            if feature_name in weights:
                weight = weights[feature_name]
                weighted_score += score * weight
                total_weight += weight
                details[feature_name] = {
                    'score': score,
                    'weight': weight,
                    'contribution': score * weight,
                    'reason': reason
                }

        # Normalize score
        if total_weight > 0:
            normalized_score = weighted_score / total_weight
        else:
            normalized_score = 0.0

        # Convert to probability using sigmoid function
        probability = 1 / (1 + math.exp(-normalized_score * 3))  # Scale factor of 3

        return probability, details

    def analyze_address_type_reuse(self, base_data: Dict, output_index: int) -> Tuple[float, str]:
        """Check if output address type matches input types."""
        if not base_data or 'vin' not in base_data or 'vout' not in base_data:
            return 0.0, "Insufficient data for address type analysis"

        inputs = base_data.get('vin', [])
        outputs = base_data.get('vout', [])

        if output_index >= len(outputs):
            return 0.0, "Invalid output index"

        output = outputs[output_index]
        output_type = output.get('scriptpubkey_type')
        if not output_type:
            return 0.0, "No output script type"

        # Collect input types
        input_types = []
        for inp in inputs:
            prevout = inp.get('prevout', {})
            inp_type = prevout.get('scriptpubkey_type')
            if inp_type:
                input_types.append(inp_type)

        if not input_types:
            return 0.0, "No input script types available"

        # Find dominant input type
        type_counts = Counter(input_types)
        dominant_type, dominant_count = type_counts.most_common(1)[0]

        if output_type == dominant_type:
            reuse_percentage = (dominant_count / len(input_types)) * 100
            return 0.3, f"Address type '{output_type}' matches {reuse_percentage:.0f}% of inputs"
        else:
            return -0.1, f"Address type '{output_type}' differs from dominant input type '{dominant_type}'"


def fetch_comprehensive_details(txid: str, analyzer: TransactionAnalyzer) -> Optional[Dict]:
    """Fetch comprehensive transaction details from multiple endpoints."""
    print(f"Fetching comprehensive data for {txid}...")
    results = {}

    # Primary endpoints
    endpoints = {
        'base': f"{BASE_URL_DEFAULT}/tx/{txid}",
        'outspends': f"{BASE_URL_DEFAULT}/tx/{txid}/outspends",
        'rbf': f"{BASE_URL_V1}/tx/{txid}/rbf",
        'hex': f"{BASE_URL_DEFAULT}/tx/{txid}/hex",
        'status': f"{BASE_URL_DEFAULT}/tx/{txid}/status"
    }

    for key, url in endpoints.items():
        try:
            time.sleep(REQUEST_DELAY)
            response = SESSION.get(url, timeout=15)
            if response.status_code == 200:
                results[key] = response.text if key == 'hex' else response.json()
                print(f"  ✓ Fetched {key}")
            else:
                results[key] = None
                print(f"  - {key} not available (Status: {response.status_code})")
        except requests.RequestException as e:
            print(f"  - Network error fetching {key}: {e}")
            results[key] = None
            if key == 'base':
                return None

    return results


def perform_comprehensive_change_analysis(base_data: Dict, analyzer: TransactionAnalyzer) -> Optional[List[Dict]]:
    """Perform comprehensive change address analysis using multiple heuristics."""
    if not base_data or 'vout' not in base_data:
        return None

    outputs = base_data.get('vout', [])
    inputs = base_data.get('vin', [])

    print(f"  - Analyzing {len(outputs)} outputs using advanced heuristics...")

    analysis_results = []

    # Run value and position analysis once for all outputs
    value_scores = analyzer.analyze_output_values(outputs)
    position_scores = analyzer.analyze_position_patterns(outputs, len(inputs))

    for i, output in enumerate(outputs):
        print(f"    → Analyzing output {i}")

        address = output.get('scriptpubkey_address', 'N/A')
        value = output.get('value', 0)

        # Skip non-standard outputs
        if not address or address == 'N/A':
            analysis_results.append({
                'index': i,
                'address': 'N/A',
                'value': value,
                'probability': 0.0,
                'confidence': 'HIGH',
                'reasoning': 'Non-standard output (e.g., OP_RETURN)',
                'details': {}
            })
            continue

        # Collect all features
        features = {}

        # Round number analysis
        round_score, round_reason = analyzer.analyze_round_numbers(value)
        features['round_number'] = (round_score, round_reason)

        # Address history analysis
        addr_score, addr_reason = analyzer.analyze_address_advanced(address)
        features['address_history'] = (addr_score, addr_reason)

        # Relative value analysis
        rel_score, rel_reason = value_scores[i]
        features['relative_value'] = (rel_score, rel_reason)

        # Position analysis
        pos_score, pos_reason = position_scores[i]
        features['position'] = (pos_score, pos_reason)

        # Script complexity analysis
        script_score, script_reason = analyzer.analyze_script_complexity(output)
        features['script_complexity'] = (script_score, script_reason)

        # Fee context analysis
        fee_score, fee_reason = analyzer.analyze_fee_context(base_data)
        features['fee_context'] = (fee_score, fee_reason)

        # Address type reuse analysis
        type_score, type_reason = analyzer.analyze_address_type_reuse(base_data, i)
        features['address_type_reuse'] = (type_score, type_reason)

        # Calculate final probability
        probability, feature_details = analyzer.calculate_change_probability(features)

        # Determine confidence level
        if probability > 0.8 or probability < 0.2:
            confidence = "HIGH"
        elif probability > 0.6 or probability < 0.4:
            confidence = "MEDIUM"
        else:
            confidence = "LOW"

        analysis_results.append({
            'index': i,
            'address': address,
            'value': value,
            'probability': probability,
            'confidence': confidence,
            'reasoning': f"Change probability: {probability:.1%}",
            'details': feature_details
        })

    return analysis_results


def format_comprehensive_report(all_data: Dict, change_analysis: Optional[List[Dict]], analyzer: TransactionAnalyzer) -> str:
    """Format comprehensive forensic report."""
    base_data = all_data.get('base')
    if not base_data:
        return "Could not retrieve base transaction data.\n\n"

    txid = base_data.get("txid", "N/A")
    fee = base_data.get("fee", 0)
    weight = base_data.get("weight", 1)
    size = base_data.get("size", 0)

    output = ["=" * 100, f"COMPREHENSIVE FORENSIC ANALYSIS: {txid}", "=" * 100]

    # Transaction Overview
    status = base_data.get("status", {})
    if status.get("confirmed", False):
        block_time = datetime.utcfromtimestamp(status.get("block_time", 0)).strftime('%Y-%m-%d %H:%M:%S UTC')
        status_str = f"Confirmed in block {status.get('block_height', 'N/A')} at {block_time}"
    else:
        status_str = "Unconfirmed (in mempool)"

    fee_rate = fee / (weight / 4) if weight > 0 else 0

    output.extend([
        "\n" + "─" * 50 + " TRANSACTION OVERVIEW " + "─" * 50,
        f"Status: {status_str}",
        f"Fee: {fee:,} sats ({fee/100000000:.8f} BTC)",
        f"Size: {size:,} bytes | Weight: {weight:,} vB | Fee Rate: {fee_rate:.2f} sat/vB",
        f"Version: {base_data.get('version', 'N/A')} | Locktime: {base_data.get('locktime', 'N/A')}"
    ])

    # Network Context
    if analyzer.fee_recommendations:
        output.append(f"\nCurrent Network Fees - Fast: {analyzer.fee_recommendations.get('fastestFee', 'N/A')} | "
                      f"Hour: {analyzer.fee_recommendations.get('hourFee', 'N/A')} | "
                      f"Economy: {analyzer.fee_recommendations.get('economyFee', 'N/A')} sat/vB")

    # Input Analysis
    vin = base_data.get("vin", [])
    output.append("\n" + "─" * 50 + f" INPUTS ({len(vin)}) " + "─" * 50)

    total_input_value = 0
    if not vin:
        output.append("No inputs found (coinbase transaction)")
    else:
        for i, inp in enumerate(vin, 1):
            prevout = inp.get("prevout", {})
            value = prevout.get('value', 0)
            total_input_value += value
            script_type = prevout.get('scriptpubkey_type', 'unknown')

            output.append(f"  {i}. TXID: {inp.get('txid', 'N/A')[:16]}...")
            output.append(f"     Value: {value:,} sats | Address: {prevout.get('scriptpubkey_address', 'N/A')}")
            output.append(f"     Script Type: {script_type}")

    # Output Analysis
    vout = base_data.get("vout", [])
    output.append("\n" + "─" * 50 + f" OUTPUTS ({len(vout)}) " + "─" * 50)

    total_output_value = 0
    for i, out in enumerate(vout, 1):
        value = out.get('value', 0)
        total_output_value += value
        script_type = out.get('scriptpubkey_type', 'unknown')

        output.append(f"  {i}. Value: {value:,} sats ({value/100000000:.8f} BTC)")
        output.append(f"     Address: {out.get('scriptpubkey_address', 'N/A')}")
        output.append(f"     Script Type: {script_type}")

    # Fee verification
    calculated_fee = total_input_value - total_output_value
    output.append(f"\nFee Verification: Calculated={calculated_fee:,} sats | Reported={fee:,} sats")
    if abs(calculated_fee - fee) > 1:
        output.append("⚠️ WARNING: Fee mismatch detected!")

    # Change Address Analysis
    output.append("\n" + "=" * 100)
    output.append("ADVANCED CHANGE ADDRESS ANALYSIS")
    output.append("=" * 100)
    output.append("\nMethodology: Multi-heuristic probabilistic model analyzing:")
    output.append("• Round number patterns (payments often use round amounts)")
    output.append("• Address reuse and history (change addresses are often new)")
    output.append("• Relative output values (change is often smaller/different)")
    output.append("• Positional patterns (change position varies by wallet)")
    output.append("• Script type consistency (wallets reuse address types)")
    output.append("• Transaction context (fee rates, timing, structure)")

    if change_analysis:
        # Sort by probability for easy identification
        sorted_analysis = sorted(change_analysis, key=lambda x: x['probability'], reverse=True)

        output.append("\n" + "─" * 80 + " RESULTS " + "─" * 80)

        for result in sorted_analysis:
            prob = result['probability']
            conf = result['confidence']

            # Determine label
            if prob > 0.7:
                label = "🟢 LIKELY CHANGE"
            elif prob > 0.5:
                label = "🟡 POSSIBLE CHANGE"
            elif prob < 0.3:
                label = "🔴 LIKELY PAYMENT"
            else:
                label = "⚪ UNCERTAIN"

            output.append(f"\nOutput {result['index']} - {result['address'][:20]}{'...' if len(result['address']) > 20 else ''}")
            output.append(f"Value: {result['value']:,} sats | Probability: {prob:.1%} | Confidence: {conf}")
            output.append(f"Assessment: {label}")
            output.append("Detailed Analysis:")

            for feature, details in result['details'].items():
                score = details['score']
                contribution = details['contribution']
                reason = details['reason']
                indicator = "+" if score > 0 else "-" if score < 0 else "="
                output.append(f"  {indicator} {feature.replace('_', ' ').title()}: {reason}")
                output.append(f"    Score: {score:+.2f} | Weight: {details['weight']:.1f} | Contribution: {contribution:+.2f}")

        # Summary
        most_likely_change = max(sorted_analysis, key=lambda x: x['probability'])
        output.append("\n" + "─" * 80 + " SUMMARY " + "─" * 80)
        if most_likely_change['probability'] > 0.5:
            output.append(f"Most Likely Change: Output {most_likely_change['index']} "
                          f"({most_likely_change['probability']:.1%} probability)")
            output.append(f"Address: {most_likely_change['address']}")
            output.append(f"Value: {most_likely_change['value']:,} sats")
        else:
            output.append("⚠️ No clear change address identified - all outputs show low change probability")
            output.append("This may indicate: multiple payments, exchange transaction, or privacy technique")
    else:
        output.append("\n❌ Could not perform change address analysis due to insufficient data")

    # Spending Status Analysis
    outspends_data = all_data.get('outspends')
    output.append("\n" + "─" * 50 + " SPENDING STATUS " + "─" * 50)
    if outspends_data:
        for i, spend_info in enumerate(outspends_data):
            if spend_info and spend_info.get('spent'):
                spend_txid = spend_info.get('txid', 'N/A')
                spend_vin = spend_info.get('vin', 'N/A')
                spend_status = spend_info.get('status', {})
                if spend_status.get('confirmed'):
                    spend_height = spend_status.get('block_height', 'N/A')
                    output.append(f"  Output {i}: ✅ Spent in TX {spend_txid} (input {spend_vin}) at block {spend_height}")
                else:
                    output.append(f"  Output {i}: 🟡 Spent in unconfirmed TX {spend_txid}")
            else:
                output.append(f"  Output {i}: 💰 Unspent (UTXO)")
    else:
        output.append("Could not retrieve spending status information")

    # RBF Analysis
    rbf_data = all_data.get('rbf')
    output.append("\n" + "─" * 50 + " RBF HISTORY " + "─" * 50)
    if rbf_data:
        replacements = rbf_data.get('replacements')
        replaces = rbf_data.get('replaces', [])
        if replaces:
            output.append("🔄 This transaction REPLACED the following:")
            for replaced_txid in replaces:
                output.append(f"  {replaced_txid}")
        if replacements:
            output.append("🔄 This transaction was REPLACED by:")
            output.append(f"  {replacements.get('tx', {}).get('txid', 'N/A')}")
        if not replaces and not replacements:
            output.append("No RBF activity detected")
    else:
        output.append("No RBF history available")

    # Raw Transaction Data
    hex_data = all_data.get('hex')
    output.append("\n" + "─" * 50 + " RAW TRANSACTION " + "─" * 50)
    if hex_data:
        output.append(f"Raw Hex ({len(hex_data)} characters):")
        output.append(hex_data)
    else:
        output.append("Raw hex data not available")

    output.append("\n" + "=" * 100)
    output.append(f"Report generated at: {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')} UTC")
    output.append("=" * 100 + "\n\n")

    return "\n".join(output)


def main():
    """Main function with enhanced argument parsing and execution."""
    parser = argparse.ArgumentParser(
        description="Advanced Bitcoin Transaction Forensic Analyzer v2.0",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog="""
Features:
  Multi-heuristic change address detection
  Comprehensive transaction analysis
  Network context awareness
  Probabilistic scoring system
  Detailed forensic reporting

Example:
  python btc_forensic.py transactions.txt report.txt
"""
    )

    parser.add_argument("input_file",
                        help="Path to input file containing transaction IDs (one per line)")
    parser.add_argument("output_file",
                        help="Path to output file for the forensic report")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Enable verbose output")
    parser.add_argument("--delay", "-d", type=float, default=0.1,
                        help="Delay between API requests in seconds (default: 0.1)")

    args = parser.parse_args()

    # Update delay if specified
    global REQUEST_DELAY
    REQUEST_DELAY = args.delay

    # Load transaction IDs
    try:
        with open(args.input_file, 'r', encoding='utf-8') as f:
            txids = [line.strip() for line in f if line.strip() and not line.startswith('#')]
    except FileNotFoundError:
        print(f"❌ Error: Input file '{args.input_file}' not found.")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Error reading input file: {e}")
        sys.exit(1)

    if not txids:
        print("❌ Error: No valid transaction IDs found in input file.")
        sys.exit(1)

    print(f"🔍 Starting comprehensive forensic analysis of {len(txids)} transactions...")
    print("📊 Using enhanced heuristics with probabilistic scoring")
    print(f"🌐 Target API: {BASE_URL_DEFAULT}")

    # Initialize analyzer
    analyzer = TransactionAnalyzer()

    # Process transactions
    with open(args.output_file, 'w', encoding='utf-8') as out_file:
        successful = 0
        failed = 0

        for i, txid in enumerate(txids, 1):
            print(f"\n[{i}/{len(txids)}] Processing: {txid}")
            print("-" * 80)

            try:
                # Fetch comprehensive data
                all_details = fetch_comprehensive_details(txid, analyzer)

                if all_details and all_details.get('base'):
                    # Perform advanced analysis
                    change_analysis = perform_comprehensive_change_analysis(
                        all_details['base'], analyzer
                    )

                    # Generate report
                    formatted_report = format_comprehensive_report(
                        all_details, change_analysis, analyzer
                    )

                    out_file.write(formatted_report)
                    out_file.flush()  # Ensure data is written
                    successful += 1
                    print("✅ Analysis completed successfully")
                else:
                    error_msg = f"❌ Failed to fetch critical data for {txid}\n\n"
                    out_file.write(error_msg)
                    failed += 1
                    print("❌ Failed to fetch critical data")

            except Exception as e:
                error_msg = f"❌ Error processing {txid}: {str(e)}\n\n"
                out_file.write(error_msg)
                failed += 1
                print(f"❌ Error: {str(e)}")

    # Final summary
    print("\n" + "=" * 80)
    print("📋 ANALYSIS SUMMARY")
    print("=" * 80)
    print(f"✅ Successfully analyzed: {successful} transactions")
    print(f"❌ Failed to analyze: {failed} transactions")
    print(f"📄 Comprehensive report saved to: {args.output_file}")
    print(f"🕒 Finished at: {datetime.utcnow().strftime('%H:%M:%S')} UTC")

    if successful > 0:
        print("\n🎯 Advanced forensic analysis complete!")
        print("   Report includes probabilistic change detection,")
        print("   comprehensive transaction analysis, and detailed")
        print("   heuristic breakdowns for enhanced investigation.")


if __name__ == "__main__":
    main()