歌单怎么办

跟朋友推荐了一下自己组个NAS然后搭建自己的流媒体服务器,但是当朋友费劲心思将自己的网易云歌单上的歌下载后遇到了一个问题。即,怎么去将这些歌单导入到自己的Navidrome中呢?以及折腾NAS的,尤其是AIO的,最后大多都会有ALL IN BOOM的窘境,在歌单导入后怎么备份的问题。

这是一个确实比较关键的问题,考虑到每个人的分类习惯可能不太一致,我一般很少使用歌单的功能,因为我的歌单中的歌曲不重复,所以我可以直接以文件夹作为分类,那么这样导入到Navidrome中只需要遍历文件夹然后直接导入即可;但是很多人可能歌单中会有重复,比如“国语”和“古风”这两个歌单中可能都存在同一首歌,按照我的这种方式,就不太合适了。

考虑到这个问题,又因为是我带朋友入坑的,那么这个问题显然得我来帮他解决,说干就干。

Navidrome API

在开始一切之前,我们需要提取一下Navidrome的API相关调用,但是我没找到文档,索性自己用Web开发者工具看请求提取了。

主要还是获取歌单列表、创建歌单、获取歌单对应的歌曲、添加歌曲到歌单这些常规的操作,当然这里面还实现了一下获取所有的歌曲,用于判断添加的歌曲是否在Navidrome的媒体库中。

封装的Python Class:

class NavidromeAPI:
  def __init__(self, url: str, username: str, password: str):
    self.url = url
    self.username = username
    self.password = password
    self.session = requests.Session()
    self.session.headers.update({
        "x-nd-authorization": f"Bearer {self.get_auth_token()}"
    })
 
  def get_auth_token(self) -> str:
    """
    Get the authentication token from the server.
    """
    data = {
        "username": self.username,
        "password": self.password
    }
    response = requests.post(self.url + "auth/login", json=data)
    if response.status_code != 200:
      raise Exception("Failed to get auth token: " + response.text)
    return response.json().get("token")
    
  def get_playlist(self, name: str = None) -> list | None: 
    """
    Get the playlist from the server using the authentication token.
    """
    response = self.session.get(self.url + "api/playlist")
    if response.status_code != 200:
      raise Exception("Failed to get playlist: " + response.text)
    if name:
      playlists = response.json()
      for playlist in playlists:
        if playlist['name'] == name:
          return [playlist]
      return None
    return response.json()
 
  def get_playlist_tracks(self, playlist_id: str) -> list:
    """
    Get the tracks of a specific playlist by its ID.
    """
    response = self.session.get(self.url + f"api/playlist/{playlist_id}/tracks")
    if response.status_code != 200:
      raise Exception("Failed to get playlist tracks: " + response.text)
    return response.json()
 
  def get_all_songs(self) -> list:
    """
    Get all songs from the server.
    """
    response = self.session.get(self.url + "api/song")
    if response.status_code != 200:
      raise Exception("Failed to get all songs: " + response.text)
    return response.json()
 
  def create_playlist(self, name: str) -> str:
    """
    Create a new playlist with the given name and return its ID.
    """
    exists_playlist = self.get_playlist()
    playlist_names = [playlist['name'] for playlist in exists_playlist]
    if name in playlist_names:
      raise Exception(f"Playlist '{name}' already exists.")
    data = {
        "name": name,
        "public": False
    }
    response = self.session.post(self.url + "api/playlist", json=data)
    if response.status_code == 200 and response.json().get("id") is not None:
        return response.json().get("id")
    else:
        raise Exception("Failed to create playlist: " + response.text)
    
  def add_songs_to_playlist(self, playlist_id: str, song_ids: list):
    """
    Add songs to a specific playlist by its ID.
    """
    be_added_ids = [id for id in song_ids if id not in [track['mediaFileId'] for track in self.get_playlist_tracks(playlist_id)]]
    data = {
        "ids": be_added_ids
    }
    response = self.session.post(self.url + f"api/playlist/{playlist_id}/tracks", json=data)
    if response.status_code != 200:
      raise Exception("Failed to add songs to playlist: " + response.text)
    
  def export_playlist(self, playlist_id: str) -> tuple:
    """
    Export a playlist by its ID.
    """
    response = self.session.get(self.url + f"api/playlist/{playlist_id}")
    if response.status_code != 200:
      raise Exception("Failed to export playlist: " + response.text)
    name = response.json().get("name", "")
    tracks = self.get_playlist_tracks(playlist_id)
    playlist = []
    for track in tracks:
      playlist.append(f"{track['title']} - {''.join(track['artist'])}")
    return name, playlist

歌单导出

第一步自然是将歌单进行导出了,由于网易云这些音乐软件已经太久太久没有使用过了,才发现现在的网易云颇有流氓潜质,歌单居然不能直接通过网页全部获取了,这让我使用爬虫方式一次性拖出所有的歌单的想法落了空。

不过好在发现了一个公开的接口,地址是:https://music.unmeta.cn/

提取了一下接口,并且考虑到兼容Navidrome的API,写了个函数:

def export_playlist(url: str):
  """
  Export a media playlist from the given URL using an external API.
  """
  regex = r'/playlist/([a-zA-Z0-9]{22})/'
  match = re.search(regex, url)
  if match: 
    playlist_id = match.group(1)
    name, songs = initialize_navidrome_api().export_playlist(playlist_id=playlist_id)
    smart_out(name, songs, format="Playlist Name: {}\nSongs: {}\n")
    exit(0)
 
  API_ENDPOINT = "https://sss.unmeta.cn/songlist?detailed=false&format=song-singer"
  response = requests.post(API_ENDPOINT, data={'url': url}, headers={
    'Content-Type': 'application/x-www-form-urlencoded',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36'
  })
  if response.status_code == 200 :
    data = response.json().get('data', None)
    if data and data['name'] and data['songs'] and len(data['songs']) > 0:
      smart_out(data['name'], data['songs'], format="Playlist Name: {}\nSongs: {}\n")
      exit(0)
    print("No data found in the response.", file=sys.stderr)
  print("Can't export playlist from the provided URL, might API-Endpoint issue.", file=sys.stderr)
  exit(1)

代码我会在最后的附件主题处给出,这里的smart_out也许会有点疑问,这个是兼容管道调用的智能输出函数。

歌单导入

注意在上面,我们的输出应该第一行是歌单的名字,第二行应该是一个列表,这里实际通过字面量解析即可获取。

总体来说没有什么特别需要注意的,检查Navidrome中的音乐库是否存在匹配项,然后创建或者添加已有的歌单即可:

def import_playlist(content: str, playlist_name: str = None):
  """
  Import a media playlist from the provided content using Navidrome API.
  """
  name, songs, *_ = content.strip().split('\n')
  navidrome = initialize_navidrome_api()
  if playlist_name:
    name = playlist_name
  try:
    import_songs = ast.literal_eval(songs)
    playlist_id = navidrome.get_playlist(name=name)[0]['id'] if navidrome.get_playlist(name=name) else navidrome.create_playlist(name=name)
    all_songs = navidrome.get_all_songs()
    import_ids = []
    actual_import_songs = []
    for import_song in import_songs:
      ratio, song_id = song_similarity(all_songs, import_song)
      if ratio >= MATCH_RATIO:
        import_ids.append(song_id)
        actual_import_songs.append(import_song)
      else:
        print(f"Song '{import_song}' not found in the server library.", file=sys.stderr)
    print(f"Found and added {len(import_ids)} songs to playlist {name}")
    print(f"Actual imported songs: {actual_import_songs}")
    navidrome.add_songs_to_playlist(playlist_id=playlist_id, song_ids=import_ids)
  except Exception as e:
    print("Error during import:", e, file=sys.stderr)
    exit(1)
  exit(0)

这里唯一需要做出说明的就是song_similarity这个函数,这个是用来判断从媒体库中查找匹配歌曲的函数,具体实现如下:

def clean(text: str) -> str:
    """
    Clean the input text by normalizing, stripping punctuation and whitespace.
    """
    return re.sub(r'[^\w\s]', '', unicodedata.normalize('NFKD', text).lower().strip())
 
 
def song_similarity(all_songs: list, test_song_name: str) -> tuple:
    """
    Calculate the similarity ratio between the test song name and all songs in the library.
    Returns the best similarity ratio and the corresponding song ID.
    """
    test_clean = clean(test_song_name)
    best_ratio = 0.0
    song_id = ''
    for song in all_songs:
        ratio = max(SequenceMatcher(None, test_clean, clean(f"{song['title']} {song['artist']}")).ratio(), SequenceMatcher(None, test_clean, clean(f"{song['artist']} {song['title']}")).ratio())
        if ratio > best_ratio:
          best_ratio = ratio
          song_id = song['id']
    return int(best_ratio * 100), song_id

注意这里通过歌曲名、歌曲作者进行判断,做字符串的相似度检查,然后返回最优匹配结果以及一个匹配程度,用来判断歌曲是否在曲库中存在,存在则导入。

Navidrome备份

注意Navidrome的备份我推荐直接备份sqlite数据库文件,这是最好的;这里的备份只是将歌单导出并存储了。

整体过程并不复杂,调用API后输出到对应的文件中即可:

def backup_playlist(directory: str):
  """
  Backup Navidrome playlists to the specified directory.
  """
  navidrome = initialize_navidrome_api()
  try:
    playlists = navidrome.get_playlist()
    if not os.path.exists(directory):
      os.makedirs(directory)
    for playlist in playlists:
      name, songs = navidrome.export_playlist(playlist_id=playlist['id'])
      backup_path = os.path.join(directory, f"{name}.playlist")
      with open(backup_path, 'w', encoding='utf-8') as f:
        f.write(f"{name}\n{songs}\n")
      print(f"Backed up playlist '{name}' to '{backup_path}'")
  except Exception as e:
    print("Error during backup:", e, file=sys.stderr)
    exit(1)
  exit(0) 

Navidrome还原

还原直接用歌单导入依次调用即可:

def rebuild_playlist(folder: str):
  """
  Rebuild Navidrome playlists from the specified folder.
  """
  for filename in os.listdir(folder):
    if filename.endswith(".playlist"):
      with open(os.path.join(folder, filename), 'r', encoding='utf-8') as f:
        content = f.read()
        try:
          import_playlist(content)
        except SystemExit as e:
          if e.code != 0:
            print(f"Failed to rebuild playlist from file '{filename}'", file=sys.stderr)
            exit(1)

附件 - 使用指南

备份Navidrome歌单

在脚本中设置好Navidrome相关设置,无需设置MATCH_RATIO属性:

 python3 .\playlist.py backup -d .\playlists\
Backed up playlist 'Test' to '.\playlists\Test.playlist'

还原Navidrome歌单

在脚本中设置好Navidrome相关设置,如果部分歌曲还原至歌单失败,则可以将MATCH_RATIO属性调低:

 python3 .\playlist.py rebuild -f .\playlists\
Found and added 1 songs to playlist 'Test': ['Test - Test Author']

导出各大平台歌单

无需设置,直接导出即可,注意链接为歌单的分享链接:

python3 .\playlist.py export "https://music.163.com/playlist?id=123&uct2=abc="
Playlist Name: Test
Songs: ['Test - Test Author']

注意如果需要将导出存储,请直接使用重定向到文件:

python3 .\playlist.py export "https://music.163.com/playlist?id=123&uct2=abc=" > test.playlist

这记录的值与输出其实不一致:

Test
['Test - Test Author']
 

该文件可被直接用于导入。

导入歌单

你可以通过-f指定歌单文件导入,也可以直接通过管道进行导入,通过文件方式:

 python3 .\playlist.py import -f .\english-1.playlist
Found and added 1 songs to playlist Test
Actual imported songs: ['Test - Test Author']

通过管道方式:

 cat .\english-1.playlist | python3 .\playlist.py import
Found and added 69 songs to playlist Test
Actual imported songs: ['Test - Test Author']

或者更快的方式:

python3 .\playlist.py export "https://music.163.com/playlist?id=123&uct2=abc=" | python3 .\playlist.py import
Found and added 69 songs to playlist Test
Actual imported songs: ['Test - Test Author']

附件 - 完整脚本

如果有人存在和我同样的需求,那么可以自取使用:

# -*- coding: UTF-8 -*-
 
import argparse
import ast
from difflib import SequenceMatcher
import re
import sys
import unicodedata
import requests
import os
 
NAVIDROME_URL = os.getenv("NAVIDROME_URL", "https://music.home.evalexp.top/")
NAVIDROME_USERNAME = os.getenv("NAVIDROME_USERNAME", "evalexp")
NAVIDROME_PASSWORD = os.getenv("NAVIDROME_PASSWORD", "d<+.Vaz6P;pf4#zZLcVe")
MATCH_RATIO = int(os.getenv("MATCH_RATIO", "90"))
 
NAVIDROME_API_INSTANCE = None
 
class NavidromeAPI:
  def __init__(self, url: str, username: str, password: str):
    self.url = url
    self.username = username
    self.password = password
    self.session = requests.Session()
    self.session.headers.update({
        "x-nd-authorization": f"Bearer {self.get_auth_token()}"
    })
 
  def get_auth_token(self) -> str:
    """
    Get the authentication token from the server.
    """
    data = {
        "username": self.username,
        "password": self.password
    }
    response = requests.post(self.url + "auth/login", json=data)
    if response.status_code != 200:
      raise Exception("Failed to get auth token: " + response.text)
    return response.json().get("token")
    
  def get_playlist(self, name: str = None) -> list | None: 
    """
    Get the playlist from the server using the authentication token.
    """
    response = self.session.get(self.url + "api/playlist")
    if response.status_code != 200:
      raise Exception("Failed to get playlist: " + response.text)
    if name:
      playlists = response.json()
      for playlist in playlists:
        if playlist['name'] == name:
          return [playlist]
      return None
    return response.json()
 
  def get_playlist_tracks(self, playlist_id: str) -> list:
    """
    Get the tracks of a specific playlist by its ID.
    """
    response = self.session.get(self.url + f"api/playlist/{playlist_id}/tracks")
    if response.status_code != 200:
      raise Exception("Failed to get playlist tracks: " + response.text)
    return response.json()
 
  def get_all_songs(self) -> list:
    """
    Get all songs from the server.
    """
    response = self.session.get(self.url + "api/song")
    if response.status_code != 200:
      raise Exception("Failed to get all songs: " + response.text)
    return response.json()
 
  def create_playlist(self, name: str) -> str:
    """
    Create a new playlist with the given name and return its ID.
    """
    exists_playlist = self.get_playlist()
    playlist_names = [playlist['name'] for playlist in exists_playlist]
    if name in playlist_names:
      raise Exception(f"Playlist '{name}' already exists.")
    data = {
        "name": name,
        "public": False
    }
    response = self.session.post(self.url + "api/playlist", json=data)
    if response.status_code == 200 and response.json().get("id") is not None:
        return response.json().get("id")
    else:
        raise Exception("Failed to create playlist: " + response.text)
    
  def add_songs_to_playlist(self, playlist_id: str, song_ids: list):
    """
    Add songs to a specific playlist by its ID.
    """
    be_added_ids = [id for id in song_ids if id not in [track['mediaFileId'] for track in self.get_playlist_tracks(playlist_id)]]
    data = {
        "ids": be_added_ids
    }
    response = self.session.post(self.url + f"api/playlist/{playlist_id}/tracks", json=data)
    if response.status_code != 200:
      raise Exception("Failed to add songs to playlist: " + response.text)
    
  def export_playlist(self, playlist_id: str) -> tuple:
    """
    Export a playlist by its ID.
    """
    response = self.session.get(self.url + f"api/playlist/{playlist_id}")
    if response.status_code != 200:
      raise Exception("Failed to export playlist: " + response.text)
    name = response.json().get("name", "")
    tracks = self.get_playlist_tracks(playlist_id)
    playlist = []
    for track in tracks:
      playlist.append(f"{track['title']} - {''.join(track['artist'])}")
    return name, playlist
 
def initialize_navidrome_api() -> NavidromeAPI:
  """
  Initialize a single NavidromeAPI instance with environment variables.
  """
  global NAVIDROME_API_INSTANCE
  if NAVIDROME_API_INSTANCE is None:
    NAVIDROME_API_INSTANCE = NavidromeAPI(
      url=NAVIDROME_URL,
      username=NAVIDROME_USERNAME,
      password=NAVIDROME_PASSWORD
    )
  return NAVIDROME_API_INSTANCE
 
def clean(text: str) -> str:
    """
    Clean the input text by normalizing, stripping whitespace.
    """
    return re.sub(r'\s', '', unicodedata.normalize('NFKD', text).lower().strip())
 
 
def song_similarity(all_songs: list, test_song_name: str) -> tuple:
    """
    Calculate the similarity ratio between the test song name and all songs in the library.
    Returns the best similarity ratio and the corresponding song ID.
    """
    test_clean = clean(test_song_name)
    best_ratio = 0.0
    song_id = ''
    # song_name = ''
    for song in all_songs:
        ratio = max(SequenceMatcher(None, test_clean, clean(f"{song['title']}-{song['artist']}")).ratio(), SequenceMatcher(None, test_clean, clean(f"{song['artist']}-{song['title']}")).ratio())
        if ratio > best_ratio:
          best_ratio = ratio
          song_id = song['id']
          # song_name = f"{song['title']}-{song['artist']}"
    return int(best_ratio * 100), song_id
 
 
def smart_out(*args, format: str = None, sep: str = ' ', end: str = '\n'):
  """
  A smart print function that handles terminal and non-terminal outputs.
  If the output is to a terminal, it uses formatted printing if a format string is provided.
  If the output is redirected (not a terminal), it prints each argument on a new line.
  """
  if sys.stdout.isatty():
    if format:
      try:
        print (format.format(*args), sep=sep, end=end)
      except Exception as e:
        print("Formatting error:", e, file=sys.stderr)
    else:
      print (*args, sep=sep, end=end)
  else:
    for arg in args:
      print (arg, end=end)
 
def from_pipe() -> str | None:
  """
  Read input from stdin if available.
  Returns the input as a string, or None if stdin is a terminal.
  """
  if sys.stdin.isatty():
    return None
  return smart_decode(sys.stdin.buffer.read())
 
def smart_decode(data: bytes) -> str | None:
  """
  Decode bytes data with multiple encoding attempts.
  Tries 'utf-8', 'utf-16', and 'gbk' encodings.
  Returns the decoded string, or None if all attempts fail.
  """
  encoding = ['utf-8', 'utf-16', 'gbk']
  for enc in encoding:
    try:
      return data.decode(enc)
    except UnicodeDecodeError:
      continue
  return None
 
def smart_read(path: str) -> str | None:
  """
  Read a file with multiple encoding attempts.
  Tries 'utf-8', 'utf-16', and 'gbk' encodings.
  Returns the content as a string, or None if all attempts fail.
  """
  with open(path, 'rb') as f:
    return smart_decode(f.read())
 
 
def export_playlist(url: str):
  """
  Export a media playlist from the given URL using an external API.
  """
  regex = r'/playlist/([a-zA-Z0-9]{22})/'
  match = re.search(regex, url)
  if match: 
    playlist_id = match.group(1)
    name, songs = initialize_navidrome_api().export_playlist(playlist_id=playlist_id)
    smart_out(name, songs, format="Playlist Name: {}\nSongs: {}\n")
    exit(0)
 
  API_ENDPOINT = "https://sss.unmeta.cn/songlist?detailed=false&format=song-singer"
  response = requests.post(API_ENDPOINT, data={'url': url}, headers={
    'Content-Type': 'application/x-www-form-urlencoded',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36'
  })
  if response.status_code == 200 :
    data = response.json().get('data', None)
    if data and data['name'] and data['songs'] and len(data['songs']) > 0:
      smart_out(data['name'], data['songs'], format="Playlist Name: {}\nSongs: {}\n")
      exit(0)
    print("No data found in the response.", file=sys.stderr)
  print("Can't export playlist from the provided URL, might API-Endpoint issue.", file=sys.stderr)
  exit(1)
 
def import_playlist(content: str, playlist_name: str = None):
  """
  Import a media playlist from the provided content using Navidrome API.
  """
  name, songs, *_ = content.strip().split('\n')
  navidrome = initialize_navidrome_api()
  if playlist_name:
    name = playlist_name
  try:
    import_songs = ast.literal_eval(songs)
    playlist_id = navidrome.get_playlist(name=name)[0]['id'] if navidrome.get_playlist(name=name) else navidrome.create_playlist(name=name)
    all_songs = navidrome.get_all_songs()
    import_ids = []
    actual_import_songs = []
    for import_song in import_songs:
      ratio, song_id = song_similarity(all_songs, import_song)
      if ratio >= MATCH_RATIO:
        import_ids.append(song_id)
        actual_import_songs.append(import_song)
      else:
        print(f"Song '{import_song}' not found in the server library, max similarity ratio: {ratio}", file=sys.stderr)
    print(f"Found and added {len(import_ids)} songs to playlist {name}")
    print(f"Actual imported songs: {actual_import_songs}")
    navidrome.add_songs_to_playlist(playlist_id=playlist_id, song_ids=import_ids)
  except Exception as e:
    print("Error during import:", e, file=sys.stderr)
    exit(1)
  exit(0)
 
def backup_playlist(directory: str):
  """
  Backup Navidrome playlists to the specified directory.
  """
  navidrome = initialize_navidrome_api()
  try:
    playlists = navidrome.get_playlist()
    if not os.path.exists(directory):
      os.makedirs(directory)
    for playlist in playlists:
      name, songs = navidrome.export_playlist(playlist_id=playlist['id'])
      backup_path = os.path.join(directory, f"{name}.playlist")
      with open(backup_path, 'w', encoding='utf-8') as f:
        f.write(f"{name}\n{songs}\n")
      print(f"Backed up playlist '{name}' to '{backup_path}'")
  except Exception as e:
    print("Error during backup:", e, file=sys.stderr)
    exit(1)
  exit(0) 
 
def rebuild_playlist(folder: str):
  """
  Rebuild Navidrome playlists from the specified folder.
  """
  for filename in os.listdir(folder):
    if filename.endswith(".playlist"):
      with open(os.path.join(folder, filename), 'r', encoding='utf-8') as f:
        content = f.read()
        try:
          import_playlist(content)
        except SystemExit as e:
          if e.code != 0:
            print(f"Failed to rebuild playlist from file '{filename}'", file=sys.stderr)
            exit(1)
 
def main():
  parser = argparse.ArgumentParser(
    description="A media playlist manager.",
    prog="playlist-manager"
  )
 
  subparsers = parser.add_subparsers(dest="command", required=True)
  export_parser = subparsers.add_parser(  
    "export",
    help="Export a media playlist from a given URL."
  )
  import_parser = subparsers.add_parser(
    "import",
    help="Import a media playlist from provided data."
  )
 
  export_parser.add_argument(
      "url",
      nargs="?",
      type=str,
      help="The URL of the media playlist to export."
  )
  import_parser.add_argument(
      "-f", "--file",
      type=str,
      help="The file of the playlist to import."
  )
  import_parser.add_argument(
      "-p", "--playlist",
      type=str,
      help="Navidrome playlist name, auto created if not exists."
  )
 
  backup_parser = subparsers.add_parser(
    "backup",
    help="Backup Navidrome playlist."
  )
  backup_parser.add_argument(
      "-d", "--directory",
      type=str,
      required=True, 
      help="The directory to save the backup files."
  )
 
  rebuild_parser = subparsers.add_parser(
    "rebuild",
    help="Rebuild Navidrome playlist."
  )
  rebuild_parser.add_argument(
    "-f", "--from-folder",
    type=str,
    required=True,
    help="The folder of the playlist to rebuild."
  )
 
  args = parser.parse_args()
  match args.command:
    case "export":
      export_playlist(args.url or from_pipe() or (print("No URL provided.", file=sys.stderr), exit(1))[0])
    case "import":
      if args.file:
        import_playlist(smart_read(args.file) or (print("Failed to read the file or unsupported encoding.", file=sys.stderr), exit(1))[0], playlist_name=args.playlist)
      else:
        import_playlist(from_pipe() or (print("No playlist data provided.", file=sys.stderr), exit(1))[0], playlist_name=args.playlist)
    case "backup":
      backup_playlist(args.directory)
    case "rebuild":
      rebuild_playlist(args.from_folder)
 
if __name__ == "__main__":
  main()
  print(from_pipe())