@@ 175,7 175,7 @@ class Server(ABC):
"""Returns a list of videos in the given feed"""
return []
@abstractmethod
- def search(self, query:str, mask: SearchMode, page=1) -> list[Resource]:
+ def search(self, query:str, mask: SearchMode, page=0) -> list[Resource]:
"""
Search for a given query
filter for things according to mask
@@ 203,7 203,7 @@ class Server(ABC):
"""
pass
@abstractmethod
- def get_channel_feed_content(self, cid: str, feed_id: str, page=1) -> list[Resource]:
+ def get_channel_feed_content(self, cid: str, feed_id: str, page=0) -> list[Resource]:
"""
Returns list of resources in the channel feed
"""
@@ 216,7 216,7 @@ class Server(ABC):
"""
pass
@abstractmethod
- def get_playlist_content(self, pid: str, page=1) -> list[Resource]:
+ def get_playlist_content(self, pid: str, page=0) -> list[Resource]:
"""
Returns a list of resources in the playlist
"""
@@ 77,7 77,7 @@ class Invidious(Server):
instance = self.get_external_url()
return self.request_and_parse(f"{instance}/feed/{id}")
- def search(self, query, mask, page=1):
+ def search(self, query, mask, page=0):
instance = self.get_external_url()
# default to any
t = "none"
@@ 128,7 128,7 @@ class Invidious(Server):
def get_default_channel_feed(self, cid:str):
return ""
- def get_channel_feed_content(self, cid: str, feed_id: str, page=1):
+ def get_channel_feed_content(self, cid: str, feed_id: str, page=0):
instance = self.get_external_url()
return self.request_and_parse(f"{instance}/channel/{cid}/{feed_id}")
@@ 148,7 148,7 @@ class Invidious(Server):
# on the playlist screen
return Playlist(self.id, url, pid, title, channel, None)
- def get_playlist_content(self, pid: str, page=1):
+ def get_playlist_content(self, pid: str, page=0):
instance = self.get_external_url()
return self.request_and_parse(f"{instance}/playlist?list={pid}")
@@ 297,6 297,7 @@ class Invidious(Server):
for src in video.find_all("source"):
try:
rel = src["src"]
+ print(rel)
stream_url = f"{instance}{rel}"
results.append(Stream(stream_url, src["label"]))
except:
@@ 139,7 139,7 @@ class Nebula(Server):
return results
return []
- def search(self, query, mask, page=1):
+ def search(self, query, mask, page=0):
# nebula doesn't provide playlist search
# which means that this feed will always be empty
if mask == SearchMode.PLAYLISTS:
@@ 200,7 200,7 @@ class Nebula(Server):
def get_default_channel_feed(self, cid:str):
return "videos"
- def get_channel_feed_content(self, channel_id: str, feed_id: str, page=1):
+ def get_channel_feed_content(self, channel_id: str, feed_id: str, page=0):
if feed_id == "videos":
results = []
# TODO: add pagination
@@ 244,7 244,7 @@ class Nebula(Server):
# nebula doesn't support playlist thumbnails
None)
- def get_playlist_content(self, playlist_id: str, page=1):
+ def get_playlist_content(self, playlist_id: str, page=0):
# TODO: add pagination
# apparently this call doesn't support page numbers
# and instead we have to use the provided data["next"] cursor
@@ 0,0 1,376 @@
+from melon.servers import Server, Preference, PreferenceType
+from melon.servers import Feed, Channel, Video, Playlist, Stream, SearchMode
+from melon.servers import USER_AGENT
+import requests
+from urllib.parse import urlparse,parse_qs
+from datetime import datetime
+from melon.import_providers.newpipe import NewpipeImporter
+
+class Peertube(Server):
+
+ id = "peertube"
+ name = "Peertube"
+ description = "Decentralized video hosting network, based on free/libre software"
+
+ settings = {
+ "instances": Preference(
+ "instances",
+ "Instances",
+ "List of peertube instances, from which to fetch content. See https://joinpeertube.org/instances",
+ PreferenceType.MULTI,
+ ["https://tilvids.com/"],
+ ["https://tilvids.com/"]),
+ "nsfw": Preference(
+ "nsfw",
+ "Show NSFW content",
+ "Passes the nsfw filter to the peertube search API",
+ PreferenceType.TOGGLE,
+ False, False),
+ "federate": Preference(
+ "federate",
+ "Enable Federation",
+ "Returns content from federated instances instead of only local content",
+ PreferenceType.TOGGLE,
+ True, True)
+ }
+
+ # peertube instances may contain 18+ content
+ # so we have to indicate that this may contain nsfw content
+ is_nsfw = True,
+
+ def get_external_url(self):
+ # because this plugin supports multiple instances,
+ # we can't really get one external url
+ # so we redirect to the peertube info page instead
+ return "https://joinpeertube.org/"
+
+ def get_instance_list(self):
+ return [ inst.strip("/") for inst in self.settings["instances"].value ]
+
+ def get_public_feeds(self):
+ # because this plugin supports multiple instances
+ # we cannot set n external URL
+ # nor is it possible to show trending videos,
+ # because they cannot be merged
+ return [
+ Feed("latest", "Latest", "dialog-information-symbolic")
+ ]
+
+ def get_query_config(self) -> str:
+ local = not self.settings["federate"].value
+ nsfw = self.settings["nsfw"].value
+ if local:
+ local = "true"
+ else:
+ local = "false"
+ if nsfw:
+ nsfw = "true"
+ else:
+ nsfw = "false"
+ return f"isLocal={local}&nsfw={nsfw}"
+
+ def get_public_feed_content(self,id):
+ if id != "latest":
+ return []
+ conf = self.get_query_config()
+ videos:(Video,int) = []
+ for instance in self.get_instance_list():
+ url = f"{instance}/feeds/videos.json?sort=-publishedAt&{conf}"
+ r = requests.get(url)
+ if r.ok:
+ items = r.json()["items"]
+ for item in items:
+ url = item["url"]
+ vid = item["id"].split("/")[-1]
+ id = f"{instance}::{vid}"
+ title = item["title"]
+ desc = ""
+ if "summary" in item:
+ desc = item["summary"]
+ # don't know how to get the thumbnail
+ thumb = None
+ channel_name = item["author"]["name"]
+ channel_id = item["author"]["url"].split("/")[-1]
+ channel = (channel_name, channel_id)
+ v = Video(self.id, url, id, title, channel, desc, thumb)
+ date_str = item["date_modified"]
+ uts = datetime.fromisoformat(date_str).timestamp()
+ videos.append((v, uts))
+
+ videos.sort(key=lambda x:x[1])
+ return [ v[0] for v in videos ]
+
+ def search(self, query, mask, page=1):
+ vid_conf = self.get_query_config()
+ results = []
+ for instance in self.get_instance_list():
+ search_base = f"{instance}/api/v1/search"
+ if mask == SearchMode.CHANNELS or mask == SearchMode.ANY:
+ # channel search
+ url = f"{search_base}/video-channels?search={query}"
+ r = requests.get(url)
+ if r.ok:
+ for item in r.json()["data"]:
+ item_id = item["name"]
+ item_host = item["host"]
+ channel_id = f"https://{item_host}::{item_id}"
+ avatar = None
+ if "avatar" in item and not item["avatar"] is None:
+ path=item["avatar"]["path"]
+ avatar=f"{instance}{path}"
+ c = Channel(
+ self.id,
+ item["url"],
+ channel_id,
+ item["displayName"],
+ item["description"] or "",
+ avatar)
+ results.append(c)
+ if mask == SearchMode.PLAYLISTS or mask == SearchMode.ANY:
+ # playlist search
+ url = f"{search_base}/video-playlists?search={query}"
+ r = requests.get(url)
+ if r.ok:
+ for item in r.json()["data"]:
+ item_uuid = item["uuid"]
+ item_id = f"{instance}::{item_uuid}"
+ channel_slug = item["videoChannel"]["name"]
+ channel_host = item["videoChannel"]["host"]
+ channel_id = f"https://{channel_host}::{channel_slug}"
+ channel_name = item["videoChannel"]["displayName"]
+ thumb_path = item["thumbnailPath"]
+ thumb = f"{instance}{thumb_path}"
+ p = Playlist(
+ self.id,
+ item["url"],
+ item_id,
+ item["displayName"],
+ (channel_name, channel_id),
+ thumb)
+ results.append(p)
+ if mask == SearchMode.VIDEOS or mask == SearchMode.ANY:
+ # video search
+ url = f"{search_base}/videos?search={query}&{vid_conf}"
+ r = requests.get(url)
+ if r.ok:
+ for item in r.json()["data"]:
+ video_host = instance
+ video_slug = item["uuid"]
+ video_id=f"{video_host}::{video_slug}"
+ thumb_path = item["thumbnailPath"]
+ thumb = f"{instance}/{thumb_path}"
+ channel_name = item["channel"]["displayName"]
+ channel_slug = item["channel"]["name"]
+ channel_host = item["channel"]["host"]
+ channel_id = f"https://{channel_host}::{channel_slug}"
+ v = Video(
+ self.id,
+ item["url"],
+ video_id,
+ item["name"],
+ (channel_name, channel_id),
+ item["description"] or "",
+ thumb)
+ results.append(v)
+ return results
+
+ def get_channel_info(self, cid: str):
+ instance, channel_id = cid.split("::")
+ url = f"{instance}/api/v1/video-channels/{channel_id}"
+ r = requests.get(url)
+ if r.ok:
+ data = r.json()
+ channel_slug = data["name"]
+ channel_host = data["host"]
+ channel_id = f"https://{channel_host}::{channel_slug}"
+ avatar = None
+ if "avatar" in data and not data["avatar"] is None:
+ avatar_path = data["avatar"]["path"]
+ avatar = f"{instance}{avatar_path}"
+ return Channel(
+ self.id,
+ data["url"],
+ channel_id,
+ data["displayName"],
+ data["description"],
+ avatar)
+
+ def get_channel_feeds(self, cid:str) :
+ res = self.get_channel_feed_content(cid, "playlists")
+ if len(res) != 0:
+ return [
+ Feed("videos", "Videos"),
+ Feed("playlists", "Playlists")
+ ]
+ return [
+ Feed("videos", "Videos")
+ ]
+
+ def get_default_channel_feed(self, cid:str):
+ return "videos"
+
+ def get_channel_feed_content(self, cid: str, feed_id: str, page=1):
+ instance, channel_id = cid.split("::")
+ results = []
+ if feed_id == "videos":
+ url = f"{instance}/api/v1/video-channels/{channel_id}/videos"
+ r = requests.get(url)
+ if r.ok:
+ for item in r.json()["data"]:
+ video_host = instance
+ video_slug = item["uuid"]
+ video_id=f"{video_host}::{video_slug}"
+ thumb_path = item["thumbnailPath"]
+ thumb = f"{instance}{thumb_path}"
+ channel_name = item["channel"]["displayName"]
+ channel_slug = item["channel"]["name"]
+ channel_host = item["channel"]["host"]
+ channel_id = f"https://{channel_host}::{channel_slug}"
+ v = Video(
+ self.id,
+ item["url"],
+ video_id,
+ item["name"],
+ (channel_name, channel_id),
+ item["description"],
+ thumb)
+ results.append(v)
+ elif feed_id == "playlists":
+ url = f"{instance}/api/v1/video-channels/{channel_id}/video-playlists"
+ r = requests.get(url)
+ if r.ok:
+ for item in r.json()["data"]:
+ item_uuid = item["uuid"]
+ item_id = f"{instance}::{item_uuid}"
+ channel_slug = item["videoChannel"]["name"]
+ channel_id = f"{instance}::{channel_slug}"
+ channel_name = item["videoChannel"]["displayName"]
+ thumb_path = item["thumbnailPath"]
+ thumb = f"{instance}{thumb_path}"
+ p = Playlist(
+ self.id,
+ item["url"],
+ item_id,
+ item["displayName"],
+ (channel_name, channel_id),
+ thumb)
+ results.append(p)
+
+ return results
+
+
+ def get_playlist_info(self, pid: str):
+ instance, playlist_id = pid.split("::")
+ url = f"{instance}/api/v1/video-playlists/{playlist_id}"
+ r = requests.get(url)
+ if r.ok:
+ data = r.json()
+ item_uuid = data["uuid"]
+ item_id = f"{instance}::{item_uuid}"
+ channel_name = data["videoChannel"]["displayName"]
+ channel_slug = data["videoChannel"]["name"]
+ channel_host = data["videoChannel"]["host"]
+ channel_id = f"https://{channel_host}::{channel_slug}"
+ thumb_path = data["thumbnailPath"]
+ thumb = f"{instance}{thumb_path}"
+ return Playlist(
+ self.id,
+ data["url"],
+ item_id,
+ data["displayName"],
+ (channel_name, channel_id),
+ thumb)
+
+ def get_playlist_content(self, pid: str, page=0):
+ instance, playlist_id = pid.split("::")
+ size = 15
+ url = f"{instance}/api/v1/video-playlists/{playlist_id}/videos?start={page*size}&count={size}"
+ r = requests.get(url)
+ results = []
+ if r.ok:
+ for item in r.json()["data"]:
+ item = item["video"]
+ video_host = instance
+ video_slug = item["uuid"]
+ video_id=f"{video_host}::{video_slug}"
+ thumb_path = item["thumbnailPath"]
+ thumb = f"{instance}{thumb_path}"
+ channel_name = item["channel"]["displayName"]
+ channel_slug = item["channel"]["name"]
+ channel_host = item["channel"]["host"]
+ channel_id = f"https://{channel_host}::{channel_slug}"
+ v = Video(
+ self.id,
+ item["url"],
+ video_id,
+ item["name"],
+ (channel_name, channel_id),
+ item["description"],
+ thumb)
+ results.append(v)
+ return results
+
+ def get_timeline(self, cid: str):
+ instance, channel_id = cid.split("::")
+ results = []
+ url = f"{instance}/api/v1/video-channels/{channel_id}/videos"
+ r = requests.get(url)
+ if r.ok:
+ for item in r.json()["data"]:
+ video_host = instance
+ video_slug = item["uuid"]
+ video_id=f"{video_host}::{video_slug}"
+ thumb_path = item["thumbnailPath"]
+ thumb = f"{instance}{thumb_path}"
+ channel_name = item["channel"]["displayName"]
+ channel_slug = item["channel"]["name"]
+ channel_host = item["channel"]["host"]
+ channel_id = f"https://{channel_host}::{channel_slug}"
+ v = Video(
+ self.id,
+ item["url"],
+ video_id,
+ item["name"],
+ (channel_name, channel_id),
+ item["description"],
+ thumb)
+ uts = datetime.fromisoformat(item["updatedAt"]).timestamp()
+ results.append((v, uts))
+ return results
+
+ def get_video_info(self, vid: str):
+ instance, video_id = vid.split("::")
+ url = f"{instance}/api/v1/videos/{video_id}"
+ r = requests.get(url)
+ if r.ok:
+ data = r.json()
+ video_host = instance
+ video_slug = data["uuid"]
+ video_id=f"{video_host}::{video_slug}"
+ channel_name = data["channel"]["displayName"]
+ channel_slug = data["channel"]["name"]
+ channel_host = data["channel"]["host"]
+ channel_id = f"https://{channel_host}::{channel_slug}"
+ thumb_path = data["thumbnailPath"]
+ thumb = f"{instance}{thumb_path}"
+ return Video(
+ self.id,
+ data["url"],
+ video_id,
+ data["name"],
+ (channel_name, channel_id),
+ data["description"],
+ thumb)
+
+ def get_video_streams(self, vid:str):
+ instance, video_id = vid.split("::")
+ return [
+ # the embed player does the quality selection
+ Stream(f"{instance}/videos/embed/{video_id}", "auto")
+ ]
+
+ def get_import_providers(self):
+ return [
+ # TODO: port newpipe invidious importer to peertube
+ # REQUIRES: figuring out what service_id peertube uses
+ ]