#!/usr/bin/env python3
"""
HuggingFace LLM Installer for Ollama
Automatically downloads GGUF files from HuggingFace and creates Ollama models.
Features:
- SHA256 checksum verification
- Disk space checking
- Dry run mode
- Parallel processing
- Skip existing models
"""
import argparse
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from urllib.parse import urlparse
import urllib.request
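

# The script consumes standard Ollama Modelfiles whose leading comments carry
# the download metadata parsed by parse_modelfile() below. A minimal sketch of
# such a header (the repo URL, checksum, and parameter values are illustrative
# placeholders, not a real model):
#
#   # hf_upstream: https://huggingface.co/example-org/Example-3B-GGUF
#   # quantization: q5_k_m
#   # sha256: <64 hex digits of the expected GGUF checksum>
#   # capabilities: tools
#   FROM ./Example-3B-Q5_K_M.gguf
#   PARAMETER temperature 0.7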


def parse_model_name_from_gguf(gguf_filename):
    """
    Parse model name and tag from GGUF filename.

    Args:
        gguf_filename: Name of the GGUF file

    Returns:
        Tuple of (model_base, tag, full_name) or (filename, 'latest', filename) if parsing fails
    """
    filename_stem = Path(gguf_filename).stem.lower()
    # Split on hyphens
    parts = filename_stem.split('-')
    if len(parts) >= 3:
        # Find where the size variant starts (e.g., "0.5b", "3b", "8b", "14b")
        base_parts = []
        tag_parts = []
        found_variant = False
        for part in parts:
            # Check if this looks like a size variant (e.g., "3b", "8b", "0.5b")
            if not found_variant and re.match(r'^\d+(\.\d+)?b$', part):
                found_variant = True
                tag_parts.append(part)
            elif found_variant:
                # Include everything after the variant (including quantization)
                tag_parts.append(part)
            else:
                # Before the variant = base name
                base_parts.append(part)
        if base_parts and tag_parts:
            model_base = '-'.join(base_parts)
            model_tag = '-'.join(tag_parts)
            full_name = f"{model_base}:{model_tag}"
            return (model_base, model_tag, full_name)
    # Fallback to filename without extension
    return (filename_stem, 'latest', filename_stem)
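
# For instance, the example filename used in the comments further down parses as:
#   parse_model_name_from_gguf("Ministral-3-3B-Instruct-2512-Q5_K_M.gguf")
#   -> ("ministral-3", "3b-instruct-2512-q5_k_m", "ministral-3:3b-instruct-2512-q5_k_m")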


def parse_modelfile(modelfile_path):
    """
    Parse a Modelfile to extract HuggingFace upstream URL and model info.

    Args:
        modelfile_path: Path to the .Modelfile

    Returns:
        dict with model metadata or None if invalid
    """
    with open(modelfile_path, 'r') as f:
        content = f.read()
    # Look for hf_upstream in the header comments
    hf_match = re.search(r'#\s*hf_upstream:\s*(https://huggingface\.co/[^\s]+)', content)
    if not hf_match:
        return None
    hf_url = hf_match.group(1)
    # Look for optional quantization specification (default: q4_k_m)
    quant_match = re.search(r'#\s*quantization:\s*([a-zA-Z0-9_]+)', content)
    quantization = quant_match.group(1).upper() if quant_match else 'Q4_K_M'
    # Look for optional SHA256 checksum
    sha256_match = re.search(r'#\s*sha256:\s*([a-fA-F0-9]{64})', content)
    sha256 = sha256_match.group(1) if sha256_match else None
    # Look for optional capabilities (comma-separated list)
    # Format: # capabilities: tools, vision
    capabilities_match = re.search(r'#\s*capabilities:\s*([^\n]+)', content)
    capabilities = None
    if capabilities_match:
        # Parse comma-separated capabilities and clean whitespace
        caps_str = capabilities_match.group(1).strip()
        capabilities = [cap.strip() for cap in caps_str.split(',') if cap.strip()]
    # Check if URL points to a specific GGUF file or just the repo
    if hf_url.endswith('.gguf') or '/blob/' in hf_url or '/resolve/' in hf_url:
        # Specific file provided - use as-is
        resolve_url = hf_url.replace('/blob/', '/resolve/')
        gguf_filename = os.path.basename(urlparse(resolve_url).path)
    else:
        # Repository root provided - construct filename from repo name and quantization
        # URL format: https://huggingface.co/{org}/{repo}
        url_parts = urlparse(hf_url).path.strip('/').split('/')
        if len(url_parts) >= 2:
            repo_name = url_parts[1]  # e.g., "Ministral-3-3B-Instruct-2512-GGUF"
            # Remove -GGUF suffix if present (case-insensitive)
            if repo_name.upper().endswith('-GGUF'):
                repo_name = repo_name[:-5]
            # Construct filename: RepoName-Quantization.gguf
            gguf_filename = f"{repo_name}-{quantization}.gguf"
            resolve_url = f"{hf_url.rstrip('/')}/resolve/main/{gguf_filename}"
        else:
            print(f"✗ Invalid HuggingFace URL format: {hf_url}")
            return None
    # Extract model name and tag from the GGUF filename
    # Format: Model-Version-Variant-Year-Quant.gguf -> model:version-variant-year-quant
    # Example: Ministral-3-3B-Instruct-2512-Q5_K_M.gguf -> ministral-3:3b-instruct-2512-q5_k_m
    model_base, model_tag, model_name = parse_model_name_from_gguf(gguf_filename)
    return {
        'hf_url': hf_url,
        'resolve_url': resolve_url,
        'gguf_filename': gguf_filename,
        'model_name': model_name,
        'modelfile_path': modelfile_path,
        'sha256': sha256,
        'capabilities': capabilities
    }
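
# When only a repo URL is given, the resolve URL is derived from the repo name
# plus the quantization tag. With the illustrative header near the top of this
# file, for example:
#   https://huggingface.co/example-org/Example-3B-GGUF  (quantization: q5_k_m)
#   -> https://huggingface.co/example-org/Example-3B-GGUF/resolve/main/Example-3B-Q5_K_M.gguf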


def get_file_size(url):
    """
    Get the size of a file from URL without downloading it.

    Args:
        url: File URL

    Returns:
        Size in bytes or None if unavailable
    """
    try:
        req = urllib.request.Request(url, method='HEAD')
        with urllib.request.urlopen(req, timeout=10) as response:
            size = response.headers.get('Content-Length')
            return int(size) if size else None
    except Exception:
        return None


def check_disk_space(required_bytes, path='.'):
    """
    Check if there's enough disk space available.

    Args:
        required_bytes: Required space in bytes
        path: Path to check space on (default: current directory)

    Returns:
        Tuple of (has_space, available_bytes, required_bytes_with_margin)
    """
    # Get absolute path to check actual filesystem
    abs_path = os.path.abspath(path)
    stat = shutil.disk_usage(abs_path)
    # Add 10% safety margin
    required_with_margin = int(required_bytes * 1.1)
    return (stat.free >= required_with_margin, stat.free, required_with_margin)
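
# E.g. a 4.00 GB GGUF passes only when at least 4.40 GB (file size plus the
# 10% margin) is free on the filesystem backing `path`.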


def calculate_sha256(filepath, chunk_size=8192):
    """
    Calculate SHA256 checksum of a file.

    Args:
        filepath: Path to file
        chunk_size: Bytes to read at once

    Returns:
        SHA256 hex digest
    """
    sha256_hash = hashlib.sha256()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            sha256_hash.update(chunk)
    return sha256_hash.hexdigest()


def verify_checksum(filepath, expected_sha256):
    """
    Verify file checksum matches expected value.

    Args:
        filepath: Path to file
        expected_sha256: Expected SHA256 hash

    Returns:
        True if match, False otherwise
    """
    print("  Verifying checksum...")
    actual = calculate_sha256(filepath)
    if actual.lower() == expected_sha256.lower():
        print(f"  ✓ Checksum verified: {actual[:16]}...")
        return True
    else:
        print("  ✗ Checksum mismatch!")
        print(f"    Expected: {expected_sha256}")
        print(f"    Actual: {actual}")
        return False
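
# The expected value in a Modelfile header can be generated from a shell with
# `sha256sum <file>.gguf`; this function computes the same digest in-process,
# reading in chunks so multi-gigabyte files never need to fit in memory.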


def get_existing_models():
    """
    Get list of existing Ollama models.

    Returns:
        Set of model base names (tags stripped)
    """
    try:
        result = subprocess.run(
            ['ollama', 'list'],
            capture_output=True,
            text=True,
            check=True
        )
        # Parse output to get model names
        # Format: NAME ID SIZE MODIFIED
        models = set()
        for line in result.stdout.strip().split('\n')[1:]:  # Skip header
            if line.strip():
                # Get first column (name)
                name = line.split()[0]
                # Remove tag if present
                base_name = name.split(':')[0]
                models.add(base_name)
        return models
    except (subprocess.CalledProcessError, FileNotFoundError):
        return set()
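
# 'ollama list' output has the columnar form noted above; an illustrative row
# (ID, size, and age are made up for the example):
#   NAME                                   ID              SIZE      MODIFIED
#   ministral-3:3b-instruct-2512-q5_k_m    0123abcd4567    2.4 GB    3 days ago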


def download_file(url, dest_path, filename, should_cancel=None, progress_callback=None):
    """
    Download a file from URL to destination with progress indication.

    Args:
        url: Source URL
        dest_path: Destination file path
        filename: Name for display purposes
        should_cancel: Optional callback function that returns True if download should be cancelled
        progress_callback: Optional callback function to report progress messages
    """
    def log(msg):
        """Helper to print and optionally call progress callback."""
        print(msg)
        if progress_callback:
            progress_callback(msg)

    log(f"Downloading {filename}...")
    log(f"  From: {url}")
    log(f"  To: {dest_path}")

    def show_progress(block_num, block_size, total_size):
        # Check for cancellation
        if should_cancel and should_cancel():
            raise InterruptedError("Download cancelled")
        downloaded = block_num * block_size
        if total_size > 0:
            percent = min(100, downloaded * 100 / total_size)
            mb_downloaded = downloaded / (1024 * 1024)
            mb_total = total_size / (1024 * 1024)
            msg = f"\r  Progress: {percent:.1f}% ({mb_downloaded:.1f}/{mb_total:.1f} MB)"
            print(msg, end='')
            if progress_callback:
                progress_callback(f"Progress: {percent:.1f}% ({mb_downloaded:.1f}/{mb_total:.1f} MB)")

    try:
        urllib.request.urlretrieve(url, dest_path, show_progress)
        print()  # New line after progress
        log("✓ Download complete")
    except Exception as e:
        print(f"\n✗ Download failed: {e}")
        if progress_callback:
            progress_callback(f"✗ Download failed: {e}")
        raise
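
# A caller embedding this in a UI might wire the optional hooks roughly like
# this (threading.Event is from the standard library; update_status is a
# hypothetical callback supplied by the caller):
#   cancel_event = threading.Event()
#   download_file(url, dest, name,
#                 should_cancel=cancel_event.is_set,
#                 progress_callback=update_status)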


def create_ollama_model(modelfile_path, gguf_path, model_name, capabilities=None):
    """
    Create an Ollama model from the Modelfile and GGUF file.

    Args:
        modelfile_path: Path to the .Modelfile
        gguf_path: Path to the downloaded GGUF file
        model_name: Name for the Ollama model
        capabilities: Optional list of capabilities to add (e.g., ['tools', 'vision'])
    """
    print(f"\nCreating Ollama model: {model_name}")
    # Note: Capabilities are detected from the GGUF file metadata by Ollama automatically
    if capabilities:
        print(f"  Expected capabilities from GGUF metadata: {', '.join(capabilities)}")
    # Read the Modelfile and update the FROM path to point to the downloaded GGUF
    with open(modelfile_path, 'r') as f:
        modelfile_content = f.read()
    # Replace the FROM line to use the actual GGUF path
    # Handle both relative paths like "./filename.gguf" and URLs like "https://..."
    original_content = modelfile_content
    modelfile_content = re.sub(
        r'FROM\s+(?:\./[^\s]+\.gguf|https?://[^\n]+)',
        f'FROM {gguf_path}',
        modelfile_content
    )
    # Debug: check if replacement happened
    if original_content == modelfile_content:
        print("  WARNING: FROM line was not replaced!")
        print(f"  Looking for pattern in: {original_content[:200]}")
    else:
        print(f"  ✓ Replaced FROM line with local path: {gguf_path}")
    # Create a temporary Modelfile with the correct path
    with tempfile.NamedTemporaryFile(mode='w', suffix='.Modelfile', delete=False) as tmp_modelfile:
        tmp_modelfile.write(modelfile_content)
        tmp_modelfile_path = tmp_modelfile.name
    try:
        # Run ollama create
        cmd = ['ollama', 'create', model_name, '-f', tmp_modelfile_path]
        print(f"  Running: {' '.join(cmd)}")
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True
        )
        if result.returncode == 0:
            # Success - output will be shown by the caller
            if result.stdout:
                print(result.stdout.strip())
        else:
            print("✗ Failed to create model")
            if result.stderr:
                print(f"  Error: {result.stderr.strip()}")
            raise subprocess.CalledProcessError(result.returncode, cmd)
    finally:
        # Clean up temporary Modelfile
        os.unlink(tmp_modelfile_path)
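
# The rewrite above turns a header line such as "FROM ./Example-3B-Q5_K_M.gguf"
# into "FROM /tmp/<tmpdir>/Example-3B-Q5_K_M.gguf" (path illustrative) before
# the temporary Modelfile is handed to `ollama create`.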


def install_model(modelfile_path, dry_run=False, skip_existing=False, existing_models=None, should_cancel=None, progress_callback=None):
    """
    Install a single model from a Modelfile.

    Args:
        modelfile_path: Path to the .Modelfile
        dry_run: If True, only simulate installation
        skip_existing: If True, skip models already in Ollama
        existing_models: Set of existing model names
        should_cancel: Optional callback function that returns True if installation should be cancelled
        progress_callback: Optional callback function to report progress messages

    Returns:
        Tuple of (success: bool, skipped: bool, model_name: str)
    """
    def log(msg):
        """Helper to print and optionally call progress callback."""
        print(msg)
        if progress_callback:
            progress_callback(msg)

    log(f"\n{'='*80}")
    log(f"Processing: {modelfile_path}")
    log(f"{'='*80}")
    # Parse the Modelfile
    model_info = parse_modelfile(modelfile_path)
    if not model_info:
        log(f"✗ No hf_upstream found in {modelfile_path}")
        return (False, False, None)
    log(f"Model name: {model_info['model_name']}")
    log(f"GGUF file: {model_info['gguf_filename']}")
    if model_info['sha256']:
        log(f"SHA256: {model_info['sha256'][:16]}...")
    if model_info.get('capabilities'):
        log(f"Capabilities: {', '.join(model_info['capabilities'])}")
    # Check if model already exists; existing_models holds untagged base names
    # (see get_existing_models), so compare base names here
    if skip_existing and existing_models and model_info['model_name'].split(':')[0] in existing_models:
        log(f"⊘ Model '{model_info['model_name']}' already exists, skipping")
        return (True, True, model_info['model_name'])
    # Get file size and check disk space
    file_size = get_file_size(model_info['resolve_url'])
    if file_size:
        size_gb = file_size / (1024**3)
        log(f"File size: {size_gb:.2f} GB")
        if not dry_run:
            # Check the filesystem the temporary download directory lives on,
            # since that is where the GGUF is written
            has_space, available, required = check_disk_space(file_size, tempfile.gettempdir())
            if not has_space:
                log("✗ Insufficient disk space!")
                log(f"  Required: {required / (1024**3):.2f} GB (with 10% margin)")
                log(f"  Available: {available / (1024**3):.2f} GB")
                return (False, False, model_info['model_name'])
            else:
                log(f"✓ Disk space check passed ({available / (1024**3):.2f} GB available)")
    if dry_run:
        log(f"\n[DRY RUN] Would download and install model: {model_info['model_name']}")
        return (True, False, model_info['model_name'])
    # Create temporary directory for download
    with tempfile.TemporaryDirectory() as tmp_dir:
        gguf_path = os.path.join(tmp_dir, model_info['gguf_filename'])
        try:
            # Download the GGUF file
            download_file(model_info['resolve_url'], gguf_path, model_info['gguf_filename'], should_cancel, progress_callback)
            # Verify checksum if provided
            if model_info['sha256']:
                if not verify_checksum(gguf_path, model_info['sha256']):
                    print("✗ Checksum verification failed!")
                    return (False, False, model_info['model_name'])
            # Create the Ollama model
            create_ollama_model(
                modelfile_path,
                gguf_path,
                model_info['model_name'],
                model_info.get('capabilities')
            )
            print(f"\n✓ Successfully installed model: {model_info['model_name']}")
            return (True, False, model_info['model_name'])
        except Exception as e:
            print(f"\n✗ Failed to install model: {e}")
            return (False, False, model_info['model_name'])


def install_model_wrapper(args):
    """Wrapper for parallel execution."""
    return install_model(*args)


def main():
    parser = argparse.ArgumentParser(
        description='Install Ollama models from HuggingFace using Modelfiles',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Install a single model
  %(prog)s path/to/model.Modelfile

  # Install all models in the default repo directory
  %(prog)s

  # Dry run to see what would be installed
  %(prog)s --dry-run

  # Skip models that already exist
  %(prog)s --skip-existing

  # Install with 3 parallel downloads
  %(prog)s --parallel 3
"""
    )
    parser.add_argument(
        'modelfile',
        nargs='?',
        help='Path to a specific .Modelfile to install (optional)'
    )
    parser.add_argument(
        '--dir',
        default='./modelfile-repo',
        help='Directory containing .Modelfile files (default: ./modelfile-repo)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Simulate installation without downloading or creating models'
    )
    parser.add_argument(
        '--skip-existing',
        action='store_true',
        help='Skip models that already exist in Ollama'
    )
    parser.add_argument(
        '--parallel',
        type=int,
        default=1,
        metavar='N',
        help='Number of parallel downloads/installations (default: 1)'
    )
    args = parser.parse_args()
    # Validate parallel argument
    if args.parallel < 1:
        print("✗ Error: --parallel must be at least 1")
        sys.exit(1)
    # Check if ollama is available
    try:
        subprocess.run(['ollama', '--version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("✗ Error: 'ollama' command not found. Please install Ollama first.")
        print("  Visit: https://ollama.ai")
        sys.exit(1)
    # Get existing models if skip_existing is enabled
    existing_models = None
    if args.skip_existing:
        existing_models = get_existing_models()
        if existing_models:
            print(f"Found {len(existing_models)} existing model(s)")
    # Determine which Modelfiles to process
    if args.modelfile:
        # Single file mode
        modelfile_path = Path(args.modelfile)
        if not modelfile_path.exists():
            print(f"✗ Error: File not found: {modelfile_path}")
            sys.exit(1)
        if modelfile_path.suffix != '.Modelfile':
            print("✗ Error: File must have .Modelfile extension")
            sys.exit(1)
        modelfiles = [modelfile_path]
    else:
        # Batch mode - process all .Modelfile files in directory
        modelfile_dir = Path(args.dir)
        if not modelfile_dir.exists():
            print(f"✗ Error: Directory not found: {modelfile_dir}")
            sys.exit(1)
        modelfiles = sorted(modelfile_dir.glob('*.Modelfile'))
        if not modelfiles:
            print(f"✗ No .Modelfile files found in {modelfile_dir}")
            sys.exit(1)
    print(f"Found {len(modelfiles)} Modelfile(s) to process")
    if args.dry_run:
        print("\n*** DRY RUN MODE - No files will be downloaded or models created ***\n")
    # Process all Modelfiles
    results = []
    if args.parallel > 1 and len(modelfiles) > 1:
        # Parallel processing
        print(f"\nUsing {args.parallel} parallel worker(s)")
        with ThreadPoolExecutor(max_workers=args.parallel) as executor:
            # Submit all tasks
            future_to_modelfile = {
                executor.submit(
                    install_model_wrapper,
                    (modelfile, args.dry_run, args.skip_existing, existing_models)
                ): modelfile
                for modelfile in modelfiles
            }
            # Collect results as they complete
            for future in as_completed(future_to_modelfile):
                modelfile = future_to_modelfile[future]
                try:
                    success, skipped, model_name = future.result()
                    results.append((modelfile.name, success, skipped))
                except Exception as e:
                    print(f"\n✗ Exception processing {modelfile.name}: {e}")
                    results.append((modelfile.name, False, False))
    else:
        # Sequential processing
        for modelfile in modelfiles:
            success, skipped, model_name = install_model(
                modelfile,
                args.dry_run,
                args.skip_existing,
                existing_models
            )
            results.append((modelfile.name, success, skipped))
    # Summary
    print(f"\n{'='*80}")
    print("INSTALLATION SUMMARY")
    print(f"{'='*80}")
    successful = sum(1 for _, success, skipped in results if success and not skipped)
    skipped = sum(1 for _, _, skip in results if skip)
    failed = len(results) - successful - skipped
    for name, success, skip in results:
        if skip:
            status = "⊘"
        elif success:
            status = "✓"
        else:
            status = "✗"
        print(f"{status} {name}")
    print(f"\nTotal: {len(results)} | Successful: {successful} | Skipped: {skipped} | Failed: {failed}")
    if failed > 0:
        sys.exit(1)


if __name__ == '__main__':
    main()