diff --git a/immich_auto_album.py b/immich_auto_album.py index 48c1de8..f9b28de 100644 --- a/immich_auto_album.py +++ b/immich_auto_album.py @@ -1,4 +1,5 @@ +from typing import Tuple import requests import argparse import logging @@ -148,19 +149,17 @@ requests_kwargs = { 'verify' : not insecure } -# Yield successive n-sized -# chunks from l. -def divide_chunks(l, n): - +def divide_chunks(l: list, n: int): + """Yield successive n-sized chunks from l. """ # looping till length l for i in range(0, len(l), n): yield l[i:i + n] -def parseSeparatedString(s: str, seprator: str): +def parseSeparatedString(s: str, seprator: str) -> Tuple[str, str]: """ Parse a key, value pair, separated by the provided separator. + That's the reverse of ShellArgs. - On the command line (argparse) a declaration will typically look like: foo=hello or @@ -186,9 +185,15 @@ def parseSeparatedStrings(items: list[str]) -> dict: d[key] = value return d -# Create album names from provided path_chunks string array -# based on supplied album_levels argument (either by level range or absolute album levels) -def create_album_name(path_chunks): +def create_album_name(path_chunks: list[str], album_separator: str) -> str: + """ + Create album names from provided path_chunks string array. + + The method uses global variables album_levels_range_arr or album_levels to + generate ablum names either by level range or absolute album levels. If multiple + album path chunks are used for album names they are separated by album_separator. + """ + album_name_chunks = () logging.debug("path chunks = %s", list(path_chunks)) # Check which path to take: album_levels_range or album_levels @@ -229,11 +234,22 @@ def create_album_name(path_chunks): if album_name_chunk_size < 0: album_name_chunks = path_chunks[album_name_chunk_size:] logging.debug("album_name_chunks = %s", album_name_chunks) - return album_level_separator.join(album_name_chunks) + return album_separator.join(album_name_chunks) -# Fetches assets from the Immich API -# Takes different API versions into account for compatibility -def fetchServerVersion(): +def fetchServerVersion() -> dict: + """ + Fetches the API version from the immich server. + + If the API endpoint does not yet exist, returns the latest version + before that API endpoint was introduced: 1.105.1 + + Returns + ------- + Dictionary with keys + - major + - minor + - patch + """ # This API call was only introduced with version 1.106.1, so it will fail # for older versions. # Initialize the version with the latest version without this API call @@ -247,17 +263,28 @@ def fetchServerVersion(): logging.info("Detected Immich server version %s.%s.%s or older", version['major'], version['minor'], version['patch']) return version -# Fetches assets from the Immich API -# Uses the /search/meta-data call. Much more efficient than the legacy method -# since this call allows to filter for assets that are not in an album only. -def fetchAssets(): + +def fetchAssets(isNotInAlbum: bool) -> list: + """ + Fetches assets from the Immich API. + + Uses the /search/meta-data call. Much more efficient than the legacy method + since this call allows to filter for assets that are not in an album only. + + Parameters + ---------- + isNotInAlbum : bool + Flag indicating whether to fetch only assets that are not part + of an album or not. + Returns + --------- + An array of asset objects + """ + assets = [] # prepare request body body = {} - # only request images that are not in any album if we are running in CREATE mode, - # otherwise we need all images, even if they are part of an album - if mode == SCRIPT_MODE_CREATE: - body['isNotInAlbum'] = 'true' + body['isNotInAlbum'] = isNotInAlbum # This API call allows a maximum page size of 1000 number_of_assets_to_fetch_per_request_search = min(1000, number_of_assets_to_fetch_per_request) @@ -285,17 +312,32 @@ def fetchAssets(): return assets -# Fetches albums from the Immich API def fetchAlbums(): + """Fetches albums from the Immich API""" + apiEndpoint = 'albums' r = requests.get(root_url+apiEndpoint, **requests_kwargs) r.raise_for_status() return r.json() -# Deletes an album identified by album['id'] -# Returns False if the album could not be deleted, otherwise True -def deleteAlbum(album): +def deleteAlbum(album: dict): + """ + Deletes an album identified by album['id'] + + If the album could not be deleted, logs an error. + + Parameters + ---------- + album : dict + Dictionary with the following keys: + - id + - albumName + + Returns + --------- + True if the album was deleted, otherwise False + """ apiEndpoint = 'albums' logging.debug("Album ID = %s, Album Name = %s", album['id'], album['albumName']) @@ -305,9 +347,26 @@ def deleteAlbum(album): return False return True -# Creates an album with the provided name and returns the ID of the -# created album -def createAlbum(albumName): + +def createAlbum(albumName: str) -> str: + """ + Creates an album with the provided name and returns the ID of the created album + + + Parameters + ---------- + albumName : str + Name of the album to create + + Returns + --------- + True if the album was deleted, otherwise False + + Raises + ---------- + Exception if the API call failed + """ + apiEndpoint = 'albums' data = { @@ -318,8 +377,23 @@ def createAlbum(albumName): assert r.status_code in [200, 201] return r.json()['id'] -# Adds the provided assetIds to the provided albumId -def addAssetsToAlbum(albumId, assets): + +def addAssetsToAlbum(albumId: str, assets: list[str]): + """ + Adds the assets IDs provided in assets to the provided albumId. + + If assets if larger than number_of_images_per_request, the list is chunked + and one API call is performed per chunk. + Only logs errors and successes. + + + Parameters + ---------- + albumId : str + The ID of the album to add assets to + assets: list[str] + A list of asset IDs to add to the album + """ apiEndpoint = 'albums' # Divide our assets into chunks of number_of_images_per_request, @@ -346,19 +420,39 @@ def addAssetsToAlbum(albumId, assets): if cpt > 0: logging.info("%d new assets added to %s", cpt, album) -# Queries and returns all users def fetchUsers(): + """Queries and returns all users""" + apiEndpoint = 'users' r = requests.get(root_url+apiEndpoint, **requests_kwargs) assert r.status_code in [200, 201] return r.json() -# Shares the album with the provided album_id with all provided share_user_ids -# using share_role as a role. -def shareAlbumWithUserAndRole(album_id, share_user_ids, share_role): +def shareAlbumWithUserAndRole(album_id: str, share_user_ids: list[str], share_role: str): + """ + Shares the album with the provided album_id with all provided share_user_ids + using share_role as a role. + + Parameters + ---------- + album_id : str + The ID of the album to share + share_user_ids: list[str] + IDs of users to share the album with + share_role: str + The share role to use when sharing the album, valid values are + "viewer" or "editor" + Raises + ---------- + Exception if share_role contains an invalid value + Exception if the API call failed + """ + apiEndpoint = 'albums/'+album_id+'/users' + assert share_role in SHARE_ROLES + # build payload album_users = [] for share_user_id in share_user_ids: @@ -414,7 +508,12 @@ if mode == SCRIPT_MODE_DELETE_ALL: exit(0) logging.info("Requesting all assets") -assets = fetchAssets() +# only request images that are not in any album if we are running in CREATE mode, +# otherwise we need all images, even if they are part of an album +if mode == SCRIPT_MODE_CREATE: + assets = fetchAssets(True) +else: + assets = fetchAssets(False) logging.info("%d photos found", len(assets)) @@ -445,7 +544,7 @@ for asset in assets: # remove last item from path chunks, which is the file name del path_chunks[-1] - album_name = create_album_name(path_chunks) + album_name = create_album_name(path_chunks, album_level_separator) if len(album_name) > 0: album_to_assets[album_name].append(asset['id']) else: