6638166e05
Added more clear user hints for how to run the container for creating/deleting albums depending on whether the script is running bare python or in Docker
485 lines
22 KiB
Python
485 lines
22 KiB
Python
|
|
import requests
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
import os
|
|
import datetime
|
|
from collections import defaultdict
|
|
import urllib3
|
|
|
|
# Trying to deal with python's isnumeric() function
|
|
# not recognizing negative numbers
|
|
def is_integer(str):
|
|
try:
|
|
int(str)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
# Constants holding script run modes
|
|
# Creat albums based on folder names and script arguments
|
|
SCRIPT_MODE_CREATE = "CREATE"
|
|
# Create album names based on folder names, but delete these albums
|
|
SCRIPT_MODE_CLEANUP = "CLEANUP"
|
|
# Delete ALL albums
|
|
SCRIPT_MODE_DELETE_ALL = "DELETE_ALL"
|
|
# Environment variable to check if the script is running inside Docker
|
|
ENV_IS_DOCKER = "IS_DOCKER"
|
|
|
|
|
|
parser = argparse.ArgumentParser(description="Create Immich Albums from an external library path based on the top level folders", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
|
parser.add_argument("root_path", action='append', help="The external libarary's root path in Immich")
|
|
parser.add_argument("api_url", help="The root API URL of immich, e.g. https://immich.mydomain.com/api/")
|
|
parser.add_argument("api_key", help="The Immich API Key to use")
|
|
parser.add_argument("-r", "--root-path", action="append", help="Additional external libarary root path in Immich; May be specified multiple times for multiple import paths or external libraries.")
|
|
parser.add_argument("-u", "--unattended", action="store_true", help="Do not ask for user confirmation after identifying albums. Set this flag to run script as a cronjob.")
|
|
parser.add_argument("-a", "--album-levels", default="1", type=str, help="Number of sub-folders or range of sub-folder levels below the root path used for album name creation. Positive numbers start from top of the folder structure, negative numbers from the bottom. Cannot be 0. If a range should be set, the start level and end level must be separated by a comma like '<startLevel>,<endLevel>'. If negative levels are used in a range, <startLevel> must be less than or equal to <endLevel>.")
|
|
parser.add_argument("-s", "--album-separator", default=" ", type=str, help="Separator string to use for compound album names created from nested folders. Only effective if -a is set to a value > 1")
|
|
parser.add_argument("-c", "--chunk-size", default=2000, type=int, help="Maximum number of assets to add to an album with a single API call")
|
|
parser.add_argument("-C", "--fetch-chunk-size", default=5000, type=int, help="Maximum number of assets to fetch with a single API call")
|
|
parser.add_argument("-l", "--log-level", default="INFO", choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'], help="Log level to use")
|
|
parser.add_argument("-k", "--insecure", action="store_true", help="Set to true to ignore SSL verification")
|
|
parser.add_argument("-i", "--ignore", default="", type=str, help="A string containing a list of folders, sub-folder sequences or file names separated by ':' that will be ignored.")
|
|
parser.add_argument("-m", "--mode", default=SCRIPT_MODE_CREATE, choices=[SCRIPT_MODE_CREATE, SCRIPT_MODE_CLEANUP, SCRIPT_MODE_DELETE_ALL], help="Mode for the script to run with. CREATE = Create albums based on folder names and provided arguments; CLEANUP = Create album nmaes based on current images and script arguments, but delete albums if they exist; DELETE_ALL = Delete all albums. If the mode is anything but CREATE, --unattended does not have any effect. Only performs deletion if -d/--delete-confirm option is set, otherwise only performs a dry-run.")
|
|
parser.add_argument("-d", "--delete-confirm", action="store_true", help="Confirm deletion of albums when running in mode "+SCRIPT_MODE_CLEANUP+" or "+SCRIPT_MODE_DELETE_ALL+". If this flag is not set, these modes will perform a dry run only. Has no effect in mode "+SCRIPT_MODE_CREATE)
|
|
args = vars(parser.parse_args())
|
|
# set up logger to log in logfmt format
|
|
logging.basicConfig(level=args["log_level"], stream=sys.stdout, format='time=%(asctime)s level=%(levelname)s msg=%(message)s')
|
|
logging.Formatter.formatTime = (lambda self, record, datefmt=None: datetime.datetime.fromtimestamp(record.created, datetime.timezone.utc).astimezone().isoformat(sep="T",timespec="milliseconds"))
|
|
|
|
|
|
root_paths = args["root_path"]
|
|
root_url = args["api_url"]
|
|
api_key = args["api_key"]
|
|
number_of_images_per_request = args["chunk_size"]
|
|
number_of_assets_to_fetch_per_request = args["fetch_chunk_size"]
|
|
unattended = args["unattended"]
|
|
album_levels = args["album_levels"]
|
|
# Album Levels Range handling
|
|
album_levels_range_arr = ()
|
|
album_level_separator = args["album_separator"]
|
|
insecure = args["insecure"]
|
|
ignore_albums = args["ignore"]
|
|
mode = args["mode"]
|
|
delete_confirm = args["delete_confirm"]
|
|
|
|
# Override unattended if we're running in destructive mode
|
|
if mode != SCRIPT_MODE_CREATE:
|
|
unattended = False
|
|
|
|
is_docker = os.environ.get(ENV_IS_DOCKER, False)
|
|
|
|
logging.debug("root_path = %s", root_paths)
|
|
logging.debug("root_url = %s", root_url)
|
|
logging.debug("api_key = %s", api_key)
|
|
logging.debug("number_of_images_per_request = %d", number_of_images_per_request)
|
|
logging.debug("number_of_assets_to_fetch_per_request = %d", number_of_assets_to_fetch_per_request)
|
|
logging.debug("unattended = %s", unattended)
|
|
logging.debug("album_levels = %s", album_levels)
|
|
#logging.debug("album_levels_range = %s", album_levels_range)
|
|
logging.debug("album_level_separator = %s", album_level_separator)
|
|
logging.debug("insecure = %s", insecure)
|
|
logging.debug("ignore = %s", ignore_albums)
|
|
logging.debug("mode = %s", mode)
|
|
logging.debug("delete_confirm = %s", delete_confirm)
|
|
logging.debug("is_docker = %s", is_docker)
|
|
|
|
# Verify album levels
|
|
if is_integer(album_levels) and album_levels == 0:
|
|
parser.print_help()
|
|
exit(1)
|
|
|
|
if insecure:
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
# Verify album levels range
|
|
if not is_integer(album_levels):
|
|
album_levels_range_split = album_levels.split(",")
|
|
if (len(album_levels_range_split) != 2
|
|
or not is_integer(album_levels_range_split[0])
|
|
or not is_integer(album_levels_range_split[1])
|
|
or int(album_levels_range_split[0]) == 0
|
|
or int(album_levels_range_split[1]) == 0
|
|
or (int(album_levels_range_split[0]) >= 0 and int(album_levels_range_split[1]) < 0)
|
|
or (int(album_levels_range_split[0]) < 0 and int(album_levels_range_split[1]) >= 0)
|
|
or (int(album_levels_range_split[0]) < 0 and int(album_levels_range_split[1]) < 0) and int(album_levels_range_split[0]) > int(album_levels_range_split[1])):
|
|
logging.error("Invalid album_levels range format! If a range should be set, the start level and end level must be separated by a comma like '<startLevel>,<endLevel>'. If negative levels are used in a range, <startLevel> must be less than or equal to <endLevel>.")
|
|
exit(1)
|
|
album_levels_range_arr = album_levels_range_split
|
|
# Convert to int
|
|
album_levels_range_arr[0] = int(album_levels_range_split[0])
|
|
album_levels_range_arr[1] = int(album_levels_range_split[1])
|
|
# Special case: both levels are negative and end level is -1, which is equivalent to just negative album level of start level
|
|
if(album_levels_range_arr[0] < 0 and album_levels_range_arr[1] == -1):
|
|
album_levels = album_levels_range_arr[0]
|
|
album_levels_range_arr = ()
|
|
logging.debug("album_levels is a range with negative start level and end level of -1, converted to album_levels = %d", album_levels)
|
|
else:
|
|
logging.debug("valid album_levels range argument supplied")
|
|
logging.debug("album_levels_start_level = %d", album_levels_range_arr[0])
|
|
logging.debug("album_levels_end_level = %d", album_levels_range_arr[1])
|
|
# Deduct 1 from album start levels, since album levels start at 1 for user convenience, but arrays start at index 0
|
|
if album_levels_range_arr[0] > 0:
|
|
album_levels_range_arr[0] -= 1
|
|
album_levels_range_arr[1] -= 1
|
|
|
|
if not ignore_albums == "":
|
|
ignore_albums = ignore_albums.split(":")
|
|
else:
|
|
ignore_albums = False
|
|
|
|
# Request arguments for API calls
|
|
requests_kwargs = {
|
|
'headers' : {
|
|
'x-api-key': api_key,
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'application/json'
|
|
},
|
|
'verify' : not insecure
|
|
}
|
|
|
|
# Yield successive n-sized
|
|
# chunks from l.
|
|
def divide_chunks(l, n):
|
|
|
|
# looping till length l
|
|
for i in range(0, len(l), n):
|
|
yield l[i:i + n]
|
|
|
|
# Create album names from provided path_chunks string array
|
|
# based on supplied album_levels argument (either by level range or absolute album levels)
|
|
def create_album_name(path_chunks):
|
|
album_name_chunks = ()
|
|
logging.debug("path chunks = %s", list(path_chunks))
|
|
# Check which path to take: album_levels_range or album_levels
|
|
if len(album_levels_range_arr) == 2:
|
|
if album_levels_range_arr[0] < 0:
|
|
album_levels_start_level_capped = min(len(path_chunks), abs(album_levels_range_arr[0]))
|
|
album_levels_end_level_capped = album_levels_range_arr[1]+1
|
|
album_levels_start_level_capped *= -1
|
|
else:
|
|
album_levels_start_level_capped = min(len(path_chunks)-1, album_levels_range_arr[0])
|
|
# Add 1 to album_levels_end_level_capped to include the end index, which is what the user intended to. It's not a problem
|
|
# if the end index is out of bounds.
|
|
album_levels_end_level_capped = min(len(path_chunks)-1, album_levels_range_arr[1]) + 1
|
|
logging.debug("album_levels_start_level_capped = %d", album_levels_start_level_capped)
|
|
logging.debug("album_levels_end_level_capped = %d", album_levels_end_level_capped)
|
|
# album start level is not equal to album end level, so we want a range of levels
|
|
if album_levels_start_level_capped is not album_levels_end_level_capped:
|
|
|
|
# if the end index is out of bounds.
|
|
if album_levels_end_level_capped < 0 and abs(album_levels_end_level_capped) >= len(path_chunks):
|
|
album_name_chunks = path_chunks[album_levels_start_level_capped:]
|
|
else:
|
|
album_name_chunks = path_chunks[album_levels_start_level_capped:album_levels_end_level_capped]
|
|
# album start and end levels are equal, we want exactly that level
|
|
else:
|
|
# create on-the-fly array with a single element taken from
|
|
album_name_chunks = [path_chunks[album_levels_start_level_capped]]
|
|
else:
|
|
album_levels_int = int(album_levels)
|
|
# either use as many path chunks as we have,
|
|
# or the specified album levels
|
|
album_name_chunk_size = min(len(path_chunks), abs(album_levels_int))
|
|
if album_levels_int < 0:
|
|
album_name_chunk_size *= -1
|
|
|
|
# Copy album name chunks from the path to use as album name
|
|
album_name_chunks = path_chunks[:album_name_chunk_size]
|
|
if album_name_chunk_size < 0:
|
|
album_name_chunks = path_chunks[album_name_chunk_size:]
|
|
logging.debug("album_name_chunks = %s", album_name_chunks)
|
|
return album_level_separator.join(album_name_chunks)
|
|
|
|
# Fetches assets from the Immich API
|
|
# Takes different API versions into account for compatibility
|
|
def fetchServerVersion():
|
|
# This API call was only introduced with version 1.106.1, so it will fail
|
|
# for older versions.
|
|
# Initialize the version with the latest version without this API call
|
|
version = {'major': 1, 'minor': 105, "patch": 1}
|
|
r = requests.get(root_url+'server-info/version', **requests_kwargs)
|
|
assert r.status_code == 200 or r.status_code == 404
|
|
if r.status_code == 200:
|
|
version = r.json()
|
|
logging.info("Detected Immich server version %s.%s.%s", version['major'], version['minor'], version['patch'])
|
|
else:
|
|
logging.info("Detected Immich server version %s.%s.%s or older", version['major'], version['minor'], version['patch'])
|
|
return version
|
|
|
|
# Fetches assets from the Immich API
|
|
# Takes different API versions into account for compatibility
|
|
def fetchAssets():
|
|
if version['major'] == 1 and version['minor'] <= 105:
|
|
return fetchAssetsLegacy()
|
|
else:
|
|
return fetchAssetsMinorV106()
|
|
|
|
|
|
# Fetches assets from the Immich API
|
|
# Uses the legacy GET /asset call which only exists up to v1.105.x
|
|
def fetchAssetsLegacy():
|
|
assets = []
|
|
# Initial API call, let's fetch our first chunk
|
|
r = requests.get(root_url+'asset?take='+str(number_of_assets_to_fetch_per_request), **requests_kwargs)
|
|
assert r.status_code == 200
|
|
logging.debug("Received %s assets with chunk 1", len(r.json()))
|
|
assets = assets + r.json()
|
|
|
|
# If we got a full chunk size back, let's perfrom subsequent calls until we get less than a full chunk size
|
|
skip = 0
|
|
while len(r.json()) == number_of_assets_to_fetch_per_request:
|
|
skip += number_of_assets_to_fetch_per_request
|
|
r = requests.get(root_url+'asset?take='+str(number_of_assets_to_fetch_per_request)+'&skip='+str(skip), **requests_kwargs)
|
|
if skip == number_of_assets_to_fetch_per_request and assets == r.json():
|
|
logging.info("Non-chunked Immich API detected, stopping fetching assets since we already got all in our first call")
|
|
break
|
|
assert r.status_code == 200
|
|
logging.debug("Received %s assets with chunk", len(r.json()))
|
|
assets = assets + r.json()
|
|
return assets
|
|
|
|
# Fetches assets from the Immich API
|
|
# Uses the /search/meta-data call. Much more efficient than the legacy method
|
|
# since this call allows to filter for assets that are not in an album only.
|
|
def fetchAssetsMinorV106():
|
|
assets = []
|
|
# prepare request body
|
|
body = {}
|
|
# only request images that are not in any album if we are running in CREATE mode,
|
|
# otherwise we need all images, even if they are part of an album
|
|
if mode == SCRIPT_MODE_CREATE:
|
|
body['isNotInAlbum'] = 'true'
|
|
|
|
# This API call allows a maximum page size of 1000
|
|
number_of_assets_to_fetch_per_request_search = min(1000, number_of_assets_to_fetch_per_request)
|
|
body['size'] = number_of_assets_to_fetch_per_request_search
|
|
# Initial API call, let's fetch our first chunk
|
|
page = 1
|
|
body['page'] = str(page)
|
|
r = requests.post(root_url+'search/metadata', json=body, **requests_kwargs)
|
|
r.raise_for_status()
|
|
responseJson = r.json()
|
|
assetsReceived = responseJson['assets']['items']
|
|
logging.debug("Received %s assets with chunk %s", len(assetsReceived), page)
|
|
|
|
assets = assets + assetsReceived
|
|
# If we got a full chunk size back, let's perfrom subsequent calls until we get less than a full chunk size
|
|
while len(assetsReceived) == number_of_assets_to_fetch_per_request_search:
|
|
page += 1
|
|
body['page'] = page
|
|
r = requests.post(root_url+'search/metadata', json=body, **requests_kwargs)
|
|
assert r.status_code == 200
|
|
responseJson = r.json()
|
|
assetsReceived = responseJson['assets']['items']
|
|
logging.debug("Received %s assets with chunk %s", len(assetsReceived), page)
|
|
assets = assets + assetsReceived
|
|
return assets
|
|
|
|
|
|
# Fetches albums from the Immich API
|
|
# Takes different API versions into account for compatibility
|
|
def fetchAlbums():
|
|
apiEndpoint = 'albums'
|
|
if version['major'] == 1 and version['minor'] <= 105:
|
|
apiEndpoint = 'album'
|
|
|
|
r = requests.get(root_url+apiEndpoint, **requests_kwargs)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
# Deletes an album identified by album['id']
|
|
# Takes different API versions into account for compatibility
|
|
# Returns False if the album could not be deleted, otherwise True
|
|
def deleteAlbum(album):
|
|
apiEndpoint = 'albums'
|
|
if version['major'] == 1 and version['minor'] <= 105:
|
|
apiEndpoint = 'album'
|
|
logging.debug("Album ID = %s, Album Name = %s", album['id'], album['albumName'])
|
|
r = requests.delete(root_url+apiEndpoint+'/'+album['id'], **requests_kwargs)
|
|
if r.status_code not in [200, 201]:
|
|
logging.error("Error deleting album %s: %s", album['albumName'], r.reason)
|
|
return False
|
|
return True
|
|
|
|
# Creates an album with the provided name and returns the ID of the
|
|
# created album
|
|
def createAlbum(albumName):
|
|
apiEndpoint = 'albums'
|
|
if version['major'] == 1 and version['minor'] <= 105:
|
|
apiEndpoint = 'album'
|
|
data = {
|
|
'albumName': albumName,
|
|
'description': albumName
|
|
}
|
|
r = requests.post(root_url+apiEndpoint, json=data, **requests_kwargs)
|
|
assert r.status_code in [200, 201]
|
|
return r.json()['id']
|
|
|
|
# Adds the provided assetIds to the provided albumId
|
|
def addAssetsToAlbum(albumId, assets):
|
|
apiEndpoint = 'albums'
|
|
if version['major'] == 1 and version['minor'] <= 105:
|
|
apiEndpoint = 'album'
|
|
# Divide our assets into chunks of number_of_images_per_request,
|
|
# So the API can cope
|
|
assets_chunked = list(divide_chunks(assets, number_of_images_per_request))
|
|
for assets_chunk in assets_chunked:
|
|
data = {'ids':assets_chunk}
|
|
r = requests.put(root_url+apiEndpoint+f'/{albumId}/assets', json=data, **requests_kwargs)
|
|
if r.status_code not in [200, 201]:
|
|
print(album)
|
|
print(r.json())
|
|
print(data)
|
|
continue
|
|
assert r.status_code in [200, 201]
|
|
response = r.json()
|
|
|
|
cpt = 0
|
|
for res in response:
|
|
if not res['success']:
|
|
if res['error'] != 'duplicate':
|
|
logging.warning("Error adding an asset to an album: %s", res['error'])
|
|
else:
|
|
cpt += 1
|
|
if cpt > 0:
|
|
logging.info("%d new assets added to %s", cpt, album)
|
|
|
|
# append trailing slash to all root paths
|
|
for i in range(len(root_paths)):
|
|
if root_paths[i][-1] != '/':
|
|
root_paths[i] = root_paths[i] + '/'
|
|
# append trailing slash to root URL
|
|
if root_url[-1] != '/':
|
|
root_url = root_url + '/'
|
|
|
|
version = fetchServerVersion()
|
|
# Special case: Run Mode DELETE_ALL albums
|
|
if mode == SCRIPT_MODE_DELETE_ALL:
|
|
albums = fetchAlbums()
|
|
logging.info("%d existing albums identified", len(albums))
|
|
# Delete Confirm check
|
|
if not delete_confirm:
|
|
album_names = []
|
|
for album in albums:
|
|
album_names.append(album['albumName'])
|
|
print("Would delete the following albums (ALL albums!):")
|
|
print(album_names)
|
|
if is_docker:
|
|
print("Run the container with environment variable DELETE_CONFIRM set to 1 to actually delete these albums!")
|
|
else:
|
|
print("Call with --delete-confirm to actually delete albums!")
|
|
exit(0)
|
|
cpt = 0
|
|
for album in albums:
|
|
if deleteAlbum(album):
|
|
logging.info("Deleted album %s", album['albumName'])
|
|
cpt += 1
|
|
logging.info("Deleted %d/%d albums", cpt, len(albums))
|
|
exit(0)
|
|
|
|
logging.info("Requesting all assets")
|
|
assets = fetchAssets()
|
|
logging.info("%d photos found", len(assets))
|
|
|
|
|
|
|
|
logging.info("Sorting assets to corresponding albums using folder name")
|
|
album_to_assets = defaultdict(list)
|
|
for asset in assets:
|
|
asset_path = asset['originalPath']
|
|
for root_path in root_paths:
|
|
if root_path not in asset_path:
|
|
continue
|
|
# Check ignore_albums
|
|
ignore = False
|
|
if ignore_albums:
|
|
for ignore_entry in ignore_albums:
|
|
if ignore_entry in asset_path:
|
|
ignore = True
|
|
break
|
|
if ignore:
|
|
logging.debug("Ignoring asset %s due to ignore_albums setting!", asset_path)
|
|
continue
|
|
|
|
# Chunks of the asset's path below root_path
|
|
path_chunks = asset_path.replace(root_path, '').split('/')
|
|
# A single chunk means it's just the image file in no sub folder, ignore
|
|
if len(path_chunks) == 1:
|
|
continue
|
|
|
|
# remove last item from path chunks, which is the file name
|
|
del path_chunks[-1]
|
|
album_name = create_album_name(path_chunks)
|
|
if len(album_name) > 0:
|
|
album_to_assets[album_name].append(asset['id'])
|
|
else:
|
|
logging.warning("Got empty album name for asset path %s, check your album_level settings!", asset_path)
|
|
|
|
album_to_assets = {k:v for k, v in sorted(album_to_assets.items(), key=(lambda item: item[0]))}
|
|
|
|
logging.info("%d albums identified", len(album_to_assets))
|
|
logging.info("Album list: %s", list(album_to_assets.keys()))
|
|
|
|
if not unattended and mode == SCRIPT_MODE_CREATE:
|
|
if is_docker:
|
|
print("Check that this is the list of albums you want to create. Run the container with environment variable UNATTENDED set to 1 to actually create these albums.")
|
|
exit(0)
|
|
else:
|
|
print("Press enter to create these albums, Ctrl+C to abort")
|
|
input()
|
|
|
|
album_to_id = {}
|
|
|
|
logging.info("Listing existing albums on immich")
|
|
|
|
albums = fetchAlbums()
|
|
album_to_id = {album['albumName']:album['id'] for album in albums }
|
|
logging.info("%d existing albums identified", len(albums))
|
|
|
|
# mode CLEANUP
|
|
if mode == SCRIPT_MODE_CLEANUP:
|
|
# Delete Confirm check
|
|
if not delete_confirm:
|
|
print("Would delete the following albums:")
|
|
print(list(album_to_id.keys()))
|
|
if is_docker:
|
|
print("Run the container with environment variable DELETE_CONFIRM set to 1 to actually delete these albums!")
|
|
else:
|
|
print(" Call with --delete-confirm to actually delete albums!")
|
|
exit(0)
|
|
|
|
cpt = 0
|
|
for album in album_to_assets:
|
|
if album in album_to_id:
|
|
album_to_delete = dict()
|
|
album_to_delete['id'] = album_to_id[album]
|
|
album_to_delete['albumName'] = album
|
|
if deleteAlbum(album_to_delete):
|
|
logging.info("Deleted album %s", album_to_delete['albumName'])
|
|
cpt += 1
|
|
logging.info("Deleted %d/%d albums", cpt, len(album_to_assets))
|
|
exit(0)
|
|
|
|
|
|
# mode CREATE
|
|
logging.info("Creating albums if needed")
|
|
cpt = 0
|
|
for album in album_to_assets:
|
|
if album in album_to_id:
|
|
continue
|
|
album_to_id[album] = createAlbum(album)
|
|
logging.info('Album %s added!', album)
|
|
cpt += 1
|
|
logging.info("%d albums created", cpt)
|
|
|
|
|
|
logging.info("Adding assets to albums")
|
|
# Note: Immich manages duplicates without problem,
|
|
# so we can each time ad all assets to same album, no photo will be duplicated
|
|
for album, assets in album_to_assets.items():
|
|
id = album_to_id[album]
|
|
addAssetsToAlbum(id, assets)
|
|
|
|
logging.info("Done!")
|