Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``6222ce48e289`` (head) | ``134de42d3cb0`` | ``3.2.0`` | Add partition fields to DagModel. |
| ``888b59e02a5b`` (head) | ``6222ce48e289`` | ``3.2.0`` | Fix migration file ORM inconsistencies. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``6222ce48e289`` | ``134de42d3cb0`` | ``3.2.0`` | Add partition fields to DagModel. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``134de42d3cb0`` | ``e42d9fcd10d9`` | ``3.2.0`` | Add partition_key to backfill_dag_run. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
16 changes: 14 additions & 2 deletions airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,16 @@ def string_lower_type(val):
action="store_true",
default=False,
)
# Shared CLI flag for the "db reset"/"db migrate" and db-manager equivalents:
# when set, the schema is created by running the migration files rather than
# directly from the ORM models, so the two code paths can be checked for drift.
ARG_DB_USE_MIGRATION_FILES = Arg(
    ("-m", "--use-migration-files"),
    help=(
        "Use migration files to create the database instead of the ORM. "
        "This is useful for verifying that the migration files produce the "
        "same schema as the ORM models."
    ),
    action="store_true",
    default=False,
)

ARG_DB_MANAGER_PATH = Arg(
("import_path",),
Expand Down Expand Up @@ -1521,7 +1531,7 @@ class GroupCommand(NamedTuple):
name="reset",
help="Burn down and rebuild the metadata database",
func=lazy_load_command("airflow.cli.commands.db_command.resetdb"),
args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_DB_USE_MIGRATION_FILES, ARG_VERBOSE),
),
ActionCommand(
name="migrate",
Expand All @@ -1541,6 +1551,7 @@ class GroupCommand(NamedTuple):
ARG_DB_SQL_ONLY,
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
ARG_DB_USE_MIGRATION_FILES,
ARG_VERBOSE,
),
),
Expand Down Expand Up @@ -1878,7 +1889,7 @@ class GroupCommand(NamedTuple):
name="reset",
help="Burn down and rebuild the specified external database",
func=lazy_load_command("airflow.cli.commands.db_manager_command.resetdb"),
args=(ARG_DB_MANAGER_PATH, ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
args=(ARG_DB_MANAGER_PATH, ARG_YES, ARG_DB_SKIP_INIT, ARG_DB_USE_MIGRATION_FILES, ARG_VERBOSE),
),
ActionCommand(
name="migrate",
Expand All @@ -1899,6 +1910,7 @@ class GroupCommand(NamedTuple):
ARG_DB_SQL_ONLY,
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
ARG_DB_USE_MIGRATION_FILES,
ARG_VERBOSE,
),
),
Expand Down
24 changes: 18 additions & 6 deletions airflow-core/src/airflow/cli/commands/db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.utils import cli as cli_utils, db
from airflow.utils.db import _REVISION_HEADS_MAP
from airflow.utils.db_cleanup import config_dict, drop_archived_tables, export_archived_records, run_cleanup
from airflow.utils.db_manager import _callable_accepts_use_migration_files
from airflow.utils.process_utils import execute_interactive
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

Expand All @@ -47,7 +48,7 @@ def resetdb(args):
print(f"DB: {settings.get_engine().url!r}")
if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
raise SystemExit("Cancelled")
db.resetdb(skip_init=args.skip_init)
db.resetdb(skip_init=args.skip_init, use_migration_files=args.use_migration_files)


def _get_version_revision(version: str, revision_heads_map: dict[str, str] | None = None) -> str | None:
Expand Down Expand Up @@ -127,15 +128,26 @@ def run_db_migrate_command(args, command, revision_heads_map: dict[str, str]):
elif args.to_revision:
to_revision = args.to_revision

use_migration_files = getattr(args, "use_migration_files", False)

if not args.show_sql_only:
log.info("Performing upgrade to the metadata database", url=db_url)
else:
log.info("Generating sql for upgrade -- upgrade commands will *not* be submitted.")
command(
to_revision=to_revision,
from_revision=from_revision,
show_sql_only=args.show_sql_only,
)

kwargs: dict = {
"to_revision": to_revision,
"from_revision": from_revision,
"show_sql_only": args.show_sql_only,
}
if _callable_accepts_use_migration_files(command):
kwargs["use_migration_files"] = use_migration_files
elif use_migration_files:
log.warning(
"The upgrade command %r does not support '--use-migration-files'; the flag will be ignored.",
getattr(command, "__qualname__", repr(command)),
)
command(**kwargs)
if not args.show_sql_only:
log.info("Database migration done!")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from airflow.cli.commands.db_command import run_db_downgrade_command, run_db_migrate_command
from airflow.configuration import conf
from airflow.utils import cli as cli_utils
from airflow.utils.db_manager import _callable_accepts_use_migration_files
from airflow.utils.providers_configuration_loader import providers_configuration_loaded


Expand All @@ -38,7 +39,11 @@ def resetdb(args):
db_manager = _get_db_manager(args.import_path)
if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
raise SystemExit("Cancelled")
db_manager(settings.Session()).resetdb(skip_init=args.skip_init)
manager = db_manager(settings.Session())
kwargs: dict = {"skip_init": args.skip_init}
if _callable_accepts_use_migration_files(manager.resetdb):
kwargs["use_migration_files"] = getattr(args, "use_migration_files", False)
manager.resetdb(**kwargs)


@cli_utils.action_cli(check_db=False)
Expand Down
215 changes: 205 additions & 10 deletions airflow-core/src/airflow/migrations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op):
CONSTRAINT_NAME = '{constraint_name}' AND
CONSTRAINT_TYPE = 'FOREIGN KEY'
) THEN
ALTER TABLE {table_name}
DROP CONSTRAINT {constraint_name};
ALTER TABLE `{table_name}`
DROP CONSTRAINT `{constraint_name}`;
Comment thread
ephraimbuddy marked this conversation as resolved.
ELSE
SELECT 1;
END IF;
Expand All @@ -89,20 +89,21 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op):
def mysql_drop_index_if_exists(index_name, table_name, op):
"""Older Mysql versions do not support DROP INDEX IF EXISTS."""
op.execute(f"""
CREATE PROCEDURE DropIndexIfExists()
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.TABLE_CONSTRAINTS
FROM information_schema.STATISTICS
WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_SCHEMA = DATABASE() AND
TABLE_NAME = '{table_name}' AND
CONSTRAINT_NAME = '{index_name}' AND
CONSTRAINT_TYPE = 'INDEX'
INDEX_NAME = '{index_name}'
) THEN
ALTER TABLE {table_name}
DROP INDEX {index_name};
ELSE
SELECT 1;
DROP INDEX `{index_name}` ON `{table_name}`;
END IF;
END;
CALL DropIndexIfExists();
DROP PROCEDURE DropIndexIfExists;
""")


Expand All @@ -112,3 +113,197 @@ def ignore_sqlite_value_error():
if op.get_bind().dialect.name == "sqlite":
return contextlib.suppress(ValueError)
return contextlib.nullcontext()


def get_dialect_name(op) -> str:
    """Return the SQL dialect name for the current migration context.

    Prefers the live connection's dialect; when no bind is available
    (e.g. offline ``--sql`` mode), falls back to the migration context.
    """
    bind = op.get_bind()
    if bind is None:
        return op.get_context().dialect.name
    return bind.dialect.name


def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None:
    """
    Create an index if it does not already exist.

    MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used.
    PostgreSQL and SQLite support it natively.

    :param op: Alembic operations object.
    :param index_name: name of the index to create.
    :param table_name: table the index is created on.
    :param columns: ordered column names the index covers.
    :param unique: create a UNIQUE index when True.
    """
    dialect_name = get_dialect_name(op)

    if dialect_name == "mysql":
        unique_kw = "UNIQUE " if unique else ""
        col_list = ", ".join(f"`{c}`" for c in columns)
        # Emulate IF NOT EXISTS with a throwaway stored procedure that
        # consults information_schema first.  The leading DROP PROCEDURE IF
        # EXISTS clears any leftover from a previously interrupted run.
        # NOTE: review-tool artifact lines that had leaked into this SQL
        # string ("Comment thread" / "... marked this conversation as
        # resolved.") are removed here — they would have broken the SQL.
        op.execute(
            text(f"""
            DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
            CREATE PROCEDURE CreateIndexIfNotExists()
            BEGIN
            IF NOT EXISTS (
                SELECT 1
                FROM information_schema.STATISTICS
                WHERE
                    TABLE_SCHEMA = DATABASE() AND
                    TABLE_NAME = '{table_name}' AND
                    INDEX_NAME = '{index_name}'
            ) THEN
                CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list});
            END IF;
            END;
            CALL CreateIndexIfNotExists();
            DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
            """)
        )
    else:
        # PostgreSQL and SQLite support IF NOT EXISTS natively via Alembic.
        op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True)


def drop_index_if_exists(op, index_name, table_name) -> None:
    """
    Drop an index if it exists.

    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
    SQLite and PostgreSQL support DROP INDEX IF EXISTS natively.
    MySQL requires a stored procedure since it does not support IF EXISTS for DROP INDEX.
    """
    if get_dialect_name(op) != "mysql":
        # PostgreSQL and SQLite both support DROP INDEX IF EXISTS
        op.drop_index(index_name, table_name=table_name, if_exists=True)
        return

    # MySQL: wrap the conditional drop in a throwaway stored procedure that
    # checks information_schema for the index before dropping it.
    op.execute(
        text(f"""
        CREATE PROCEDURE DropIndexIfExists()
        BEGIN
        IF EXISTS (
            SELECT 1
            FROM information_schema.STATISTICS
            WHERE
                TABLE_SCHEMA = DATABASE() AND
                TABLE_NAME = '{table_name}' AND
                INDEX_NAME = '{index_name}'
        ) THEN
            DROP INDEX `{index_name}` ON `{table_name}`;
        END IF;
        END;
        CALL DropIndexIfExists();
        DROP PROCEDURE DropIndexIfExists;
        """)
    )


def drop_unique_constraints_on_columns(op, table_name, columns) -> None:
    """
    Drop all unique constraints covering any of the given columns, regardless of constraint name.

    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
    SQLite falls back to batch mode and requires a live connection.

    :param op: Alembic operations object.
    :param table_name: table whose unique constraints are inspected.
    :param columns: column names; any UNIQUE constraint touching one of them is dropped.
    """
    # Local import: only the SQLite inspector branch needs sqlalchemy here.
    import sqlalchemy as sa

    dialect_name = get_dialect_name(op)

    if dialect_name == "postgresql":
        # Literal text[] of target column names for the ANY(...) filter below.
        cols_array = ", ".join(f"'{c}'" for c in columns)
        # Server-side DO block discovers constraint names from
        # information_schema and drops each one, so no naming convention is
        # assumed; being pure SQL, it also renders fine in offline mode.
        op.execute(
            text(f"""
            DO $$
            DECLARE r record;
            BEGIN
                FOR r IN
                    SELECT DISTINCT tc.constraint_name
                    FROM information_schema.table_constraints tc
                    JOIN information_schema.key_column_usage kcu
                      ON tc.constraint_name = kcu.constraint_name
                     AND tc.table_schema = kcu.table_schema
                    WHERE tc.table_name = '{table_name}'
                      AND tc.constraint_type = 'UNIQUE'
                      AND kcu.column_name = ANY(ARRAY[{cols_array}]::text[])
                LOOP
                    EXECUTE 'ALTER TABLE ' || quote_ident('{table_name}') || ' DROP CONSTRAINT IF EXISTS '
                            || quote_ident(r.constraint_name);
                END LOOP;
            END $$
            """)
        )
    elif dialect_name == "mysql":
        cols_in = ", ".join(f"'{c}'" for c in columns)
        # MySQL has no anonymous DO block: use a throwaway stored procedure
        # with a cursor over information_schema, dropping each matching
        # UNIQUE index via a prepared statement (identifiers cannot be bound
        # as parameters, hence the CONCAT).
        op.execute(
            text(f"""
            CREATE PROCEDURE DropUniqueOnColumns()
            BEGIN
                DECLARE done INT DEFAULT FALSE;
                DECLARE v_name VARCHAR(255);
                DECLARE cur CURSOR FOR
                    SELECT DISTINCT kcu.CONSTRAINT_NAME
                    FROM information_schema.KEY_COLUMN_USAGE kcu
                    JOIN information_schema.TABLE_CONSTRAINTS tc
                      ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
                     AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA
                     AND kcu.TABLE_NAME = tc.TABLE_NAME
                    WHERE kcu.TABLE_NAME = '{table_name}'
                      AND kcu.TABLE_SCHEMA = DATABASE()
                      AND tc.CONSTRAINT_TYPE = 'UNIQUE'
                      AND kcu.COLUMN_NAME IN ({cols_in});
                DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
                OPEN cur;
                drop_loop: LOOP
                    FETCH cur INTO v_name;
                    IF done THEN LEAVE drop_loop; END IF;
                    SET @stmt = CONCAT('ALTER TABLE `{table_name}` DROP INDEX `', v_name, '`');
                    PREPARE s FROM @stmt;
                    EXECUTE s;
                    DEALLOCATE PREPARE s;
                END LOOP;
                CLOSE cur;
            END;
            CALL DropUniqueOnColumns();
            DROP PROCEDURE DropUniqueOnColumns;
            """)
        )
    else:
        # SQLite — batch mode rewrites the table; requires a live connection
        with op.batch_alter_table(table_name, schema=None) as batch_op:
            for uq in sa.inspect(op.get_bind()).get_unique_constraints(table_name):
                if any(col in uq["column_names"] for col in columns):
                    batch_op.drop_constraint(uq["name"], type_="unique")


def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None:
    """
    Drop a unique constraint by name if it exists.

    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
    SQLite falls back to batch mode and requires a live connection.

    :param op: Alembic operations object.
    :param table_name: table that owns the constraint.
    :param constraint_name: name of the UNIQUE constraint to drop.
    """
    dialect_name = get_dialect_name(op)

    if dialect_name == "postgresql":
        # Postgres supports DROP CONSTRAINT IF EXISTS directly.
        op.execute(text(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS "{constraint_name}"'))
    elif dialect_name == "mysql":
        # MySQL stores UNIQUE constraints as indexes and has no IF EXISTS
        # for dropping them; emulate with a throwaway stored procedure that
        # checks information_schema first (SELECT 1 is the required no-op
        # ELSE branch).
        op.execute(
            text(f"""
            CREATE PROCEDURE DropUniqueIfExists()
            BEGIN
            IF EXISTS (
                SELECT 1
                FROM information_schema.TABLE_CONSTRAINTS
                WHERE
                    CONSTRAINT_SCHEMA = DATABASE() AND
                    TABLE_NAME = '{table_name}' AND
                    CONSTRAINT_NAME = '{constraint_name}' AND
                    CONSTRAINT_TYPE = 'UNIQUE'
            ) THEN
                ALTER TABLE `{table_name}` DROP INDEX `{constraint_name}`;
            ELSE
                SELECT 1;
            END IF;
            END;
            CALL DropUniqueIfExists();
            DROP PROCEDURE DropUniqueIfExists;
            """)
        )
    else:
        # SQLite — batch mode rewrites the table; requires a live connection
        with op.batch_alter_table(table_name, schema=None) as batch_op:
            # suppress(ValueError): Alembic raises if the constraint is
            # absent; absence is exactly the case we want to tolerate.
            with contextlib.suppress(ValueError):
                batch_op.drop_constraint(constraint_name, type_="unique")
Loading
Loading