-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
125 lines (93 loc) · 3.41 KB
/
main.py
File metadata and controls
125 lines (93 loc) · 3.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import Column, DateTime, create_engine
from sqlalchemy.orm import sessionmaker
from db.models import Base, Model, Tag
def get_alembic_cfg():
return Config("alembic.ini")
def get_db_url(alembic_cfg):
return alembic_cfg.get_main_option("sqlalchemy.url")
def get_db_name(alembic_cfg):
return get_db_url(alembic_cfg).split("/")[-1]
def reset_db(alembic_cfg):
db_name = get_db_name(alembic_cfg)
if os.path.exists(db_name):
print(f"Removing existing database: {db_name}")
os.remove(db_name)
def init_db(alembic_cfg, revision="head"):
db_url = get_db_url(alembic_cfg)
print(f"Database URL: {db_url}")
# Check if we need to upgrade
engine = create_engine(db_url)
conn = engine.connect()
context = MigrationContext.configure(conn)
current_rev = context.get_current_revision()
script = ScriptDirectory.from_config(alembic_cfg)
if revision == "head":
target_rev = script.get_current_head()
else:
target_rev = revision
print(f"Current revision: {current_rev}")
print(f"Target revision: {target_rev}")
if current_rev != target_rev:
# Database is not up to date, run upgrade
command.upgrade(alembic_cfg, target_rev)
print(f"Database upgraded from {current_rev} to {target_rev}")
else:
print("Database is up to date")
return engine
def register_date_added_column():
col = Column(
"date_added", DateTime
)
setattr(Model, col.name, col)
if __name__ == "__main__":
alembic_cfg = get_alembic_cfg()
reset_db(alembic_cfg)
# Initialise database to first revision
engine = init_db(alembic_cfg, "15d874f688e8")
Session = sessionmaker(bind=engine)
# Seed database
with Session() as session:
readme_model = Model(
path="readme.md",
name="Readme",
description="Project readme",
type="readme",
architecture="markdown"
)
session.add(readme_model)
session.add(Model(
path="requirements.txt",
name="Requirements",
description="Project requirements",
type="requirements",
architecture="text"
))
my_first_tag = Tag(name="my first tag")
session.add(my_first_tag)
session.commit()
# Add a tag to the readme model
readme_model.tags.append(my_first_tag)
session.commit()
tagged_models = (
session.query(Model)
.join(Model.tags)
.filter(Tag.name == my_first_tag.name)
.all()
)
print(f"Tagged models: {len(tagged_models)}")
print(f"All models: {len(session.query(Model).all())}")
# Dynamically register our date_added column, normally this would just be part of the model
# but for this example we need to add it at runtime so the initial seeding doesnt try to set it
register_date_added_column()
# Upgrade to latest revision
engine = init_db(alembic_cfg)
Session = sessionmaker(bind=engine)
# Print all models with their date_added, this should now be populated by the migration
with Session() as session:
for model in session.query(Model).all():
print(model.path, model.date_added)