import requests import argparse import logging import sys import os import datetime from collections import defaultdict import urllib3 # Trying to deal with python's isnumeric() function # not recognizing negative numbers def is_integer(str): try: int(str) return True except ValueError: return False # Constants holding script run modes # Creat albums based on folder names and script arguments SCRIPT_MODE_CREATE = "CREATE" # Create album names based on folder names, but delete these albums SCRIPT_MODE_CLEANUP = "CLEANUP" # Delete ALL albums SCRIPT_MODE_DELETE_ALL = "DELETE_ALL" # Environment variable to check if the script is running inside Docker ENV_IS_DOCKER = "IS_DOCKER" # List of allowed share user roles SHARE_ROLES = ["editor", "viewer"] parser = argparse.ArgumentParser(description="Create Immich Albums from an external library path based on the top level folders", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("root_path", action='append', help="The external libarary's root path in Immich") parser.add_argument("api_url", help="The root API URL of immich, e.g. https://immich.mydomain.com/api/") parser.add_argument("api_key", help="The Immich API Key to use") parser.add_argument("-r", "--root-path", action="append", help="Additional external libarary root path in Immich; May be specified multiple times for multiple import paths or external libraries.") parser.add_argument("-u", "--unattended", action="store_true", help="Do not ask for user confirmation after identifying albums. Set this flag to run script as a cronjob.") parser.add_argument("-a", "--album-levels", default="1", type=str, help="Number of sub-folders or range of sub-folder levels below the root path used for album name creation. Positive numbers start from top of the folder structure, negative numbers from the bottom. Cannot be 0. If a range should be set, the start level and end level must be separated by a comma like ','. If negative levels are used in a range, must be less than or equal to .") parser.add_argument("-s", "--album-separator", default=" ", type=str, help="Separator string to use for compound album names created from nested folders. Only effective if -a is set to a value > 1") parser.add_argument("-c", "--chunk-size", default=2000, type=int, help="Maximum number of assets to add to an album with a single API call") parser.add_argument("-C", "--fetch-chunk-size", default=5000, type=int, help="Maximum number of assets to fetch with a single API call") parser.add_argument("-l", "--log-level", default="INFO", choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], help="Log level to use") parser.add_argument("-k", "--insecure", action="store_true", help="Set to true to ignore SSL verification") parser.add_argument("-i", "--ignore", default="", type=str, help="A string containing a list of folders, sub-folder sequences or file names separated by ':' that will be ignored.") parser.add_argument("-m", "--mode", default=SCRIPT_MODE_CREATE, choices=[SCRIPT_MODE_CREATE, SCRIPT_MODE_CLEANUP, SCRIPT_MODE_DELETE_ALL], help="Mode for the script to run with. CREATE = Create albums based on folder names and provided arguments; CLEANUP = Create album nmaes based on current images and script arguments, but delete albums if they exist; DELETE_ALL = Delete all albums. If the mode is anything but CREATE, --unattended does not have any effect. Only performs deletion if -d/--delete-confirm option is set, otherwise only performs a dry-run.") parser.add_argument("-d", "--delete-confirm", action="store_true", help="Confirm deletion of albums when running in mode "+SCRIPT_MODE_CLEANUP+" or "+SCRIPT_MODE_DELETE_ALL+". If this flag is not set, these modes will perform a dry run only. Has no effect in mode "+SCRIPT_MODE_CREATE) parser.add_argument("-x", "--share-with", action="append", help="A user name (or email address of an existing user) to share newly created albums with. Sharing only happens if the album was actually created, not if new assets were added to an existing album. If the the share role should be specified by user, the format = must be used, where must be one of 'viewer' or 'editor'. May be specified multiple times to share albums with more than one user.") parser.add_argument("-o", "--share-role", default="viewer", choices=['viewer', 'editor'], help="The default share role for users newly created albums are shared with. Only effective if --share-with is specified at least once and the share role is not specified within --share-with.") args = vars(parser.parse_args()) # set up logger to log in logfmt format logging.basicConfig(level=args["log_level"], stream=sys.stdout, format='time=%(asctime)s level=%(levelname)s msg=%(message)s') logging.Formatter.formatTime = (lambda self, record, datefmt=None: datetime.datetime.fromtimestamp(record.created, datetime.timezone.utc).astimezone().isoformat(sep="T",timespec="milliseconds")) root_paths = args["root_path"] root_url = args["api_url"] api_key = args["api_key"] number_of_images_per_request = args["chunk_size"] number_of_assets_to_fetch_per_request = args["fetch_chunk_size"] unattended = args["unattended"] album_levels = args["album_levels"] # Album Levels Range handling album_levels_range_arr = () album_level_separator = args["album_separator"] insecure = args["insecure"] ignore_albums = args["ignore"] mode = args["mode"] delete_confirm = args["delete_confirm"] share_with = args["share_with"] share_role = args["share_role"] # Override unattended if we're running in destructive mode if mode != SCRIPT_MODE_CREATE: unattended = False is_docker = os.environ.get(ENV_IS_DOCKER, False) logging.debug("root_path = %s", root_paths) logging.debug("root_url = %s", root_url) logging.debug("api_key = %s", api_key) logging.debug("number_of_images_per_request = %d", number_of_images_per_request) logging.debug("number_of_assets_to_fetch_per_request = %d", number_of_assets_to_fetch_per_request) logging.debug("unattended = %s", unattended) logging.debug("album_levels = %s", album_levels) #logging.debug("album_levels_range = %s", album_levels_range) logging.debug("album_level_separator = %s", album_level_separator) logging.debug("insecure = %s", insecure) logging.debug("ignore = %s", ignore_albums) logging.debug("mode = %s", mode) logging.debug("delete_confirm = %s", delete_confirm) logging.debug("is_docker = %s", is_docker) logging.debug("share_with = %s", share_with) logging.debug("share_role = %s", share_role) # Verify album levels if is_integer(album_levels) and album_levels == 0: parser.print_help() exit(1) if insecure: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # Verify album levels range if not is_integer(album_levels): album_levels_range_split = album_levels.split(",") if (len(album_levels_range_split) != 2 or not is_integer(album_levels_range_split[0]) or not is_integer(album_levels_range_split[1]) or int(album_levels_range_split[0]) == 0 or int(album_levels_range_split[1]) == 0 or (int(album_levels_range_split[0]) >= 0 and int(album_levels_range_split[1]) < 0) or (int(album_levels_range_split[0]) < 0 and int(album_levels_range_split[1]) >= 0) or (int(album_levels_range_split[0]) < 0 and int(album_levels_range_split[1]) < 0) and int(album_levels_range_split[0]) > int(album_levels_range_split[1])): logging.error("Invalid album_levels range format! If a range should be set, the start level and end level must be separated by a comma like ','. If negative levels are used in a range, must be less than or equal to .") exit(1) album_levels_range_arr = album_levels_range_split # Convert to int album_levels_range_arr[0] = int(album_levels_range_split[0]) album_levels_range_arr[1] = int(album_levels_range_split[1]) # Special case: both levels are negative and end level is -1, which is equivalent to just negative album level of start level if(album_levels_range_arr[0] < 0 and album_levels_range_arr[1] == -1): album_levels = album_levels_range_arr[0] album_levels_range_arr = () logging.debug("album_levels is a range with negative start level and end level of -1, converted to album_levels = %d", album_levels) else: logging.debug("valid album_levels range argument supplied") logging.debug("album_levels_start_level = %d", album_levels_range_arr[0]) logging.debug("album_levels_end_level = %d", album_levels_range_arr[1]) # Deduct 1 from album start levels, since album levels start at 1 for user convenience, but arrays start at index 0 if album_levels_range_arr[0] > 0: album_levels_range_arr[0] -= 1 album_levels_range_arr[1] -= 1 if not ignore_albums == "": ignore_albums = ignore_albums.split(":") else: ignore_albums = False # Request arguments for API calls requests_kwargs = { 'headers' : { 'x-api-key': api_key, 'Content-Type': 'application/json', 'Accept': 'application/json' }, 'verify' : not insecure } # Yield successive n-sized # chunks from l. def divide_chunks(l, n): # looping till length l for i in range(0, len(l), n): yield l[i:i + n] def parseSeparatedString(s: str, seprator: str): """ Parse a key, value pair, separated by the provided separator. That's the reverse of ShellArgs. On the command line (argparse) a declaration will typically look like: foo=hello or foo="hello world" """ items = s.split(seprator) key = items[0].strip() # we remove blanks around keys, as is logical value = None if len(items) > 1: # rejoin the rest: value = seprator.join(items[1:]) return (key, value) def parseSeparatedStrings(items: list[str]) -> dict: """ Parse a series of key-value pairs and return a dictionary """ d = {} if items: for item in items: key, value = parseSeparatedString(item, '=') d[key] = value return d # Create album names from provided path_chunks string array # based on supplied album_levels argument (either by level range or absolute album levels) def create_album_name(path_chunks): album_name_chunks = () logging.debug("path chunks = %s", list(path_chunks)) # Check which path to take: album_levels_range or album_levels if len(album_levels_range_arr) == 2: if album_levels_range_arr[0] < 0: album_levels_start_level_capped = min(len(path_chunks), abs(album_levels_range_arr[0])) album_levels_end_level_capped = album_levels_range_arr[1]+1 album_levels_start_level_capped *= -1 else: album_levels_start_level_capped = min(len(path_chunks)-1, album_levels_range_arr[0]) # Add 1 to album_levels_end_level_capped to include the end index, which is what the user intended to. It's not a problem # if the end index is out of bounds. album_levels_end_level_capped = min(len(path_chunks)-1, album_levels_range_arr[1]) + 1 logging.debug("album_levels_start_level_capped = %d", album_levels_start_level_capped) logging.debug("album_levels_end_level_capped = %d", album_levels_end_level_capped) # album start level is not equal to album end level, so we want a range of levels if album_levels_start_level_capped is not album_levels_end_level_capped: # if the end index is out of bounds. if album_levels_end_level_capped < 0 and abs(album_levels_end_level_capped) >= len(path_chunks): album_name_chunks = path_chunks[album_levels_start_level_capped:] else: album_name_chunks = path_chunks[album_levels_start_level_capped:album_levels_end_level_capped] # album start and end levels are equal, we want exactly that level else: # create on-the-fly array with a single element taken from album_name_chunks = [path_chunks[album_levels_start_level_capped]] else: album_levels_int = int(album_levels) # either use as many path chunks as we have, # or the specified album levels album_name_chunk_size = min(len(path_chunks), abs(album_levels_int)) if album_levels_int < 0: album_name_chunk_size *= -1 # Copy album name chunks from the path to use as album name album_name_chunks = path_chunks[:album_name_chunk_size] if album_name_chunk_size < 0: album_name_chunks = path_chunks[album_name_chunk_size:] logging.debug("album_name_chunks = %s", album_name_chunks) return album_level_separator.join(album_name_chunks) # Fetches assets from the Immich API # Takes different API versions into account for compatibility def fetchServerVersion(): # This API call was only introduced with version 1.106.1, so it will fail # for older versions. # Initialize the version with the latest version without this API call version = {'major': 1, 'minor': 105, "patch": 1} r = requests.get(root_url+'server-info/version', **requests_kwargs) assert r.status_code == 200 or r.status_code == 404 if r.status_code == 200: version = r.json() logging.info("Detected Immich server version %s.%s.%s", version['major'], version['minor'], version['patch']) else: logging.info("Detected Immich server version %s.%s.%s or older", version['major'], version['minor'], version['patch']) return version # Fetches assets from the Immich API # Uses the /search/meta-data call. Much more efficient than the legacy method # since this call allows to filter for assets that are not in an album only. def fetchAssets(): assets = [] # prepare request body body = {} # only request images that are not in any album if we are running in CREATE mode, # otherwise we need all images, even if they are part of an album if mode == SCRIPT_MODE_CREATE: body['isNotInAlbum'] = 'true' # This API call allows a maximum page size of 1000 number_of_assets_to_fetch_per_request_search = min(1000, number_of_assets_to_fetch_per_request) body['size'] = number_of_assets_to_fetch_per_request_search # Initial API call, let's fetch our first chunk page = 1 body['page'] = str(page) r = requests.post(root_url+'search/metadata', json=body, **requests_kwargs) r.raise_for_status() responseJson = r.json() assetsReceived = responseJson['assets']['items'] logging.debug("Received %s assets with chunk %s", len(assetsReceived), page) assets = assets + assetsReceived # If we got a full chunk size back, let's perfrom subsequent calls until we get less than a full chunk size while len(assetsReceived) == number_of_assets_to_fetch_per_request_search: page += 1 body['page'] = page r = requests.post(root_url+'search/metadata', json=body, **requests_kwargs) assert r.status_code == 200 responseJson = r.json() assetsReceived = responseJson['assets']['items'] logging.debug("Received %s assets with chunk %s", len(assetsReceived), page) assets = assets + assetsReceived return assets # Fetches albums from the Immich API def fetchAlbums(): apiEndpoint = 'albums' r = requests.get(root_url+apiEndpoint, **requests_kwargs) r.raise_for_status() return r.json() # Deletes an album identified by album['id'] # Returns False if the album could not be deleted, otherwise True def deleteAlbum(album): apiEndpoint = 'albums' logging.debug("Album ID = %s, Album Name = %s", album['id'], album['albumName']) r = requests.delete(root_url+apiEndpoint+'/'+album['id'], **requests_kwargs) if r.status_code not in [200, 201]: logging.error("Error deleting album %s: %s", album['albumName'], r.reason) return False return True # Creates an album with the provided name and returns the ID of the # created album def createAlbum(albumName): apiEndpoint = 'albums' data = { 'albumName': albumName, 'description': albumName } r = requests.post(root_url+apiEndpoint, json=data, **requests_kwargs) assert r.status_code in [200, 201] return r.json()['id'] # Adds the provided assetIds to the provided albumId def addAssetsToAlbum(albumId, assets): apiEndpoint = 'albums' # Divide our assets into chunks of number_of_images_per_request, # So the API can cope assets_chunked = list(divide_chunks(assets, number_of_images_per_request)) for assets_chunk in assets_chunked: data = {'ids':assets_chunk} r = requests.put(root_url+apiEndpoint+f'/{albumId}/assets', json=data, **requests_kwargs) if r.status_code not in [200, 201]: print(album) print(r.json()) print(data) continue assert r.status_code in [200, 201] response = r.json() cpt = 0 for res in response: if not res['success']: if res['error'] != 'duplicate': logging.warning("Error adding an asset to an album: %s", res['error']) else: cpt += 1 if cpt > 0: logging.info("%d new assets added to %s", cpt, album) # Queries and returns all users def fetchUsers(): apiEndpoint = 'users' r = requests.get(root_url+apiEndpoint, **requests_kwargs) assert r.status_code in [200, 201] return r.json() # Shares the album with the provided album_id with all provided share_user_ids # using share_role as a role. def shareAlbumWithUserAndRole(album_id, share_user_ids, share_role): apiEndpoint = 'albums/'+album_id+'/users' # build payload album_users = [] for share_user_id in share_user_ids: share_info = dict() share_info['role'] = share_role share_info['userId'] = share_user_id album_users.append(share_info) data = { 'albumUsers': album_users } r = requests.put(root_url+apiEndpoint, json=data, **requests_kwargs) assert r.status_code in [200, 201] # append trailing slash to all root paths for i in range(len(root_paths)): if root_paths[i][-1] != '/': root_paths[i] = root_paths[i] + '/' # append trailing slash to root URL if root_url[-1] != '/': root_url = root_url + '/' version = fetchServerVersion() # Check version if version['major'] == 1 and version ['minor'] < 106: logging.fatal("This script only works with Immich Server v1.106.0 and newer! Update Immich Server or use script version 0.8.1!") exit(1) # Special case: Run Mode DELETE_ALL albums if mode == SCRIPT_MODE_DELETE_ALL: albums = fetchAlbums() logging.info("%d existing albums identified", len(albums)) # Delete Confirm check if not delete_confirm: album_names = [] for album in albums: album_names.append(album['albumName']) print("Would delete the following albums (ALL albums!):") print(album_names) if is_docker: print("Run the container with environment variable DELETE_CONFIRM set to 1 to actually delete these albums!") else: print("Call with --delete-confirm to actually delete albums!") exit(0) cpt = 0 for album in albums: if deleteAlbum(album): logging.info("Deleted album %s", album['albumName']) cpt += 1 logging.info("Deleted %d/%d albums", cpt, len(albums)) exit(0) logging.info("Requesting all assets") assets = fetchAssets() logging.info("%d photos found", len(assets)) logging.info("Sorting assets to corresponding albums using folder name") album_to_assets = defaultdict(list) for asset in assets: asset_path = asset['originalPath'] for root_path in root_paths: if root_path not in asset_path: continue # Check ignore_albums ignore = False if ignore_albums: for ignore_entry in ignore_albums: if ignore_entry in asset_path: ignore = True break if ignore: logging.debug("Ignoring asset %s due to ignore_albums setting!", asset_path) continue # Chunks of the asset's path below root_path path_chunks = asset_path.replace(root_path, '').split('/') # A single chunk means it's just the image file in no sub folder, ignore if len(path_chunks) == 1: continue # remove last item from path chunks, which is the file name del path_chunks[-1] album_name = create_album_name(path_chunks) if len(album_name) > 0: album_to_assets[album_name].append(asset['id']) else: logging.warning("Got empty album name for asset path %s, check your album_level settings!", asset_path) album_to_assets = {k:v for k, v in sorted(album_to_assets.items(), key=(lambda item: item[0]))} logging.info("%d albums identified", len(album_to_assets)) logging.info("Album list: %s", list(album_to_assets.keys())) if not unattended and mode == SCRIPT_MODE_CREATE: if is_docker: print("Check that this is the list of albums you want to create. Run the container with environment variable UNATTENDED set to 1 to actually create these albums.") exit(0) else: print("Press enter to create these albums, Ctrl+C to abort") input() album_to_id = {} logging.info("Listing existing albums on immich") albums = fetchAlbums() album_to_id = {album['albumName']:album['id'] for album in albums } logging.info("%d existing albums identified", len(albums)) # mode CLEANUP if mode == SCRIPT_MODE_CLEANUP: # Delete Confirm check if not delete_confirm: print("Would delete the following albums:") print(list(album_to_id.keys())) if is_docker: print("Run the container with environment variable DELETE_CONFIRM set to 1 to actually delete these albums!") else: print(" Call with --delete-confirm to actually delete albums!") exit(0) cpt = 0 for album in album_to_assets: if album in album_to_id: album_to_delete = dict() album_to_delete['id'] = album_to_id[album] album_to_delete['albumName'] = album if deleteAlbum(album_to_delete): logging.info("Deleted album %s", album_to_delete['albumName']) cpt += 1 logging.info("Deleted %d/%d albums", cpt, len(album_to_assets)) exit(0) # mode CREATE logging.info("Creating albums if needed") created_albums = dict() for album in album_to_assets: if album in album_to_id: continue album_id = createAlbum(album) album_to_id[album] = album_id created_albums[album] = album_id logging.info('Album %s added!', album) logging.info("%d albums created", len(created_albums)) # Share newly created albums with users if share_with is not None and len(created_albums) > 0: logging.info("Sharing created albums with users") share_user_roles = parseSeparatedStrings(share_with) logging.debug("Share User Roles: %s", share_user_roles) # Get all users users = fetchUsers() logging.debug("Found users: %s", users) # Initialize dicitionary of share roles to user IDs to share with roles_for_share_user_ids = dict() for allowed_role in SHARE_ROLES: roles_for_share_user_ids[allowed_role] = list() # Search user IDs of users to share with for share_user in share_user_roles.keys(): role = share_user_roles[share_user] # search user ID by name or email found_user = False if role == None: role = share_role logging.debug("No explicit share role passed for share user %s, using default role %s", share_user, share_role) elif role not in SHARE_ROLES: role = share_role logging.warning("Passed share role %s for user %s is not allowed, defaulting to %s", role, share_user, share_role) else: logging.debug("Explicit share role %s passed for share user %s", role, share_user) for user in users: # Search by name or mail address if user['name'] == share_user or user['email'] == share_user: share_user_id = user['id'] logging.debug("User %s has ID %s", share_user, share_user_id) roles_for_share_user_ids[role].append(share_user_id) found_user = True break if not found_user: logging.warning("User %s to share albums with does not exist!", share_user) shared_album_cnt = 0 # Only try sharing if we found at least one user ID to share with for share_album in created_albums.keys(): album_shared_successfully = False for role in roles_for_share_user_ids.keys(): share_user_ids = roles_for_share_user_ids[role] if len(share_user_ids) > 0: try: shareAlbumWithUserAndRole(created_albums[share_album], share_user_ids, role) logging.debug("Album %s shared with users IDs %s in role: %s)", share_album, share_user_ids, role) album_shared_successfully = True except: logging.warning("Error sharing album %s for users %s in role %s", share_album, share_user_ids, role) album_shared_successfully = False if album_shared_successfully: shared_album_cnt += 1 logging.info("Successfully shared %d/%d albums", shared_album_cnt, len(created_albums)) logging.info("Adding assets to albums") # Note: Immich manages duplicates without problem, # so we can each time ad all assets to same album, no photo will be duplicated for album, assets in album_to_assets.items(): id = album_to_id[album] addAssetsToAlbum(id, assets) logging.info("Done!")