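# dumper / torrent_client.py
# Last commit by Fred808: e8c3dd5 "Update torrent_client.py" (verified), 7.06 kB.
# (Header copied from the Hugging Face file viewer; its page controls were
#  "Raw", "History", "Blame", "Contribute" and "Delete".)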
import json
import os
import tempfile
import time
from urllib.parse import quote

import requests
from huggingface_hub import upload_file
# === CONFIGURATION ===
# Hugging Face access token used for uploads; None when HF_TOKEN is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Target dataset repository and the folder inside it that receives the files.
REPO_ID = "Fred808/BG1"
DATA_PATH = "AEffects"
# Local JSON file listing the magnet links a previous run already finished.
PROCESSED_FILE = "processed.json"
# Base URL of the remote torrent service. Set the TORRENT_SERVICE_URL
# environment variable to point at your own deployment instead of editing
# this file. A trailing slash is stripped because every endpoint is built as
# f"{TORRENT_SERVICE_URL}/api/...".
TORRENT_SERVICE_URL = os.environ.get(
    "TORRENT_SERVICE_URL", "https://fred808-dumper.hf.space"
).rstrip("/")
# Magnet links to process (these replace the original VIDEO_URLS).
MAGNET_LINKS = [
    "magnet:?xt=urn:btih:08ada5a7a6183aae1e09d831df6748d566095a10&dn=Sintel&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80%2Fannounce&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337%2Fannounce&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969%2Fannounce",
]

# === Load processed videos ===
# Set of magnet links already handled; links in it are skipped by main().
if os.path.exists(PROCESSED_FILE):
    with open(PROCESSED_FILE, "r", encoding="utf-8") as f:
        processed_urls = set(json.load(f))
else:
    processed_urls = set()
def save_processed():
    """Persist ``processed_urls`` to ``PROCESSED_FILE`` as a JSON list.

    The JSON is first written to ``<PROCESSED_FILE>.tmp`` and then moved over
    the real file with ``os.replace``. An interruption mid-write therefore
    leaves the previous file intact, instead of a truncated one that would make
    the start-up ``json.load`` fail. Entries are sorted so the file content is
    deterministic between runs.

    Raises:
        OSError: if the file cannot be written or replaced. The temporary file
            is removed before the error propagates.
    """
    tmp_path = f"{PROCESSED_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(sorted(processed_urls), f, indent=2)
        os.replace(tmp_path, PROCESSED_FILE)
    except BaseException:
        # Don't leave a half-written temp file behind; let the caller see the error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def upload_to_dataset(filepath, filename):
    """Upload a local file into the dataset repo ``REPO_ID`` under ``DATA_PATH``.

    Args:
        filepath: Local path of the file to upload.
        filename: Name (may include sub-folders) the file gets inside
            ``DATA_PATH`` in the repository.

    Returns:
        True on success. False if ``upload_file`` raised; the error is printed.
    """
    path_in_repo = f"{DATA_PATH}/{filename}"
    try:
        upload_file(
            path_or_fileobj=filepath,
            path_in_repo=path_in_repo,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except Exception as e:
        # Broad catch on purpose: callers only need "uploaded or not".
        print(f"[!] Upload failed: {filename}: {e}")
        return False
    print(f"[↑] Uploaded: {filename}")
    return True
def start_torrent_download(magnet_link):
    """Ask the remote torrent service to begin downloading *magnet_link*.

    Returns the service's decoded JSON reply. Returns None if the request
    failed, the service answered with an HTTP error status, or the body was
    not valid JSON; in those cases the error is printed.
    """
    try:
        reply = requests.post(
            f"{TORRENT_SERVICE_URL}/api/torrent/download",
            json={"magnet_link": magnet_link},
            timeout=30,
        )
        reply.raise_for_status()
        body = reply.json()
    except Exception as exc:
        print(f"[!] Error starting download: {exc}")
        return None
    return body
def check_download_status(download_id):
    """Fetch the current state of *download_id* from the torrent service.

    Returns the decoded JSON status document. Returns None when the request
    fails, the service answers with an HTTP error, or the body is not JSON;
    in those cases the error is printed.
    """
    try:
        resp = requests.get(
            f"{TORRENT_SERVICE_URL}/api/torrent/status/{download_id}",
            timeout=30,
        )
        resp.raise_for_status()
        status_doc = resp.json()
    except Exception as err:
        print(f"[!] Error checking status: {err}")
        return None
    return status_doc
def get_download_files(download_id):
    """Return the service's JSON listing of files for *download_id*.

    Returns None (after printing the error) if the request fails, returns an
    HTTP error status, or does not contain valid JSON.
    """
    try:
        listing_resp = requests.get(
            f"{TORRENT_SERVICE_URL}/api/torrent/files/{download_id}",
            timeout=30,
        )
        listing_resp.raise_for_status()
        listing = listing_resp.json()
    except Exception as err:
        print(f"[!] Error getting files: {err}")
        return None
    return listing
def download_and_upload_file(download_id, filename):
    """Fetch one file of a finished torrent from the service and upload it to the dataset.

    The file is streamed into a private temporary file and handed to
    ``upload_to_dataset``. The temporary path is never derived from
    ``filename``. The temporary file is always removed afterwards, including
    after a failed or partial download.

    Args:
        download_id: ID returned by the service when the download was started.
        filename: File name as listed by ``get_download_files``. It is used in
            the service URL and as the destination name inside the dataset.

    Returns:
        True if the upload succeeded. False on any download or upload error;
        the error is printed.
    """
    temp_path = None
    try:
        # Quote the name so spaces, '#', '?' or '%' reach the service as part
        # of the path; '/' stays literal for files inside torrent sub-folders.
        url = (
            f"{TORRENT_SERVICE_URL}/api/torrent/download-file/"
            f"{download_id}/{quote(str(filename), safe='/')}"
        )
        # Stage in a private temp file rather than /tmp/<filename>. A name
        # containing '/' would fail to open there, and '..' would escape /tmp.
        fd, temp_path = tempfile.mkstemp(prefix="torrent-")
        with os.fdopen(fd, "wb") as f, requests.get(
            url, stream=True, timeout=300
        ) as response:
            # timeout=300 limits connecting and each individual socket read,
            # not the whole transfer, so large files are not cut off.
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)
        return upload_to_dataset(temp_path, filename)
    except Exception as e:
        print(f"[!] Error downloading/uploading {filename}: {e}")
        return False
    finally:
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
def cleanup_download(download_id):
    """Tell the torrent service to delete *download_id* and its files.

    Best-effort: success and failure are both reported with a printed
    message, and no exception escapes to the caller.
    """
    try:
        requests.delete(
            f"{TORRENT_SERVICE_URL}/api/torrent/cleanup/{download_id}",
            timeout=30,
        ).raise_for_status()
        print(f"[✓] Cleaned up download {download_id}")
    except Exception as exc:
        print(f"[!] Error cleaning up download {download_id}: {exc}")
def _wait_for_completion(download_id, max_wait_time, poll_interval):
    """Poll the service until *download_id* completes; return True on completion.

    Failed status checks (None from ``check_download_status``) are retried
    until the deadline. Returns False, after printing why, if the service
    reports ``"error"`` or ``max_wait_time`` seconds elapse first.
    """
    # monotonic() is immune to wall-clock adjustments during a long wait.
    deadline = time.monotonic() + max_wait_time
    while time.monotonic() < deadline:
        status = check_download_status(download_id)
        if status:
            current_status = status.get("status")
            print(f"[~] Status: {current_status}")
            if current_status == "completed":
                return True
            if current_status == "error":
                print(f"[!] Download failed: {status.get('message', 'Unknown error')}")
                return False
        time.sleep(poll_interval)
    print("[!] Download timed out")
    return False


def process_magnet_link(magnet_link, *, max_wait_time=1800, poll_interval=10):
    """Download one torrent via the remote service and upload its files.

    Args:
        magnet_link: Magnet URI to hand to the torrent service.
        max_wait_time: Seconds to wait for the download to complete
            (default 30 minutes).
        poll_interval: Seconds between status checks.

    Returns:
        True if at least one file was uploaded, otherwise False.
        NOTE(review): a torrent where only some files uploaded still counts
        as success, so main() marks it processed and will not retry the
        failed files. Confirm this is intended.

    Once the service has returned a download ID, the remote download is
    always cleaned up. This includes the case where an unexpected exception
    (e.g. a file entry without ``"filename"``) propagates to the caller.
    """
    print(f"[*] Starting download for: {magnet_link}")
    result = start_torrent_download(magnet_link)
    if not result:
        return False
    download_id = result.get("download_id")
    if not download_id:
        print("[!] No download ID received")
        return False
    print(f"[+] Download started with ID: {download_id}")

    try:
        if not _wait_for_completion(download_id, max_wait_time, poll_interval):
            return False

        files_info = get_download_files(download_id)
        if not files_info:
            return False
        files = files_info.get("files", [])
        if not files:
            print("[!] No files found")
            return False
        print(f"[+] Found {len(files)} files")

        success_count = 0
        for file_info in files:
            filename = file_info["filename"]
            print(f"[*] Processing file: {filename}")
            if download_and_upload_file(download_id, filename):
                success_count += 1
    finally:
        # Single cleanup point for every exit path after the download started.
        cleanup_download(download_id)

    print(f"[✓] Successfully uploaded {success_count}/{len(files)} files")
    return success_count > 0
def main():
    """Process each magnet link in ``MAGNET_LINKS`` not yet in ``processed_urls``.

    A link that processes successfully is added to ``processed_urls`` and
    saved right away, so a later crash does not lose earlier progress. Failed
    links are reported and left for a future run. Duplicate entries in
    ``MAGNET_LINKS`` are handled once. Consecutive torrents are spaced 30 s
    apart, with no delay after the last one.
    """
    # dict.fromkeys drops duplicate links while keeping their original order.
    unprocessed_links = [
        link for link in dict.fromkeys(MAGNET_LINKS) if link not in processed_urls
    ]
    if not unprocessed_links:
        print("[*] No new magnet links to process")
        return
    print(f"[*] Processing {len(unprocessed_links)} magnet links...")
    for index, link in enumerate(unprocessed_links):
        if index:
            print("[⏱] Waiting 30s before next torrent...\n")
            time.sleep(30)  # Delay between torrents
        try:
            if process_magnet_link(link):
                processed_urls.add(link)
                save_processed()
                print(f"[✓] Completed: {link}")
            else:
                print(f"[!] Failed: {link}")
        except Exception as e:
            # Top-level boundary: report and move on to the next torrent.
            print(f"[!] Error processing {link}: {e}")
    print("\n✅ Done. All torrents processed.")
if __name__ == "__main__":
    # Start processing only when run as a script, not when imported.
    main()