## Açıklama
Bu Python scripti, Rclone kullanarak Uloz.to bulut depolama hesabınızdaki çift kayıtlı MP4 ve MKV video dosyalarını otomatik olarak tespit eder ve siler. Script, aynı isme sahip dosyaları gruplar ve her gruptan en büyük boyutlu dosyayı koruyarak diğerlerini siler.
## Özellikler
– ✅ Rclone entegrasyonu ile güvenli dosya yönetimi
– ✅ MP4 ve MKV dosyalarını otomatik filtreleme
– ✅ Aynı isme sahip dosyaları tespit etme
– ✅ En büyük dosyayı koruma, küçükleri silme
– ✅ Detaylı raporlama ve ilerleme göstergesi
– ✅ Güvenli onay mekanizması (silmeden önce kullanıcı onayı)
– ✅ Streaming modunda çalışma (büyük dosya listeleri için optimize)
## Gereksinimler
– Python 3.6+
– Rclone v1.69.1+ (https://rclone.org/downloads/)
– Uloz.to hesabı ve API token
## Kurulum
1. Rclone’u yükleyin ve PATH’e ekleyin
2. Uloz.to için remote yapılandırın: `rclone config`
3. Script içindeki `RCLONE_REMOTE` değişkenini kendi remote adınızla güncelleyin
4. Gerekli Python kütüphaneleri zaten standart kütüphaneler (subprocess, json, collections)
## Kullanım
python find_and_delete_duplicates_rclone.py## Güvenlik
– Silme işlemi geri alınamaz, bu nedenle script silmeden önce kullanıcıdan onay ister
– Her gruptan en büyük dosya korunur, diğerleri silinir
– Detaylı log ve raporlama ile işlemler takip edilebilir
## Notlar
– Script, Rclone’un `lsjson` komutunu kullanarak dosya listesini alır
– Çok fazla dosya varsa işlem biraz zaman alabilir (ilerleme mesajları gösterilir)
– Token hatası durumunda script detaylı yardım mesajları gösterir
## Tarih
Oluşturulma: 2025
Versiyon: 1.0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Uloz.to Çift Kayıtlı Dosya Silme Scripti (Rclone ile)
Bu script, rclone kullanarak uloz.to hesabınızdaki çift kayıtlı MP4 ve MKV dosyalarını bulur ve siler.
En büyük dosyayı korur, diğerlerini siler.
"""
import subprocess
import json
import re
from collections import defaultdict
from typing import List, Dict, Tuple
import time
# Rclone remote adı (rclone config ile yapılandırdığınız remote adı)
RCLONE_REMOTE = 'ulozDB' # Bu değeri kendi rclone remote adınızla değiştirin
class RcloneUlozTo:
def __init__(self, remote_name: str):
self.remote = remote_name
def list_files(self) -> List[Dict]:
"""Rclone ile tüm dosyaları listeler (streaming modunda, timeout olmadan)"""
try:
# Rclone lsjson komutu ile dosyaları JSON formatında al
cmd = ['rclone', 'lsjson', f'{self.remote}:', '--recursive']
print(f"Komut çalıştırılıyor: {' '.join(cmd)}")
print("⏳ Dosyalar getiriliyor (bu işlem biraz zaman alabilir, lütfen bekleyin)...")
# Streaming modunda oku - timeout yok
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
errors='replace',
bufsize=8192
)
buffer = ""
file_count = 0
last_print = time.time()
# Stdout'u chunk chunk oku
while True:
chunk = process.stdout.read(8192) # 8KB chunks
if not chunk:
break
buffer += chunk
# İlerleme göster (her 10 saniyede bir)
current_time = time.time()
if current_time - last_print > 10:
# Buffer'daki dosya sayısını tahmin et
estimated = buffer.count('"Path"')
if estimated > file_count:
file_count = estimated
print(f" ⏳ {file_count} dosya bulundu (devam ediyor...)")
last_print = current_time
# Process'in bitmesini bekle
process.wait()
stderr_output = process.stderr.read() if process.stderr else ""
if process.returncode != 0:
print(f"❌ Rclone hatası: {stderr_output}")
# Token hatası kontrolü
if "Invalid authorization token" in stderr_output or "401" in stderr_output:
print("\n" + "="*60)
print("API TOKEN HATASI")
print("="*60)
print("\nRclone yapılandırmasında API token'ı yanlış veya geçersiz.")
print("\nToken'ı düzeltmek için:")
print(f" 1. rclone config")
print(f" 2. '{self.remote}' remote'unu seçin (numara ile)")
print(f" 3. 'token' seçeneğini düzenleyin")
print(f" 4. Doğru API token'ınızı girin: 46719ov9d82u")
print(f"\nVeya direkt olarak:")
print(f" rclone config password {self.remote}: token")
print(f" (Token'ı girin: 46719ov9d82u)")
print("\nToken'ınızın doğru olduğundan emin olun.")
print("Uloz.to hesabınızdan yeni bir API token oluşturmanız gerekebilir.")
print("\nLütfen kontrol edin:")
print(f"1. Rclone yüklü mü? (rclone --version)")
print(f"2. Remote '{self.remote}' yapılandırılmış mı? (rclone listremotes)")
print(f"3. Remote adı doğru mu?")
print(f"4. API token doğru mu? (rclone config show {self.remote})")
return []
# JSON çıktısını parse et
if not buffer.strip():
print("❌ Boş çıktı alındı")
return []
try:
files = json.loads(buffer)
print(f"✓ {len(files)} dosya bulundu.")
return files
except json.JSONDecodeError as e:
print(f"❌ JSON parse hatası: {e}")
print(f"Buffer uzunluğu: {len(buffer)} karakter")
print(f"İlk 500 karakter: {buffer[:500]}")
print(f"Son 500 karakter: {buffer[-500:]}")
# Alternatif yöntem dene
print("\n⚠ JSON parse başarısız, alternatif yöntem deneniyor...")
return self._list_files_alternative()
except FileNotFoundError:
print("❌ Rclone bulunamadı. Lütfen rclone'u yükleyin:")
print(" Windows: https://rclone.org/downloads/")
print(" veya: winget install Rclone.Rclone")
return []
except Exception as e:
print(f"❌ Hata: {e}")
import traceback
traceback.print_exc()
return []
def _list_files_alternative(self) -> List[Dict]:
"""Alternatif yöntem: rclone ls --json kullan (her satır bir JSON objesi)"""
try:
print("📋 Alternatif yöntem: rclone ls --json kullanılıyor...")
cmd = ['rclone', 'ls', f'{self.remote}:', '--recursive', '--json']
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
errors='replace',
bufsize=1
)
files = []
file_count = 0
last_print = time.time()
# Her satır bir JSON objesi
for line in process.stdout:
if line.strip():
try:
file_info = json.loads(line.strip())
files.append(file_info)
file_count += 1
# İlerleme göster
current_time = time.time()
if current_time - last_print > 10:
print(f" ⏳ {file_count} dosya işlendi (devam ediyor...)")
last_print = current_time
except json.JSONDecodeError:
continue
process.wait()
if process.returncode != 0:
stderr = process.stderr.read() if process.stderr else ""
print(f"❌ Alternatif yöntem de başarısız: {stderr}")
return []
print(f"✓ {len(files)} dosya bulundu (alternatif yöntem).")
return files
except Exception as e:
print(f"❌ Alternatif yöntem hatası: {e}")
return []
def delete_file(self, file_path: str) -> bool:
"""Rclone ile dosyayı siler"""
try:
cmd = ['rclone', 'delete', f'{self.remote}:{file_path}']
result = subprocess.run(
cmd,
capture_output=True,
text=True,
encoding='utf-8',
errors='replace',
timeout=30
)
if result.returncode == 0:
return True
else:
print(f" Silme hatası: {result.stderr}")
return False
except Exception as e:
print(f" Silme hatası: {e}")
return False
def find_duplicates_by_name(files: List[Dict]) -> Dict[str, List[Dict]]:
"""Aynı isme sahip dosyaları bulur"""
file_dict = defaultdict(list)
for file in files:
# Dosya adını al
path = file.get('Path', '')
filename = path.split('/')[-1] if '/' in path else path
# Uzantıyı kontrol et
extension = filename.split('.')[-1].lower() if '.' in filename else ''
# Sadece MP4 ve MKV dosyalarını kontrol et
if extension in ['mp4', 'mkv']:
# Sadece dosya adını kullan (path'i değil)
file_dict[filename].append(file)
# Sadece birden fazla olan dosyaları döndür
duplicates = {name: files for name, files in file_dict.items() if len(files) > 1}
return duplicates
def display_duplicates(duplicates: Dict):
"""Bulunan çift dosyaları gösterir"""
if not duplicates:
print("\n✓ Çift kayıtlı dosya bulunamadı!")
return
print(f"\n{'='*60}")
print(f"Toplam {len(duplicates)} grup çift kayıtlı dosya bulundu:")
print(f"{'='*60}\n")
total_files = 0
for filename, files in duplicates.items():
# Dosyaları boyuta göre sırala (büyükten küçüğe)
files_sorted = sorted(files, key=lambda f: f.get('Size', 0), reverse=True)
largest_size = files_sorted[0].get('Size', 0)
print(f"📁 {filename}")
print(f" Kopya sayısı: {len(files)}")
print(f" En büyük dosya: {largest_size / (1024*1024):.2f} MB (korunacak)")
for i, file in enumerate(files_sorted, 1):
file_path = file.get('Path', 'bilinmiyor')
size = file.get('Size', 0)
mod_time = file.get('ModTime', 'bilinmiyor')
status = "✓ KORUNACAK" if i == 1 else "✗ SİLİNECEK"
print(f" [{i}] Path: {file_path}")
print(f" Boyut: {size / (1024*1024):.2f} MB, Tarih: {mod_time} {status}")
print()
total_files += len(files)
print(f"Toplam dosya: {total_files} ({len(duplicates)} grup)")
print(f"Silinecek dosya: {total_files - len(duplicates)} (her gruptan en büyüğü korunacak)")
def delete_duplicates(rclone: RcloneUlozTo, duplicates: Dict, confirm: bool = True) -> Tuple[int, int]:
"""Çift kayıtları siler (her gruptan en büyüğünü korur)"""
if confirm:
print("\n⚠ UYARI: Bu işlem geri alınamaz!")
response = input("Çift kayıtları silmek istediğinize emin misiniz? (evet/hayir): ")
if response.lower() not in ['evet', 'e', 'yes', 'y']:
print("İşlem iptal edildi.")
return 0, 0
deleted_count = 0
failed_count = 0
print("\nSilme işlemi başlıyor...\n")
for filename, files in duplicates.items():
print(f"İşleniyor: {filename}")
# Dosyaları boyuta göre sırala (büyükten küçüğe)
files_sorted = sorted(files, key=lambda f: f.get('Size', 0), reverse=True)
# En büyük dosyayı koru, diğerlerini sil
for file in files_sorted[1:]:
file_path = file.get('Path', '')
size = file.get('Size', 0)
if file_path:
print(f" Siliniyor: {file_path} ({size / (1024*1024):.2f} MB)...", end=' ')
if rclone.delete_file(file_path):
print("✓")
deleted_count += 1
else:
print("✗")
failed_count += 1
# Rate limiting için kısa bekleme
time.sleep(0.5)
else:
print(f" ✗ Dosya path'i bulunamadı")
failed_count += 1
return deleted_count, failed_count
def check_rclone_setup(remote_name: str) -> bool:
"""Rclone kurulumunu ve remote yapılandırmasını kontrol eder"""
try:
# Rclone yüklü mü?
result = subprocess.run(
['rclone', '--version'],
capture_output=True,
text=True,
encoding='utf-8',
errors='replace',
timeout=10,
)
if result.returncode != 0:
print("❌ Rclone bulunamadı. Lütfen rclone'u yükleyin:")
print(" https://rclone.org/downloads/")
return False
print(f"✓ Rclone yüklü: {result.stdout.split()[1]}")
# Remote var mı?
result = subprocess.run(
['rclone', 'listremotes'],
capture_output=True,
text=True,
encoding='utf-8',
errors='replace',
timeout=10,
)
if result.returncode != 0:
print("❌ Rclone remote listesi alınamadı")
return False
remotes = result.stdout.strip().split('\n')
remotes = [r.replace(':', '') for r in remotes if r.strip()]
if remote_name not in remotes:
print(f"❌ Remote '{remote_name}' bulunamadı!")
print(f"\nMevcut remote'lar: {', '.join(remotes) if remotes else 'Yok'}")
print(f"\nYeni remote eklemek için:")
print(f" rclone config")
print(f"\nUloz.to için remote yapılandırması:")
print(f" 1. 'n' (new remote)")
print(f" 2. Remote adı: {remote_name}")
print(f" 3. Storage type: ulozto")
print(f" 4. API token'ınızı girin: 46719ov9d82u")
return False
print(f"✓ Remote '{remote_name}' bulundu")
# Remote yapılandırmasını göster (token kontrolü için)
print(f"\n📋 Remote yapılandırması kontrol ediliyor...")
result = subprocess.run(
['rclone', 'config', 'show', remote_name],
capture_output=True,
text=True,
encoding='utf-8',
errors='replace',
timeout=10,
)
if result.returncode == 0:
config = result.stdout
if 'token' in config.lower():
print(f"✓ Token yapılandırılmış")
# Token'ın son birkaç karakterini göster (güvenlik için)
token_lines = [line for line in config.split('\n') if 'token' in line.lower()]
if token_lines:
token_line = token_lines[0]
if '=' in token_line:
token_value = token_line.split('=')[1].strip()
if token_value:
masked = token_value[:4] + '*' * (len(token_value) - 8) + token_value[-4:] if len(token_value) > 8 else '***'
print(f" Token: {masked}")
else:
print(f"⚠ Token bulunamadı")
else:
print(f"⚠ Yapılandırma detayları alınamadı")
return True
except FileNotFoundError:
print("❌ Rclone bulunamadı. Lütfen rclone'u yükleyin:")
print(" https://rclone.org/downloads/")
return False
except Exception as e:
print(f"❌ Kontrol hatası: {e}")
return False
def main():
print("="*60)
print("Uloz.to Çift Kayıtlı Dosya Temizleme (Rclone ile)")
print("="*60)
# Rclone kurulumunu kontrol et
print("\n🔍 Rclone kurulumu kontrol ediliyor...")
if not check_rclone_setup(RCLONE_REMOTE):
print("\nLütfen rclone'u yapılandırın ve tekrar deneyin.")
return
# Rclone başlat
rclone = RcloneUlozTo(RCLONE_REMOTE)
# Dosyaları getir
print("\n📥 Dosyalar getiriliyor...")
files = rclone.list_files()
if not files:
print("❌ Dosya listesi alınamadı veya hesabınızda dosya yok.")
return
print(f"✓ Toplam {len(files)} dosya bulundu.")
# MP4 ve MKV dosyalarını say
video_files = [f for f in files if f.get('Path', '').split('.')[-1].lower() in ['mp4', 'mkv']]
print(f"✓ {len(video_files)} adet MP4/MKV dosyası bulundu.")
# Çift kayıtları bul (aynı isme sahip dosyalar)
print("\n🔍 Çift kayıtlar aranıyor...")
duplicates = find_duplicates_by_name(files)
# Sonuçları göster
display_duplicates(duplicates)
# Silme işlemi
if duplicates:
deleted, failed = delete_duplicates(rclone, duplicates, confirm=True)
print("\n" + "="*60)
print("İŞLEM TAMAMLANDI")
print("="*60)
print(f"✓ Silinen dosya: {deleted}")
if failed > 0:
print(f"✗ Silinemeden dosya: {failed}")
print("="*60)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n⚠ İşlem kullanıcı tarafından iptal edildi.")
except Exception as e:
print(f"\n❌ Beklenmeyen hata: {e}")
import traceback
traceback.print_exc()
requests>=2.31.0
https://github.com/ismailaydemiriu/uloz.to_duplicates