16
16
# under the License.
17
17
from __future__ import annotations
18
18
19
- from functools import cached_property
20
19
from typing import TYPE_CHECKING , Any
21
20
from urllib .parse import urljoin
22
21
22
+ import requests
23
23
from fastapi import FastAPI
24
- from starlette .middleware .sessions import SessionMiddleware
25
24
26
25
from airflow .api_fastapi .app import AUTH_MANAGER_FASTAPI_APP_PREFIX
27
26
from airflow .api_fastapi .auth .managers .base_auth_manager import BaseAuthManager , T
28
27
from airflow .configuration import conf
28
+ from airflow .exceptions import AirflowException
29
+ from airflow .providers .keycloak .auth_manager .resources import KeycloakResource
29
30
from airflow .providers .keycloak .auth_manager .user import KeycloakAuthManagerUser
30
31
31
32
if TYPE_CHECKING :
@@ -52,10 +53,6 @@ class KeycloakAuthManager(BaseAuthManager[KeycloakAuthManagerUser]):
52
53
Leverages Keycloak to perform authentication and authorization in Airflow.
53
54
"""
54
55
55
- @cached_property
56
- def api_server_endpoint (self ) -> str :
57
- return conf .get ("api" , "base_url" , fallback = "/" )
58
-
59
56
def deserialize_user (self , token : dict [str , Any ]) -> KeycloakAuthManagerUser :
60
57
return KeycloakAuthManagerUser (
61
58
user_id = token .pop ("user_id" ),
@@ -73,7 +70,8 @@ def serialize_user(self, user: KeycloakAuthManagerUser) -> dict[str, Any]:
73
70
}
74
71
75
72
def get_url_login (self , ** kwargs ) -> str :
76
- return urljoin (self .api_server_endpoint , f"{ AUTH_MANAGER_FASTAPI_APP_PREFIX } /login" )
73
+ base_url = conf .get ("api" , "base_url" , fallback = "/" )
74
+ return urljoin (base_url , f"{ AUTH_MANAGER_FASTAPI_APP_PREFIX } /login" )
77
75
78
76
def is_authorized_configuration (
79
77
self ,
@@ -82,7 +80,18 @@ def is_authorized_configuration(
82
80
user : KeycloakAuthManagerUser ,
83
81
details : ConfigurationDetails | None = None ,
84
82
) -> bool :
85
- return True
83
+ config_section = details .section if details else None
84
+ return self ._is_authorized (
85
+ method = method , resource_type = KeycloakResource .CONFIGURATION , user = user
86
+ ) or (
87
+ config_section is not None
88
+ and self ._is_authorized (
89
+ method = method ,
90
+ resource_type = KeycloakResource .CONFIGURATION ,
91
+ user = user ,
92
+ resource_id = config_section ,
93
+ )
94
+ )
86
95
87
96
def is_authorized_connection (
88
97
self ,
@@ -91,7 +100,13 @@ def is_authorized_connection(
91
100
user : KeycloakAuthManagerUser ,
92
101
details : ConnectionDetails | None = None ,
93
102
) -> bool :
94
- return True
103
+ connection_id = details .conn_id if details else None
104
+ return self ._is_authorized (method = method , resource_type = KeycloakResource .CONNECTION , user = user ) or (
105
+ connection_id is not None
106
+ and self ._is_authorized (
107
+ method = method , resource_type = KeycloakResource .CONNECTION , user = user , resource_id = connection_id
108
+ )
109
+ )
95
110
96
111
def is_authorized_dag (
97
112
self ,
@@ -106,12 +121,24 @@ def is_authorized_dag(
106
121
def is_authorized_backfill (
107
122
self , * , method : ResourceMethod , user : KeycloakAuthManagerUser , details : BackfillDetails | None = None
108
123
) -> bool :
109
- return True
124
+ backfill_id = str (details .id ) if details else None
125
+ return self ._is_authorized (method = method , resource_type = KeycloakResource .BACKFILL , user = user ) or (
126
+ backfill_id is not None
127
+ and self ._is_authorized (
128
+ method = method , resource_type = KeycloakResource .BACKFILL , user = user , resource_id = backfill_id
129
+ )
130
+ )
110
131
111
132
def is_authorized_asset (
112
133
self , * , method : ResourceMethod , user : KeycloakAuthManagerUser , details : AssetDetails | None = None
113
134
) -> bool :
114
- return True
135
+ asset_id = details .id if details else None
136
+ return self ._is_authorized (method = method , resource_type = KeycloakResource .ASSET , user = user ) or (
137
+ asset_id is not None
138
+ and self ._is_authorized (
139
+ method = method , resource_type = KeycloakResource .ASSET , user = user , resource_id = asset_id
140
+ )
141
+ )
115
142
116
143
def is_authorized_asset_alias (
117
144
self ,
@@ -120,25 +147,57 @@ def is_authorized_asset_alias(
120
147
user : KeycloakAuthManagerUser ,
121
148
details : AssetAliasDetails | None = None ,
122
149
) -> bool :
123
- return True
150
+ asset_alias_id = details .id if details else None
151
+ return self ._is_authorized (method = method , resource_type = KeycloakResource .ASSET_ALIAS , user = user ) or (
152
+ asset_alias_id is not None
153
+ and self ._is_authorized (
154
+ method = method ,
155
+ resource_type = KeycloakResource .ASSET_ALIAS ,
156
+ user = user ,
157
+ resource_id = asset_alias_id ,
158
+ )
159
+ )
124
160
125
161
def is_authorized_variable (
126
162
self , * , method : ResourceMethod , user : KeycloakAuthManagerUser , details : VariableDetails | None = None
127
163
) -> bool :
128
- return True
164
+ variable_key = details .key if details else None
165
+ return self ._is_authorized (method = method , resource_type = KeycloakResource .VARIABLE , user = user ) or (
166
+ variable_key is not None
167
+ and self ._is_authorized (
168
+ method = method , resource_type = KeycloakResource .VARIABLE , user = user , resource_id = variable_key
169
+ )
170
+ )
129
171
130
172
def is_authorized_pool (
131
173
self , * , method : ResourceMethod , user : KeycloakAuthManagerUser , details : PoolDetails | None = None
132
174
) -> bool :
133
- return True
175
+ pool_name = details .name if details else None
176
+ return self ._is_authorized (method = method , resource_type = KeycloakResource .POOL , user = user ) or (
177
+ pool_name is not None
178
+ and self ._is_authorized (
179
+ method = method , resource_type = KeycloakResource .POOL , user = user , resource_id = pool_name
180
+ )
181
+ )
134
182
135
183
def is_authorized_view (self , * , access_view : AccessView , user : KeycloakAuthManagerUser ) -> bool :
136
- return True
184
+ return self ._is_authorized (
185
+ method = "GET" , resource_type = KeycloakResource .VIEW , user = user
186
+ ) or self ._is_authorized (
187
+ method = "GET" ,
188
+ resource_type = KeycloakResource .VIEW ,
189
+ user = user ,
190
+ resource_id = access_view .value ,
191
+ )
137
192
138
193
def is_authorized_custom_view (
139
194
self , * , method : ResourceMethod | str , resource_name : str , user : KeycloakAuthManagerUser
140
195
) -> bool :
141
- return True
196
+ return self ._is_authorized (
197
+ method = method , resource_type = KeycloakResource .CUSTOM , user = user
198
+ ) or self ._is_authorized (
199
+ method = method , resource_type = KeycloakResource .CUSTOM , user = user , resource_id = resource_name
200
+ )
142
201
143
202
def filter_authorized_menu_items (
144
203
self , menu_items : list [MenuItem ], * , user : KeycloakAuthManagerUser
@@ -156,12 +215,54 @@ def get_fastapi_app(self) -> FastAPI | None:
156
215
"This sub application provides login routes."
157
216
),
158
217
)
159
- # authlib requires ``SessionMiddleware``
160
- app .add_middleware (
161
- SessionMiddleware ,
162
- secret_key = conf .get ("api_auth" , "jwt_secret" ),
163
- https_only = False ,
164
- )
165
218
app .include_router (login_router )
166
219
167
220
return app
221
+
222
+ def _is_authorized (
223
+ self ,
224
+ * ,
225
+ method : ResourceMethod | str ,
226
+ resource_type : KeycloakResource ,
227
+ user : KeycloakAuthManagerUser ,
228
+ resource_id : str | None = None ,
229
+ ) -> bool :
230
+ client_id = conf .get ("keycloak_auth_manager" , "client_id" )
231
+ realm = conf .get ("keycloak_auth_manager" , "realm" )
232
+ server_url = conf .get ("keycloak_auth_manager" , "server_url" )
233
+
234
+ permission = (
235
+ f"{ resource_type .value } :{ resource_id } #{ method } "
236
+ if resource_id
237
+ else f"{ resource_type .value } #{ method } "
238
+ )
239
+ resp = requests .post (
240
+ self ._get_token_url (server_url , realm ),
241
+ data = self ._get_payload (client_id , permission ),
242
+ headers = self ._get_headers (user .access_token ),
243
+ )
244
+
245
+ if resp .status_code == 200 :
246
+ return True
247
+ if resp .status_code == 403 :
248
+ return False
249
+ raise AirflowException (f"Unexpected error: { resp .status_code } - { resp .text } " )
250
+
251
+ @staticmethod
252
+ def _get_token_url (server_url , realm ):
253
+ return f"{ server_url } /realms/{ realm } /protocol/openid-connect/token"
254
+
255
+ @staticmethod
256
+ def _get_payload (client_id , permission ):
257
+ return {
258
+ "grant_type" : "urn:ietf:params:oauth:grant-type:uma-ticket" ,
259
+ "audience" : client_id ,
260
+ "permission" : permission ,
261
+ }
262
+
263
+ @staticmethod
264
+ def _get_headers (access_token ):
265
+ return {
266
+ "Authorization" : f"Bearer { access_token } " ,
267
+ "Content-Type" : "application/x-www-form-urlencoded" ,
268
+ }
0 commit comments