@@ 0,0 1,157 @@
+from abc import ABC
+import sqlite3
+
+from melon.import_providers import ImportProvider, PickerMode
+from melon.servers import Channel, Playlist, Video
+from melon.models import ensure_subscribed_to_channel, ensure_bookmark_external_playlist, add_to_history, new_local_playlist, add_to_local_playlist, set_local_playlist_thumbnail, add_history_items, add_videos
+
+def connect_to_db(database_path):
+ con = sqlite3.connect(database_path)
+ return con
+
+def adapt_json(data):
+ return (json.dumps(data, sort_keys=True)).encode()
+
+def convert_json(blob):
+ return json.loads(blob.decode())
+
+sqlite3.register_adapter(dict, adapt_json)
+sqlite3.register_adapter(list, adapt_json)
+sqlite3.register_adapter(tuple, adapt_json)
+sqlite3.register_converter('json', convert_json)
+
+class NewpipeImporter(ImportProvider, ABC):
+ id = "newpipedb"
+
+ title = "Newpipe Database importer"
+ description = "Import the .db file from inside the newpipe .zip export (as invidious content)"
+ picker_title = "Select .db file"
+
+ # the server id for resources
+ server_id:str
+ # url used to replace https://www.youtube.com
+ base_url:str
+ # url used to replace https://i.ytimg.com
+ img_url:str
+ # service id to match in newpipe db
+ service_id:int
+
+ mode = PickerMode.FILE
+ is_multi = False
+
+ filters = {
+ "Newpipe Database": [ "application/x-sqlite3" ]
+ }
+
+ db:sqlite3.Connection = None
+
+ # additional information about newpipes database layout
+ # https://github.com/TeamNewPipe/NewPipe/wiki/Database
+ def load(self, selection:list[str]):
+ db_path = selection[0]
+ self.db = connect_to_db(db_path)
+
+ self.__load_subs()
+ self.__load_local_playlists()
+ self.__load_remote_playlists()
+ self.__load_history()
+
+ def __load_subs(self):
+ results = self.db.execute("""
+ SELECT service_id, url, name, avatar_url, description
+ FROM subscriptions
+ """).fetchall()
+ for dt in results:
+ if dt[0] == self.service_id:
+ server = self.server_id
+ url = dt[1].replace("https://www.youtube.com", self.base_url)
+ name = dt[2]
+ bio = dt[4]
+ avatar = dt[3].replace("https://i.ytimg.com", self.img_url)
+ id = url.split("/")[-1]
+ c = Channel(server, url, id, name, bio, avatar)
+ ensure_subscribed_to_channel(c)
+
+ def __load_history(self):
+ results = self.db.execute("""
+ SELECT service_id, url, title, access_date, uploader_url, uploader, thumbnail_url
+ FROM streams JOIN stream_history ON stream_history.stream_id = streams.uid
+ """).fetchall()
+ entries = []
+ for dt in results:
+ if dt[0] == self.service_id:
+ server = self.server_id
+ url = dt[1].replace("https://www.youtube.com", self.base_url)
+ id = url.split("=")[-1]
+ title = dt[2]
+ channel_name = dt[5]
+ channel_id = None
+ if not dt[4] is None:
+ channel_id = dt[4].split("/")[-1]
+ thumb = dt[6].replace("https://i.ytimg.com", self.img_url)
+ # newpipe does not store description
+ desc = ""
+ uts = dt[3] / 1000
+ v = Video(server, url, id, title, (channel_name, channel_id), desc, thumb)
+ entries.append((v, uts))
+ add_videos([ d[0] for d in entries ])
+ add_history_items(entries)
+
+ def __load_local_playlists(self):
+ results = self.db.execute("""
+ SELECT uid, name, thumbnail_stream_id
+ FROM playlists
+ """).fetchall()
+ for dt in results:
+ name = dt[1]
+ # newpipe doesn't support playlist descriptions
+ description = ""
+ uid = dt[0]
+ vids = self.db.execute("""
+ SELECT service_id, url, title, uploader, uploader_url, thumbnail_url
+ FROM playlist_stream_join, streams
+ WHERE playlist_stream_join.playlist_id = ? AND streams.uid = playlist_stream_join.stream_id
+ ORDER BY playlist_stream_join.join_index
+ """, (uid,)).fetchall()
+ videos = []
+ for vdt in vids:
+ if vdt[0] == self.service_id:
+ # youtube detected (I think)
+ server = self.server_id
+ url = vdt[1].replace("https://www.youtube.com", self.base_url)
+ id = url.split("=")[-1]
+ title = vdt[2]
+ channel_name = vdt[3]
+ channel_id = None
+ if not vdt[4] is None:
+ channel_id = vdt[4].split("/")[-1]
+ thumb = vdt[5].replace("https://i.ytimg.com", self.img_url)
+ # newpipe doesn't store descriptions
+ desc = ""
+ videos.append(Video(server, url, id, title, (channel_name, channel_id), desc, thumb))
+ pid = new_local_playlist(name, description, videos)
+ thumb = self.db.execute("""
+ SELECT thumbnail_url
+ FROM streams
+ WHERE uid = ?
+ """, (dt[2],)).fetchone()
+ if not thumb is None:
+ set_local_playlist_thumbnail(pid, thumb[0].replace("https://i.ytimg.com", self.img_url))
+
+ def __load_remote_playlists(self):
+ results = self.db.execute("""
+ SELECT service_id, name, url, thumbnail_url, uploader
+ FROM remote_playlists
+ """).fetchall()
+ for dt in results:
+ if dt[0] == self.service_id:
+ server = self.server_id
+ url = dt[2].replace("https://www.youtube.com", self.base_url)
+ id = url.split("=")[-1]
+ title = dt[1]
+ channel_name = dt[4]
+ # newpipe doesn't store the channel id
+ channel_id = None
+ thumb = dt[3].replace("https://i.ytimg.com", self.img_url)
+ p = Playlist(server, url, id, title, (channel_name, channel_id), thumb)
+ ensure_bookmark_external_playlist(p)