Optimizations and multi-format support

Optimized the code with new libraries (aiohttp for async downloads, lxml for parsing) and merged FLAC, MP3, and M4A support into a single downloader
Olai Vike Bøe 2024-01-17 00:33:43 +01:00
parent 4ce8445f48
commit 0612ccb0a3
5 changed files with 176 additions and 332 deletions

Downloader.py (new file, 164 additions)

@@ -0,0 +1,164 @@
import os
import asyncio
import aiohttp
import requests
from bs4 import BeautifulSoup
from urllib.parse import unquote
from tqdm import tqdm
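# Extra retry attempts per file before it is recorded as failed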
MAX_RETRIES = 2
url = input("Enter the URL: ")
# Fetch HTML content from the specified URL
response = requests.get(url)
html_content = response.text
# Parse the HTML content
soup = BeautifulSoup(html_content, 'lxml')
# Find all elements with class 'playlistDownloadSong'
elements = soup.find_all(class_='playlistDownloadSong')
# Store URLs in a list
urls = []
for element in elements:
    link = element.find('a')
    if link:
        href = link.get('href')
        urls.append(f'https://downloads.khinsider.com{href}')
# List to store failed URLs
failed_urls = []
# Lock to serialize access to the shared list of failed URLs
failed_urls_lock = asyncio.Lock()
# Function to fetch HTML content asynchronously
async def async_get_html_content(url):
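    # Identify as a desktop browser; plain library user agents are often blocked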
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
response.raise_for_status()
return await response.text()
    except aiohttp.ClientError:
        # Treat any request failure as an unreadable page; callers handle None
        return None
# Function to find and save FLAC, MP3, or M4A URLs and album name
def find_audio_urls_and_album_name(html_content):
audio_urls = []
album_name = None
if html_content:
        soup = BeautifulSoup(html_content, "lxml")
# Find all links in the page
links = soup.find_all("a", href=True)
for link in links:
href = link.get("href")
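            # A FLAC link wins outright; otherwise every MP3 and M4A link is collected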
if href.endswith(".flac"):
audio_url = href
audio_urls = [audio_url]
break
elif href.endswith(".mp3") or href.endswith(".m4a"):
audio_urls.append(href)
        # Extract the album name (the selector depends on khinsider's current page layout)
        album_name_element = soup.select_one("#pageContent > p:nth-child(6) > b:nth-child(1)")
if album_name_element:
album_name = album_name_element.text.strip()
return audio_urls, album_name
# Function to download a file asynchronously with retry
async def async_download_audio_file(session, url, directory, total_progress):
retries = 0
while retries <= MAX_RETRIES:
try:
async with session.get(url) as response:
response.raise_for_status()
content = await response.read()
                # Decode %20 etc. in the track name only, not in the directory part
                filename = os.path.join(directory, unquote(os.path.basename(url)))
with open(filename, 'wb') as file:
file.write(content)
total_progress.update(1)
break # Break the loop if download is successful
        except Exception:
retries += 1
if retries <= MAX_RETRIES:
await asyncio.sleep(2) # Wait for a moment before retrying
else:
                async with failed_urls_lock:
                    failed_urls.append(url)
break # Break the loop if max retries reached
# Function to process a single URL asynchronously
async def async_process_url(session, url, total_progress):
html_content = await async_get_html_content(url)
audio_urls, album_name = find_audio_urls_and_album_name(html_content)
if audio_urls and album_name:
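        # Keep only characters that are safe in a directory name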
sanitized_album_name = "".join(c if c.isalnum() or c in [' ', '-', '_'] else '' for c in album_name)
album_directory = os.path.join('Audio files', sanitized_album_name)
os.makedirs(album_directory, exist_ok=True)
for audio_url in audio_urls:
await async_download_audio_file(session, audio_url, album_directory, total_progress)
else:
pass # No audio files found for the URL
async def main():
    async with aiohttp.ClientSession() as session:
        total_progress = tqdm(total=len(urls), desc="Total Progress", position=0)
        # Schedule one task per track page and run them all concurrently;
        # the previous ThreadPoolExecutor wrapper was unused and has been dropped
        tasks = [asyncio.create_task(async_process_url(session, url, total_progress))
                 for url in urls]
        await asyncio.gather(*tasks)
        total_progress.close()
# Display error messages for failed URLs after the download is complete
if failed_urls:
print("\nThe following files encountered errors during download:")
for failed_url in failed_urls:
print(f"- {failed_url}")
if __name__ == "__main__":
asyncio.run(main())
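The new main() launches one task per track with no cap on simultaneous connections. If that ever proves too aggressive for the server, a bounded variant is a small change. Below is a minimal sketch reusing the names defined in Downloader.py above; MAX_CONCURRENT and bounded_process are hypothetical additions, not part of this commit:
```
import asyncio

MAX_CONCURRENT = 8  # hypothetical cap on simultaneous downloads; tune as needed

async def bounded_process(semaphore, session, url, total_progress):
    # Only MAX_CONCURRENT coroutines may hold the semaphore at once
    async with semaphore:
        await async_process_url(session, url, total_progress)

async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    async with aiohttp.ClientSession() as session:
        total_progress = tqdm(total=len(urls), desc="Total Progress", position=0)
        tasks = [asyncio.create_task(bounded_process(semaphore, session, url, total_progress))
                 for url in urls]
        await asyncio.gather(*tasks)
        total_progress.close()
```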

README.md

@@ -13,54 +13,28 @@ Install the required python libraries by downloading the requirements.txt found
 pip install -r requirements.txt
 ```
 Start the program using Python and enter the album URL when prompted:
-Not all albums have mp3 or flac, If one does not work try the other
 ```
-python downloader-flac.py
-```
-or
-```
-python downloader-mp3.py
+python Downloader.py
 ```
 ## Info
-The downloaded files will be in either "MP3 files" or "FLAC files". The folders will be created where the python script is located.
+The script looks for FLAC, MP3, and M4A files. It prefers FLAC; if an album has no FLAC it falls back to whatever MP3 and M4A files are available.
+Downloads are saved under an "Audio files" folder. By default the folder is created next to the Python script.
 ## Custom path download
-<details>
-<summary><b>MP3</b></summary>
+Find "album_directory = os.path.join('Audio files', sanitized_album_name)" and replace it with the code below.
 ### Windows
-Replace "album_directory = os.path.join('MP3 files', sanitized_album_name)" with
 ```
-base_directory = 'C:\\your\\custom\\path'
-album_directory = os.path.join(base_directory, 'MP3 files', sanitized_album_name)
+album_directory = os.path.join(r'C:\your\custom\path', sanitized_album_name)
 ```
 ### Linux
-Replace "album_directory = os.path.join('MP3 files', sanitized_album_name)" with
 ```
-base_directory = '/your/custom/path'
-album_directory = os.path.join(base_directory, 'MP3 files', sanitized_album_name)
+album_directory = os.path.join('/your/custom/path', sanitized_album_name)
 ```
-</details>
-<details>
-<summary><b>FLAC</b></summary>
-### Windows
-Replace "album_directory = os.path.join('FLAC files', sanitized_album_name)" with
-```
-base_directory = 'C:\\your\\custom\\path'
-album_directory = os.path.join(base_directory, 'FLAC files', sanitized_album_name)
-```
-### Linux
-Replace "album_directory = os.path.join('FLAC files', sanitized_album_name)" with
-```
-base_directory = '/your/custom/path'
-album_directory = os.path.join(base_directory, 'FLAC files', sanitized_album_name)
-```
-</details>

downloader-flac.py (deleted)

@@ -1,148 +0,0 @@
import os
import requests
import multiprocessing
from bs4 import BeautifulSoup
from urllib.parse import unquote
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
url = input("Enter the URL: ")
# Fetch HTML content from the specified URL
response = requests.get(url)
html_content = response.text
# Parse the HTML content
soup = BeautifulSoup(html_content, 'html.parser')
# Find all elements with class 'playlistDownloadSong'
elements = soup.find_all(class_='playlistDownloadSong')
# Store URLs in a list
urls = []
for index, element in enumerate(elements):
link = element.find('a')
if link:
url = link.get('href')
full_url = f'https://downloads.khinsider.com{url}'
urls.append(full_url)
# Function to fetch and parse HTML content
def get_html_content(url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Error fetching {url}: {e}")
return None
# Function to find and save FLAC URLs and album name
def find_flac_urls_and_album_name(html_content):
flac_urls = []
album_name = None
if html_content:
soup = BeautifulSoup(html_content, "html.parser")
# Find all links in the page
links = soup.find_all("a", href=True)
for link in links:
href = link.get("href")
if href.endswith(".flac"):
flac_url = href
flac_urls.append(flac_url)
# Extract album name
album_name_element = soup.select_one("#pageContent > p:nth-child(6) > b:nth-child(1)")
if album_name_element:
album_name = album_name_element.text.strip()
return flac_urls, album_name
# Function to download a file
def download_file(url, directory, total_progress):
try:
response = requests.get(url, stream=True)
response.raise_for_status()
# Unquote the filename to convert %20 back to spaces
filename = unquote(os.path.join(directory, os.path.basename(url)))
with open(filename, 'wb') as file:
for data in response.iter_content(chunk_size=1024):
file.write(data)
total_progress.update(1) # Update the total progress by 1 for each file downloaded
# print(f"Downloaded: {filename}")
except requests.exceptions.RequestException as e:
print(f"Error downloading {url}: {e}")
# Function to process a single URL
def process_url(url, total_progress):
# print(f"Scraping {url} for FLAC files...")
html_content = get_html_content(url)
flac_urls, album_name = find_flac_urls_and_album_name(html_content)
if flac_urls and album_name:
# Sanitize album name for creating a directory
sanitized_album_name = "".join(c if c.isalnum() or c in [' ', '-', '_'] else '' for c in album_name)
album_directory = os.path.join('FLAC files', sanitized_album_name)
os.makedirs(album_directory, exist_ok=True)
# print(f"FLAC files found for album '{album_name}':")
for flac_url in flac_urls:
download_file(flac_url, album_directory, total_progress)
else:
# No need to print an error message here
pass
def get_cpu_threads():
try:
# For Linux/Unix/MacOS
num_threads = os.cpu_count() or 1
except NotImplementedError:
# For Windows
num_threads = multiprocessing.cpu_count() or 1
return num_threads
if __name__ == "__main__":
cpu_threads = get_cpu_threads()
# Use ThreadPoolExecutor to run the process_url function concurrently
with ThreadPoolExecutor(max_workers=cpu_threads) as executor:
total_items = len(urls)
total_progress = tqdm(total=total_items, desc="Total Progress", position=0)
futures = []
for url in urls:
future = executor.submit(process_url, url, total_progress)
futures.append(future)
# Wait for all futures to complete
for future in futures:
future.result()
total_progress.close()
# Display the final message based on the download results
downloaded_files = total_progress.n
error_message = None
if downloaded_files == 0:
error_message = "Album name missing from site."
elif downloaded_files < total_items:
error_message = f"{total_items - downloaded_files} files not downloaded. Missing FLAC files."
if error_message:
print(error_message)

downloader-mp3.py (deleted)

@@ -1,148 +0,0 @@
import os
import requests
import multiprocessing
from bs4 import BeautifulSoup
from urllib.parse import unquote
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
url = input("Enter the URL: ")
# Fetch HTML content from the specified URL
response = requests.get(url)
html_content = response.text
# Parse the HTML content
soup = BeautifulSoup(html_content, 'html.parser')
# Find all elements with class 'playlistDownloadSong'
elements = soup.find_all(class_='playlistDownloadSong')
# Store URLs in a list
urls = []
for index, element in enumerate(elements):
link = element.find('a')
if link:
url = link.get('href')
full_url = f'https://downloads.khinsider.com{url}'
urls.append(full_url)
# Function to fetch and parse HTML content
def get_html_content(url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
print(f"Error fetching {url}: {e}")
return None
# Function to find and save MP3 URLs and album name
def find_mp3_urls_and_album_name(html_content):
mp3_urls = []
album_name = None
if html_content:
soup = BeautifulSoup(html_content, "html.parser")
# Find all links in the page
links = soup.find_all("a", href=True)
for link in links:
href = link.get("href")
if href.endswith(".mp3"):
mp3_url = href
mp3_urls.append(mp3_url)
# Extract album name
album_name_element = soup.select_one("#pageContent > p:nth-child(6) > b:nth-child(1)")
if album_name_element:
album_name = album_name_element.text.strip()
return mp3_urls, album_name
# Function to download a file
def download_file(url, directory, total_progress):
try:
response = requests.get(url, stream=True)
response.raise_for_status()
# Unquote the filename to convert %20 back to spaces
filename = unquote(os.path.join(directory, os.path.basename(url)))
with open(filename, 'wb') as file:
for data in response.iter_content(chunk_size=1024):
file.write(data)
total_progress.update(1) # Update the total progress by 1 for each file downloaded
# print(f"Downloaded: {filename}")
except requests.exceptions.RequestException as e:
print(f"Error downloading {url}: {e}")
# Function to process a single URL
def process_url(url, total_progress):
# print(f"Scraping {url} for MP3 files...")
html_content = get_html_content(url)
mp3_urls, album_name = find_mp3_urls_and_album_name(html_content)
if mp3_urls and album_name:
# Sanitize album name for creating a directory
sanitized_album_name = "".join(c if c.isalnum() or c in [' ', '-', '_'] else '' for c in album_name)
album_directory = os.path.join('MP3 files', sanitized_album_name)
os.makedirs(album_directory, exist_ok=True)
# print(f"MP3 files found for album '{album_name}':")
for mp3_url in mp3_urls:
download_file(mp3_url, album_directory, total_progress)
else:
# No need to print an error message here
pass
def get_cpu_threads():
try:
# For Linux/Unix/MacOS
num_threads = os.cpu_count() or 1
except NotImplementedError:
# For Windows
num_threads = multiprocessing.cpu_count() or 1
return num_threads
if __name__ == "__main__":
cpu_threads = get_cpu_threads()
# Use ThreadPoolExecutor to run the process_url function concurrently
with ThreadPoolExecutor(max_workers=cpu_threads) as executor:
total_items = len(urls)
total_progress = tqdm(total=total_items, desc="Total Progress", position=0)
futures = []
for url in urls:
future = executor.submit(process_url, url, total_progress)
futures.append(future)
# Wait for all futures to complete
for future in futures:
future.result()
total_progress.close()
# Display the final message based on the download results
downloaded_files = total_progress.n
error_message = None
if downloaded_files == 0:
error_message = "Album name missing from site."
elif downloaded_files < total_items:
error_message = f"{total_items - downloaded_files} files not downloaded. Missing MP3 files."
if error_message:
print(error_message)

requirements.txt

@@ -1,3 +1,5 @@
-beautifulsoup4==4.9.3
-requests==2.25.1
-tqdm==4.56.0
+beautifulsoup4==4.10.0
+requests==2.26.0
+tqdm==4.62.3
+aiohttp==3.8.6
+lxml==5.1.0
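With aiohttp and lxml newly pinned, a fresh environment picks up everything with the command the README already documents:
```
pip install -r requirements.txt
```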