-
Airflow authentication with RBAC and KeycloakIT 2021. 9. 20. 19:47반응형
Apache Airflow?
Data 분석 업무 진행시 가장 중요한 업무는 데이터를 정확한 시간에 정확하게 처리하여 정확한 곳에 저장하는 일이 가장 중요하다. 이는 데이터 사이즈와 업무 복잡도에 따라 구현 난이도가 달라지게 되며 개발 완료된 파이프 라인에 대한 운영 또한 상당한 리소스가 필요하다. 레거시한 방식으로는 데이터 전처리 프로그램을 작성 후 스케줄러에 등록하여 실행하는 방법이 대부분이었으나 최근에는 오픈소스 ETL 툴을 사용하여 이러한 작업을 많이 자동화 하는 추세이다. 데이터 분류에 따라 오픈소스 ETL Tool도 약간씩 달라지며 Airflow는 Data Science 관련 업무에서 많이 사용하는 툴이다.
Airflow 자체가 워낙 잘 만들어진 툴이고 제공되는 기능도 다양하여 사용하는데 큰 문제가 없지만 운영하는 시스템 규모가 매우 크고 접속하는 개발자 또는 데이터 사이언티스트가 매우 많다면 Airflow에서 제공하는 심플한 인증 방식으로는 매우 아쉬운게 사실이다. 심플한 인증에 대한 대안으로 Airflow에서는 LDAP과 OAuth 인증을 제공해주고 있으며 이 둘중에 하나를 연결하여 사용하면 이러한 문제도 간편하게 해결 가능하다.
이글에서는 현재시점('21년 9월)에서 구글 검색이나 스택오버플로우에서 관련 내용을 찾기가 힘든 Airflow의 OAuth 인증 연동에 대한 방법을 소개하고자 한다.
Prerequisite
- Airfow >= 2.1.3
설치시 google_auth 패키지와 함께 설치되어야 합니다. (ex: apache-airflow[google_auth]==2.1.3) - Keycloak >= 14.0.0
- Python > 3.7
Configuring Keycloak
Keycloak의 realm에 airflow를 위한 client를 추가하도록 한다.(Keycloak에 대한 자세한 설치 및 설정 방법은 이 블로그에 있는 Keycloak 설치 방법을 검색하기 바란다.) client는 Realm -> Configure -> Clients -> Create에서 추가 가능하다.
client가 추가되면 OAuth를 위한 세부 설정을 진행한다.
- Access type: Confidential
- Authorization Enabled: On
- Valid Redirect URIs: "*" (redirect URI를 "*"로 지정하는 방법은 굉장히 위험한 설정으로 운영 환경에서는 정확한 Airflow callback 주소를 입력하도록 한다.)
Airflow에서 사용되는 Role을 추가한다. Role은 Realm -> Configure -> Client -> Airflow -> Roles에서 추가 가능하다. airflow에서는 기본적으로 5가지 ROLE이 미리 정의되어 있다. 정의된 Role은 Admin, Op, User, View, Public이며 5개 모두 생성하는것을 추천한다.
Role이 생성 완료되었으면 Role을 바인딩할 그룹을 생성한다. 그룹은 Manage -> Groups -> New로 생성 가능하다. ROLE과 1:1로 매핑해야하므로 가능하면 5개 모두 만들도록 하자.
Group이 생성완료되었으면 Group에 Role을 매핑하도록 한다. 매핑은 Realms -> Manage -> Groups에서 해당 그룹을 선택하여 Edit 버튼으로 매핑 가능하다. Role Mapping 탭에서 Client roles을 airflow로 선택 후 위에서 설명한 5개의 Role을 모두 매핑해주도록 하자.
마지막으로 사용자를 해당 그룹에 추가하면 최종 Role을 사용자에게 매핑하는 작업이 끝나게 된다. Realm -> Manage -> Users에서 바인딩할 사용자 검색 후 Groups로 이동하여 그룹 선택 후 Join을 선택하여 바인딩하도록 한다.
모든 설정이 완료되었으면 curl을 사용하여 토큰키를 제대로 받아오는지 테스트 하도록 한다. URL과 Arguments는 자신의 keycloak 서버에 맞는 값으로 변경해서 테스트 한다. 아래와 같은 형식으로 값이 출력되면 정상적으로 설정된것이다. 추가로 access_token 값을 jwt.io에서 확인하여 Role이 정상적으로 바인딩되었는지 확인한다.
$ curl -s -X POST https://keycloak.homelab.local/auth/realms/homelab/protocol/openid-connect/token \ --header 'Content-Type: application/x-www-form-urlencoded' \ --data-urlencode 'grant_type=password' \ --data-urlencode 'client_id=${client_id}' \ --data-urlencode 'client_secret=${client_secret}' \ --data-urlencode 'username=${username}' \ --data-urlencode 'password=${password}' | jq { "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICI3Ukg4ZHdjVjhtbExjcTVOR2Y0WnFt........", "expires_in": 28800, "refresh_expires_in": 1800, "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICIzZjFmZGMwNS04MjA0LTQxYWQtODQ........", "token_type": "Bearer", "not-before-policy": 0, "session_state": "682786d4-d6af-41fd-b00a-f55a5e7dca37", "scope": "profile email" } $
아래는 jwt.io에서 token을 디코드한 값으로 client인 airflow role에 airflow_op가 할당되어 있는것을 확인 할 수 있다.
Configuring Airflow
Airflow 설정은 webserver_config.py, airflow.cfg 그리고 restapi를 위한 추가 설정으로 OAuth를 완벽하게 적용할 수 있다.
changing webserver_config.py
Airflow 2.X 부터는 Flask App Builder(FAB)으로 변경되어 FAB에 관련 설정을 변경하여 OAuth 관련 설정을 진행해야 다. 레퍼런스의 FAB Github 소스를 확인해보면(Authentication: OAuth) webserver_config.py 파일을 생성하여 관련 설정을 진행하는것을 확인할 수 있다. 먼저 webserver_config.py을 생성하여 AIRFLOW_HOME 경로 아래에 복사해 넣도록 하자. 만약 airflow를 Google, Github 또는 다른 OpenID 인증 서버에 연결해야 한다면 FAB 관련 홈페이지에서 세부 항목을 먼저 확인 후에 설정을 진행하도록 하자. 그리고 User의 속성 정보가 다르다면 oauth_user_info 함수에서 사용자 정보 매핑 부분을 수정하도록 하자.
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ webserver_config Referencies - https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth """ import os import logging import jwt from flask import redirect, session from flask_appbuilder import expose from flask_appbuilder.security.manager import AUTH_OAUTH from flask_appbuilder.security.views import AuthOAuthView from airflow.www.security import AirflowSecurityManager basedir = os.path.abspath(os.path.dirname(__file__)) log = logging.getLogger(__name__) MY_PROVIDER = 'keycloak' CLIENT_ID = 'client_id' CLIENT_SECRET = 'client_secret' KEYCLOAK_BASE_URL = 'https://keycloak.homelab.local/auth/realms/homelab/protocol/openid-connect/' KEYCLOAK_TOKEN_URL = 'https://keycloak.homelab.local/auth/realms/homelab/protocol/openid-connect/token' KEYCLOAK_AUTH_URL = 'https://keycloak.homelab.local/auth/realms/homelab/protocol/openid-connect/auth' AUTH_TYPE = AUTH_OAUTH AUTH_USER_REGISTRATION = True AUTH_USER_REGISTRATION_ROLE = "Public" AUTH_ROLES_SYNC_AT_LOGIN = True PERMANENT_SESSION_LIFETIME = 1800 AUTH_ROLES_MAPPING = { "airflow_admin": ["Admin"], "airflow_op": ["Op"], "airflow_user": ["User"], "airflow_viewer": ["Viewer"], "airflow_public": ["Public"], } OAUTH_PROVIDERS = [ { 'name': 'keycloak', 'icon': 'fa-circle-o', 'token_key': 'access_token', 'remote_app': { 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET, 'client_kwargs': { 'scope': 'email profile' }, 'api_base_url': KEYCLOAK_BASE_URL, 'request_token_url': None, 'access_token_url': KEYCLOAK_TOKEN_URL, 'authorize_url': KEYCLOAK_AUTH_URL, }, }, ] class CustomAuthRemoteUserView(AuthOAuthView): @expose("/logout/") def logout(self): """Delete access token before logging out.""" return super().logout() class CustomSecurityManager(AirflowSecurityManager): authoauthview = CustomAuthRemoteUserView def oauth_user_info(self, provider, response): if provider == MY_PROVIDER: token = response["access_token"] me = jwt.decode(token, algorithms="RS256", verify=False) # sample of resource_access # { # "resource_access": { "airflow": { "roles": ["airflow_admin"] }} # } groups = me["resource_access"]["airflow"]["roles"] # unsafe # log.info("groups: {0}".format(groups)) if len(groups) < 1: groups = ["airflow_public"] else: groups = [str for str in groups if "airflow" in str] userinfo = { "username": me.get("preferred_username"), "email": me.get("email"), "first_name": me.get("given_name"), "last_name": me.get("family_name"), "role_keys": groups, } log.info("user info: {0}".format(userinfo)) return userinfo else: return {} SECURITY_MANAGER_CLASS = CustomSecurityManager APP_THEME = "simplex.css"
changing user_auth.py
user_auth.py는 REST API에서 OAuth를 사용할 경우 설정해야하는 항목이다. REST API에서 JWT Token을 사용할 필요가 없다면 설정을 진행하지 않아도 된다. 설정하지 않으면 ID, PASSWORD 방식인 심플 인증 방식을 사용하여 REST API를 호출해야 한다. user_auth.py 파일은 PYTHONPATH 환경 변수의 경로에 복사해두록 한다.
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ User authentication backend Referencies - https://flask-appbuilder.readthedocs.io/en/latest/_modules/flask_appbuilder/security/manager.html - https://github.com/apache/airflow/blob/main/airflow/api/auth/backend/basic_auth.py """ import logging import jwt from functools import wraps from typing import Any, Callable, Optional, Tuple, TypeVar, Union, cast from flask import Response, current_app, request from flask_appbuilder.const import AUTH_OAUTH, AUTH_LDAP, AUTH_DB from flask_appbuilder.security.sqla.models import User from flask_login import login_user CLIENT_AUTH: Optional[Union[Tuple[str, str], Any]] = None log = logging.getLogger(__name__) def init_app(_): """Initializes authentication backend""" T = TypeVar("T", bound=Callable) def auth_current_user() -> Optional[User]: """Authenticate and set current user if Authorization header exists""" ab_security_manager = current_app.appbuilder.sm user = None if ab_security_manager.auth_type == AUTH_OAUTH: token = request.headers['Authorization'] if token is None: return None tokeninfo = token.split() if tokeninfo is None or len(tokeninfo) < 2: return None me = jwt.decode(tokeninfo[1], algorithms="RS256", verify=False) groups = me["resource_access"]["airflow"]["roles"] # unsafe if len(groups) < 1: groups = ["airflow_public"] else: groups = [str for str in groups if "airflow" in str] userinfo = { "username": me.get("preferred_username"), "email": me.get("email"), "first_name": me.get("given_name"), "last_name": me.get("family_name"), "role_keys": groups, } user = ab_security_manager.auth_user_oauth(userinfo) else: auth = request.authorization if auth is None or not auth.username or not auth.password: return None if ab_security_manager.auth_type == AUTH_LDAP: user = ab_security_manager.auth_user_ldap(auth.username, auth.password) if ab_security_manager.auth_type == AUTH_DB: user = ab_security_manager.auth_user_db(auth.username, auth.password) log.info("user: {0}".format(user)) if user is not None: login_user(user, remember=False) return user def requires_authentication(function: T): """Decorator for functions that require authentication""" @wraps(function) def decorated(*args, **kwargs): if auth_current_user() is not None: return function(*args, **kwargs) else: return Response("Unauthorized", 401, {"WWW-Authenticate": "Basic"}) return cast(T, decorated)
changing airflow.cfg
user_auth.py를 auth_backend에 반영해야 하므로 airflow.cfg 파일의 auth_backend에 설정하도록 하자. auth_backend는 "[api]"에 위치한다.
[api] auth_backend = user_auth
initialize airflow db
airflow db를 초기화하여 설정을 반영한다.
$ airflow db init airflow-VW-bsevg 19:06:16 DB: sqlite:////Users/awslife/Projects/airflow/airflow.db [2021-09-20 19:06:22,466] {db.py:702} INFO - Creating tables INFO [alembic.runtime.migration] Context impl SQLiteImpl. INFO [alembic.runtime.migration] Will assume non-transactional DDL. WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted. /Users/awslife/.local/share/virtualenvs/airflow-VW-bsevg/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`. /Users/awslife/.local/share/virtualenvs/airflow-VW-bsevg/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`. Initialization done $
Login check Web UI and calling REST API
Web UI Login
airflow webserver를 실행하여 브라우져에서 로그인해보면 keycloak과 연동되어 로그인되는것을 확인할 수 있다.
calling REST API
keycloak 서버에서 token을 받아 REST API를 호출해보면 정상적으로 수행됨을 확인 가능하다.
$ TOKEN=$(curl -s -X POST https://keycloak.homelab.local/auth/realms/homelab/protocol/openid-connect/token \ --header 'Content-Type: application/x-www-form-urlencoded' \ --data-urlencode 'grant_type=password' \ --data-urlencode 'client_id=${client_id}' \ --data-urlencode 'client_secret=${client_secret}' \ --data-urlencode 'username=${username}' \ --data-urlencode 'password=${password}' | jq -r '.access_token') $ curl -s -X GET -H "application/json" -H "Authorization: Token $TOKEN " http://localhost:8080/api/v1/users | jq 19:26:54 { "total_entries": 3, "users": [ { "active": true, "changed_on": "2021-09-18T21:16:54.337525", "created_on": "2021-09-18T21:16:54.337517", "email": "awslife.song@gmail.com", "fail_login_count": 0, "first_name": "awslife", "last_login": "2021-09-20T19:26:56.399315", "last_name": "song", "login_count": 20, "roles": [ { "name": "Public" }, { "name": "Admin" } ], "username": "awslife" }, { "active": true, "changed_on": "2021-09-19T21:19:17.894711", "created_on": "2021-09-19T21:19:17.894471", "email": "airflow.op@homelab.local", "fail_login_count": 0, "first_name": "airflow", "last_login": "2021-09-20T19:22:25.654638", "last_name": "op", "login_count": 11, "roles": [ { "name": "Public" }, { "name": "Op" } ], "username": "airflow.op" }, { "active": true, "changed_on": "2021-09-19T21:21:06.515305", "created_on": "2021-09-19T21:21:06.515112", "email": "airflow.user@homelab.local", "fail_login_count": 0, "first_name": "airflow", "last_login": "2021-09-19T21:21:06.518191", "last_name": "user", "login_count": 1, "roles": [ { "name": "Public" }, { "name": "User" } ], "username": "airflow.user" } ] } $
마치며
airflow에 OAuth 인증을 붙이기 위해 삽질한 내용을 정리해보았습니다. 전체적인 내용을 보면 간단할 수 있지만 OAuth와 Flask에 대한 배경 지식이 없다면 설정에 상당한 시간을 소모할 것으로 생각됩니다. 이 글이 많은 도움이 되어 설정 보다는 아름다운 모델을 만드는데 많은 시간을 보내시길 바랍니다.
이 글을 읽고 감명은 받지 않으시더라도 쓸데 없는 시간을 많이 줄이셨다면 커피 한잔 부탁드립니다. 가난한 개발자 많은 도움 부탁드립니다.Referencies
반응형'IT' 카테고리의 다른 글
Continuous Machine Learning 환경을 위한 Lab 환경 구축하기 (0) 2021.09.27 Airflow Multi-tenant Guide (0) 2021.09.21 Kubernetes SSO with OIDC and Keycloak (0) 2021.08.20 ROOT CA 생성 및 Self Signed Certificate 만들기 (0) 2021.08.15 storage class in Kubernetes backed by Synology NFS (0) 2021.08.14 - Airfow >= 2.1.3