3030from sqlalchemy_utils import UUIDType
3131
3232from airflow ._shared .secrets_masker import mask_secret
33- from airflow .configuration import ensure_secrets_loaded
33+ from airflow .configuration import conf , ensure_secrets_loaded
3434from airflow .models .base import ID_LEN , Base
3535from airflow .models .crypto import get_fernet
3636from airflow .models .team import Team
@@ -146,7 +146,7 @@ def get(
146146 # means SQLA etc is loaded, but we can't avoid that unless/until we add import shims as a big
147147 # back-compat layer
148148
149- # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps)
149+ # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps)
150150 # and should use the Task SDK API server path
151151 if hasattr (sys .modules .get ("airflow.sdk.execution_time.task_runner" ), "SUPERVISOR_COMMS" ):
152152 warnings .warn (
@@ -186,6 +186,7 @@ def set(
186186 value : Any ,
187187 description : str | None = None ,
188188 serialize_json : bool = False ,
189+ team_id : str | None = None ,
189190 session : Session | None = None ,
190191 ) -> None :
191192 """
@@ -197,13 +198,14 @@ def set(
197198 :param value: Value to set for the Variable
198199 :param description: Description of the Variable
199200 :param serialize_json: Serialize the value to a JSON string
201+ :param team_id: ID of the team associated to the variable (if any)
200202 :param session: optional session, use if provided or create a new one
201203 """
202204 # TODO: This is not the best way of having compat, but it's "better than erroring" for now. This still
203205 # means SQLA etc is loaded, but we can't avoid that unless/until we add import shims as a big
204206 # back-compat layer
205207
206- # If this is set it means are in some kind of execution context (Task, Dag Parse or Triggerer perhaps)
208+ # If this is set it means we are in some kind of execution context (Task, Dag Parse or Triggerer perhaps)
207209 # and should use the Task SDK API server path
208210 if hasattr (sys .modules .get ("airflow.sdk.execution_time.task_runner" ), "SUPERVISOR_COMMS" ):
209211 warnings .warn (
@@ -222,6 +224,11 @@ def set(
222224 )
223225 return
224226
227+ if team_id and not conf .getboolean ("core" , "multi_team" ):
228+ raise ValueError (
229+ "Multi-team mode is not configured in the Airflow environment. To assign a team to a variable, multi-mode must be enabled."
230+ )
231+
225232 # check if the secret exists in the custom secrets' backend.
226233 Variable .check_for_write_conflict (key = key )
227234 if serialize_json :
@@ -236,7 +243,7 @@ def set(
236243 ctx = create_session ()
237244
238245 with ctx as session :
239- new_variable = Variable (key = key , val = stored_value , description = description )
246+ new_variable = Variable (key = key , val = stored_value , description = description , team_id = team_id )
240247
241248 val = new_variable ._val
242249 is_encrypted = new_variable .is_encrypted
@@ -255,6 +262,7 @@ def set(
255262 val = val ,
256263 description = description ,
257264 is_encrypted = is_encrypted ,
265+ team_id = team_id ,
258266 )
259267
260268 # Apply dialect-specific upsert
@@ -264,6 +272,7 @@ def set(
264272 val = val ,
265273 description = description ,
266274 is_encrypted = is_encrypted ,
275+ team_id = team_id ,
267276 )
268277 else :
269278 # PostgreSQL and SQLite: ON CONFLICT DO UPDATE
@@ -273,6 +282,7 @@ def set(
273282 val = val ,
274283 description = description ,
275284 is_encrypted = is_encrypted ,
285+ team_id = team_id ,
276286 ),
277287 )
278288
0 commit comments