YouTubeDownloader v1.1.2
YouTube content downloader
Loading...
Searching...
No Matches
downloader.py
Go to the documentation of this file.
1"""!
2********************************************************************************
3@file downloader.py
4@brief Download Thread
5********************************************************************************
6"""
7
8import os
9import re
10import logging
11from typing import TYPE_CHECKING
12import statistics
13import threading
14import time
15import pytubefix
16from pytubefix import YouTube
17from pytubefix.streams import Stream
18
19from Source.version import __title__
20from Source.Views.mainwindow_tk_ui import HIGH_RESOLUTION, LOW_RESOLUTION, ONLY_AUDIO
21if TYPE_CHECKING:
22 from Source.Controller.main_window import MainWindow
23
24B_MP3_CONVERT = False
25
26if B_MP3_CONVERT:
27 from moviepy.editor import AudioFileClip
28
29log = logging.getLogger(__title__)
30
31S_DOWNLOAD_FOLDER = "Download"
32I_SPEED_AVERAGE_VALUES = 10
33
34
35class DownloadThread(threading.Thread):
36 """!
37 @brief Thread class for download
38 @param main_controller : main controller
39 """
40
41 def __init__(self, main_controller: "MainWindow"):
42 threading.Thread.__init__(self)
43 self.main_controller = main_controller
45 self.i_file_size = 0
47 self.f_time_stamp = 0.0
49 self.l_speed_history: list[int] = [] # download speed history
50 self.clear_data()
51
52 def clear_data(self) -> None:
53 """!
54 @brief Clear data
55 """
56 self.b_first_callback_call = False
57 self.i_file_size = 0
58 self.i_last_percent = 0
59 self.f_time_stamp = 0.0
61 self.l_speed_history = [] # download speed history
62
63 def run(self) -> None:
64 """!
65 @brief Download YouTube content
66 """
67 self.main_controller.status_lbl.configure(text="Analysiere URL...", text_color="grey")
68 self.main_controller.progress_bar.set(0)
69 i_choice = self.main_controller.choice_var.get()
70 s_url = self.main_controller.o_url_choice.get()
71 if i_choice != 0:
72 l_url = []
73 s_subfolder_name = ""
74 if re.search("&list=", s_url):
75 o_playlist = pytubefix.Playlist(s_url)
76 s_subfolder_name = "/" + o_playlist.title
77 for video in o_playlist.videos:
78 l_url.append(video.watch_url)
79 else:
80 l_url = [s_url]
81 i_title_cnt = len(l_url)
82 for i, s_url in enumerate(l_url, 1):
83 self.main_controller.status_lbl.configure(text="Analysiere URL...", text_color="grey")
84 s_text = f"Titel {i}/{i_title_cnt}: ..."
85 self.main_controller.title_lbl.configure(text=s_text, text_color="orange")
86 b_valid_url = False
87 try:
88 o_youtube = YouTube(s_url, on_progress_callback=self.progress_callback)
89 s_titel = o_youtube.title[:35]
90 s_text = f"Titel {i}/{i_title_cnt}: {s_titel}"
91 self.main_controller.title_lbl.configure(text=s_text, text_color="orange")
92 b_valid_url = True
93 except BaseException: # pylint: disable=bare-except
94 self.main_controller.status_lbl.configure(text="Ungültige URL!", text_color="red")
95 if b_valid_url:
96 self.clear_data()
97 s_filename = None
98 if i_choice == HIGH_RESOLUTION:
99 o_stream = o_youtube.streams.filter(progressive=True, file_extension='mp4')\
100 .get_highest_resolution()
101 elif i_choice == LOW_RESOLUTION:
102 o_stream = o_youtube.streams.filter(progressive=True, file_extension='mp4')\
103 .get_lowest_resolution()
104 elif i_choice == ONLY_AUDIO:
105 o_stream = o_youtube.streams.filter(only_audio=True).first()
106 else:
107 o_stream = None # TODO
108 self.main_controller.status_lbl.configure(text="Unerwarteter Fehler!", text_color="red")
109 try:
110 self.main_controller.status_lbl.configure(text="Download läuft...", text_color="grey")
111 o_stream.download(S_DOWNLOAD_FOLDER + s_subfolder_name, s_filename)
112 self.main_controller.status_lbl.configure(text="Download abgeschlossen!", text_color="green")
113 if B_MP3_CONVERT:
114 if i_choice == ONLY_AUDIO:
115 self.main_controller.status_lbl.configure(text="MP3 wird erstellt...", text_color="grey")
116 s_file_path_name = S_DOWNLOAD_FOLDER + s_subfolder_name + "/" + o_stream.default_filename
117 audioclip = AudioFileClip(s_file_path_name)
118 audioclip.write_audiofile(s_file_path_name[:-1] + "3")
119 audioclip.close()
120 os.remove(s_file_path_name)
121 self.main_controller.status_lbl.configure(text="MP3 erstellt!", text_color="green")
122 except BaseException: # pylint: disable=bare-except
123 self.main_controller.status_lbl.configure(text="Dieses Video kann nicht heruntergeladen werden!",
124 text_color="red")
125 else:
126 self.main_controller.status_lbl.configure(text="Bitte Format angeben!", fg="red")
127 self.main_controller.download_btn.configure(state="normal")
128 self.main_controller.direct_btn.configure(state="normal")
129
130 def progress_callback(self, _stream: Stream, _chunk: bytes, bytes_remaining: int) -> None:
131 """!
132 @brief Calculate process and update process bar
133 @param _stream : stream
134 @param _chunk : chunk
135 @param bytes_remaining : bytes remaining
136 """
137 if not self.b_first_callback_call:
138 self.i_file_size = bytes_remaining
139 self.i_last_bytes_remaining = bytes_remaining
140 self.b_first_callback_call = True
141 else:
142 f_actual_time = time.time()
143 f_past_time = f_actual_time - self.f_time_stamp
144 if self.f_time_stamp != 0: # set calculate time only for second call
145 i_actual_speed = int((self.i_last_bytes_remaining - bytes_remaining) / f_past_time)
146 i_history_len = len(self.l_speed_history)
147 if i_history_len < I_SPEED_AVERAGE_VALUES:
148 self.l_speed_history.append(i_actual_speed)
149 else:
150 for i, value in enumerate(self.l_speed_history):
151 if i != 0:
152 self.l_speed_history[i - 1] = value
153 self.l_speed_history[I_SPEED_AVERAGE_VALUES - 1] = i_actual_speed
154 i_average_speed = statistics.mean(self.l_speed_history)
155 i_remaining_seconds = int(bytes_remaining / i_average_speed)
156 self.main_controller.status_lbl.configure(text=f'Download läuft... noch {i_remaining_seconds}sek', text_color="grey")
157 self.f_time_stamp = f_actual_time
158 self.i_last_bytes_remaining = bytes_remaining
159 i_percent = int(((self.i_file_size - bytes_remaining) / self.i_file_size) * 100)
160 i_percent_diff = i_percent - self.i_last_percent
161 for _ in range(i_percent_diff):
162 self.main_controller.progress_bar.set(i_percent / 100)
163 self.i_last_percent = i_percent
164
165
166if __name__ == "__main__":
167 my_url = "https://www.youtube.com/watch?v=QCRYA6ck3x0"
168 try:
169 my_youtube = YouTube(my_url)
170 except BaseException: # pylint: disable=bare-except
171 print("Ungültige URL!")
172 else:
173 my_filename = None
174 my_stream = my_youtube.streams.filter(only_audio=True).first()
175 try:
176 print("Download läuft...")
177 my_stream.download(S_DOWNLOAD_FOLDER, my_filename)
178 print("Download abgeschlossen!")
179 except BaseException: # pylint: disable=bare-except
180 print("Dieses Video kann nicht heruntergeladen werden!")
Thread class for download.
Definition downloader.py:35
__init__(self, "MainWindow" main_controller)
Definition downloader.py:41
None run(self)
Download YouTube content.
Definition downloader.py:63
None progress_callback(self, Stream _stream, bytes _chunk, int bytes_remaining)
Calculate process and update process bar.