IT

Airflow Multi-tenant Guide

concave 2021. 9. 21. 20:00
반응형

Airflow Multi-tenant?

airflow는 Web Server, Scheduler 그리고 Worker로 구성되어 있으며 기본적으로 제공하는 권한은 Admin, Op, User, Viewer, Public로 5가지이다. Airflow에 접속하여 Dag를 실행하는 그룹이 하나밖에 없다면 크게 상관이 없지만 사용자 또는 그룹이 대규모라면 그룹 권한 정책을 적용하지 않으면 개발보다는 운영에 많은 시간을 소비하게 된다. 경험상 이러한 루틴의 반복은 개발과 운영의 라이프 사이클을 떨어뜨리는 요소로 작용하고 시스템 업그레이드와 패치에 상당한 준비시간이 필요하게하고 결국에는 업그레이드와 패치가 불가능하게되어 시스템 운영에 초점을 맞추어 개발과 운영이 진행되게 된다. 

 

Airflow architecture overview

이글에서는 Airflow login with oidc 적용 글에 이어서 OIDC의 Role을 활용한 multi-tenacy 구현 방법을 설명하려고 한다. Airflow를 Data Engineering과 Data Science 그룹에서 사용하는것으로 가정하고 글을 작성하였다. 이번 내용도 마찬가지로 관련 내용을 찾기 힘들어 전체적인 구현과 테스트에 상당한 시간이 소모되었다.

 

Prerequisite

  • Airfow >= 2.1.3
  • Keycloak >= 14.0.0
  • Python > 3.7

 

Configuring Keycloak

airflow에서 사용할 multi-tenancy 그룹에 매핑되는 Role을 먼저 생성한다. 이 글에서는 Data Engineering과 Data Science팀에서 사용하는것으로 가정하였으므로 data_engineering과 data_science role을 생성하였다.

Role은 Realm -> Configure -> Clients에서 airflow client의 roles 탭에서 add role 버튼으로 생성 가능하다. 생성하고 나면 아래와 같이 2개의 role이 추가되어 표시된다.

Role이 생성완료되었으면 Role에 대응되는 그룹을 생성하도록 한다. 그룹은 Realm -> Manage -> Groups에서 New 버튼으로 생성가능하다. 정상적으로 생성 완료되었으면 Groups 트리에 추가한 그룹이 표시된다. 

이번에는 그룹에 Role을 매핑하도록 한다. role을 사용자에게 바로 바인딩해서 사용해도 되지만 쉬운 관리를 위해서는 role을 그룹에 매핑하고 그룹에 사용자를 추가하거나 제거하여 관리하면 더욱 편리하다. 위에서 만든 2개의 그룹의 Role Mappings 탭으로 이동하여 Role을 매핑하도록 한다. role이 정상적으로 바인딩되었다면 Assigned roles에 표시되는걸 확인할 수 있다.

group에 role을 매핑하는것까지 완료되었다면 이번에는 group에 사용자를 추가하도록 하자. group에 사용자가 추가되는것까지 완료되어야 JWT(Jason Web Token)에서 role을 사용할 수 있다.

 

Configuring Airflow

airflow에서 사용할 role과 group 설정이 완료되었으면 airflow에서 해당 role에 대한 권한을 설정하도록 한다. 이 글에서는 최종 목표가 airflow의 multi-tenancy이므로 전체 적인 설정이 완료되었다면 data engineering 팀은 data_engineering_k8spodoperator만 보여야하며 data science팀은 data_science_k8spodoperator만 보여야한다. 지금은 Admin role 계정으로 로그인하여 2개가 모두 표시된다.

먼저 Airflow에서 해당 role을 먼저 생성하도록 한다. role의 권한은 user role을 복사하여 사용하며 Security -> List Roles에서 User role을 선택 후 Actions에서 Copy Role 을 클릭한다.

role 복사가 완료되면 복사된 role을 선택 후 Edit Record 버튼을 클릭하여 role 이름을 위에서 설정한 data_engineering으로 설정하고 role 권한에서 "can read on DAGs"와 "can edit on DAGs"를 제거하도록 한다.

삭제가 완료되면 "can read on DAG:data_engineering_k8spodoperator"와 "can edit on DAG:data_engineering_k8spodoperator"를 추가하여 data_engineering_k8spodoperator DAG에 대한 read와 edit 권한을 추가한다.

data_engineering role에 대한 설정이 완료되면 이어서 data_science role에 대한 작업도 진행하도록 한다. 동일한 설정 작업이므로 설정과 이미지 캡쳐는 생략하였다. 

 

Edit webserver_config.py

이번에도 Airflow login with oidc 적용 글과 동일하게 oidc에 설정된 role을 airflow에 설정이 되도록 webserver_config.py 파일 수정해주어야한다. auth_user_oauth function이 추가되었으며 role을 검색하여 추가하는 function이다. 전체 소스는 아래와 같다.

 

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
   webserver_config
   Referencies
     - https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
"""

import os
import logging
import jwt

from flask import redirect, session
from flask_appbuilder import expose
from flask_appbuilder.security.manager import AUTH_OAUTH
from flask_appbuilder.security.views import AuthOAuthView
from flask_appbuilder.security.sqla.models import Role

from airflow.www.security import AirflowSecurityManager

basedir = os.path.abspath(os.path.dirname(__file__))
log = logging.getLogger(__name__)

MY_PROVIDER = 'keycloak'

AUTH_TYPE = AUTH_OAUTH
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Public"
AUTH_ROLES_SYNC_AT_LOGIN = True
AUTH_ROLE_PREFIX='aiplatform'

PERMANENT_SESSION_LIFETIME = 1800

AUTH_ROLES_MAPPING = {
  "airflow_admin": ["Admin"],
  "airflow_op": ["Op"],
  "airflow_user": ["User"],
  "airflow_viewer": ["Viewer"],
  "airflow_public": ["Public"],
}

OAUTH_PROVIDERS = [
  {
   'name': 'keycloak',
   'icon': 'fa-circle-o',
   'token_key': 'access_token', 
   'remote_app': {
     'client_id': 'airflow',
     'client_secret': 'dfd9c0d6-ac73-41c1-a97f-5b05948cee0e',
     'client_kwargs': {
       'scope': 'email profile'
     },
     'api_base_url': 'https://cumulus.koreacentral.cloudapp.azure.com/auth/realms/homelab/protocol/openid-connect/',
     'request_token_url': None,
     'access_token_url': 'https://cumulus.koreacentral.cloudapp.azure.com/auth/realms/homelab/protocol/openid-connect/token',
     'authorize_url': 'https://cumulus.koreacentral.cloudapp.azure.com/auth/realms/homelab/protocol/openid-connect/auth',
    },
  },
]

class CustomAuthRemoteUserView(AuthOAuthView):
  @expose("/logout/")
  def logout(self):
    """Delete access token before logging out."""
    return super().logout()

class CustomSecurityManager(AirflowSecurityManager):
  authoauthview = CustomAuthRemoteUserView

  def oauth_user_info(self, provider, response):
    if provider == MY_PROVIDER:
      token = response["access_token"]
      me = jwt.decode(token, algorithms="RS256", verify=False)
      # {
      #   "resource_access": { "airflow": { "roles": ["airflow_admin"] }}
      # }
      groups = me["resource_access"]["airflow"]["roles"] # unsafe
      # log.info("groups: {0}".format(groups))
      if len(groups) < 1:
        groups = ["airflow_public"]
      # else:
      #   groups = [str for str in groups if "aiplatform" in str]
      # role_objects = [str for str in groups if "aiplatform" in str]

      userinfo = {
        "username": me.get("preferred_username"),
        "email": me.get("email"),
        "first_name": me.get("given_name"),
        "last_name": me.get("family_name"),
        "role_keys": groups,
        # "role_objects": role_objects,
      }
      log.info("user info: {0}".format(userinfo))
      return userinfo
    else:
      return {}
  
  def auth_user_oauth(self, userinfo):
    """
        Method for authenticating user with OAuth.
        :userinfo: dict with user information
                    (keys are the same as User model columns)
    """
    # extract the username from `userinfo`
    if "username" in userinfo:
      username = userinfo["username"]
    elif "email" in userinfo:
      username = userinfo["email"]
    else:
      log.error(
          "OAUTH userinfo does not have username or email {0}".format(userinfo)
      )
      return None

    # If username is empty, go away
    if (username is None) or username == "":
      return None
    
    # Search the DB for this user
    user = self.find_user(username=username)

    # If user is not active, go away
    if user and (not user.is_active):
      return None

    # If user is not registered, and not self-registration, go away
    if (not user) and (not self.auth_user_registration):
      return None

    # Sync the user's roles
    if user and self.auth_roles_sync_at_login:
      user_role_objects = set()
      user_role_objects.add(self.find_role(AUTH_USER_REGISTRATION_ROLE))
      
      for item in userinfo.get("role_keys", []):
        fab_role = self.find_role(item)
        if fab_role:
          user_role_objects.add(fab_role)
        user.roles = list(user_role_objects)

      log.debug(
          "Calculated new roles for user='{0}' as: {1}".format(
              username, user.roles
          )
      )

    # If the user is new, register them
    if (not user) and self.auth_user_registration:
      user_role_objects = set()
      user_role_objects.add(self.find_role(AUTH_USER_REGISTRATION_ROLE))

      for item in userinfo.get("role_keys", []):
        fab_role = self.find_role(item)
        if fab_role:
          user_role_objects.add(fab_role)
      
      user = self.add_user(
        username=username,
        first_name=userinfo.get("first_name", ""),
        last_name=userinfo.get("last_name", ""),
        email=userinfo.get("email", "") or f"{username}@email.notfound",
        role=list(user_role_objects),
      )
      log.debug("New user registered: {0}".format(user))

      # If user registration failed, go away
      if not user:
        log.error("Error creating a new OAuth user {0}".format(username))
        return None

    # LOGIN SUCCESS (only if user is now registered)
    if user:
      self.update_user_auth_stat(user)
      return user
    else:
      return None

SECURITY_MANAGER_CLASS = CustomSecurityManager

APP_THEME = "simplex.css"

 

소스 변경 후 airflow를 webserver를 다시 실행하도록 한다. data engineering 관련 계정으로 로그인하면 data_engineering_k8spodoperator만 표시된다.

 

마치며

Airflow에 OIDC를 연동하는 방법을 2회에 걸쳐 정리해보았습니다. 구글과 스택오버플로우에서 자료를 찾기 힘들어 내용 정리에 생각보다 시간이 많이 소요되었습니다. 관련 내용을 찾고 계시는분들에게 많은 도움이 되었으면 좋겠습니다. 그리고 해당 내용으로 시간을 많이 아끼신 분들께서는 커피 한잔 기부 부탁드립니다. (동네 까페에서 아이스아메리카노 먹으면서 글을 작성할 수 있도록 도움 부탁드립니다.)

 

그리고 구직도 하고 있습니다. ㅠㅠ 관심있으신분들은 댓글 부탁드립니다.

 

donaricano-btn

 

Referencies

https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html

 

Architecture Overview — Airflow Documentation

 

airflow.apache.org

https://stackoverflow.com/questions/68123460/airflow-2-0-1-filter-dags-by-owner

 

 

Airflow 2.0.1 filter DAGs by owner

How can I restrict the DAGs that are related to a particular owner in airflow 2.0.1? I have enabled the enter image description here

stackoverflow.com

https://www.youtube.com/watch?v=s6TygwvYdN0&t=812s

 

반응형