본문 바로가기

카테고리 없음

[GCP] API로 받은 정보 Cloud Composer를 이용해서 GCS에 적재하기 - 5

생각지도 못했던 오류들!

gcp에서는 덮어쓰기 기능이 없는듯하다.. 412 오류가 발생해서 이래저래 검색을 해보니, 똑같은 이름의 디렉터리에 파일명은 허용하지 않는 것 마냥. 오류가 계속해서 발생했다. 아니 왜 덮어쓰기가 안된단 말인가..😢😢😢 그래서 document도 뒤져보고 여러 가지를 해보았다. 나의 삽질을 기록해 본다.

나의 삽질들!

document를 보면서 오지게 삽질을 했는데! 여러 가지 함수들을 찾았다. 그리고 물론 모르는 것들도 다수 발견했다.

  1. 덮어쓰기 하는 방법 찾기
  2. 단위테스트 중요도를 너무 느끼게 되었다.
  3. json encoding이 깨지는 문제를 발견했다.
  4. scheduler 사용에 문제점 발견

덮어쓰기 하는 방법 찾기

GCS는 기본적으로 덮어쓰기를 금지하는 거 같은 느낌이 든다.. Object형태로 저장되기 때문일까..? 기본적으로 제공되는 update라는 함수와 update_storage_class라는 함수가 있지만 이건 storage_class를 변경하는 함수들이었다. 그리고 update 함수는 빌링 관련 업데이트 같고..

몇 가지를 찾아보니 upload_from_string이라는 함수가 있었다. 이것은 blob 파일 객체를 string의 형태로 만들어서 넣어주는데, 저장 및 변경이 가능하다고 한다.

다른 한 가지 방법으로는 blob.exists()라는 함수를 활용하면 그 해당 blob명이 존재할 경우 boolean값을 반환해 주는 함수가 존재했는데, 이 함수를 활용해서 적재하기 전에 내가 처음에 저장한 파일이 존재하는지 존재한다면 해당 파일을 지우고 다시 파일을 적재하는 코드를 짜볼까 했지만,, 빅데이터 관점으로 봤을 때 별로 효율적이지 못하다는 생각이 들었다.

하지만 string을 update 하는 건 효율적인가에 대해서 생각해 봤을 때 이것 역시 효율적이지 못하다고 생각은 변하지 않는다.. 방법을 알고 싶다 ㅠㅠ

unit test!

airflow도 unittest 하는 게 분명히 존재할 것이다. 하지만 나는 아직 그 정도는 못 도달했고, 함수 하나하나 테스트해보면서 하기로 결정했다. 왜냐하면 덮어쓰기 하는 방법들을 하나하나 적용하고 dag을 돌려보고 하기에는.. 너무 비효율적이었기 때문에..

local에서 테스트할 때는 IAM에 등록해야 되고 뭐 이래 저래 설정할게 많았다. 하지만 구글 코랩에서는 간단한 코드 몇 줄이면 바로 적용이 되어서 코랩으로 진행하였다.

from google.colab import auth
auth.authenticate_user()
print('Authenticated')

위에 코드를 돌리고 실행하면 인증 완료다.!

json 파일을 받아오는 코드를 돌려보았다.

update 하는 파일을 두고 테스트해보았다. gcs에는 해당파일이 존재한다. 원래 다시 파일을 업로드하면 오류가 발생하였다. upload_from_string함수를 사용하고 서는 오류가 발생하지 않았다.

from google.cloud import storage

def update_blob(bucket_name, blob_name, content):
    # Instantiate a client
    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.upload_from_string(json.dumps(content, ensure_ascii = False).encode(encoding="EUC-KR"))
    print(f"Blob {blob_name} in {bucket_name} was updated with new content: {content}.")

bucket_name = 'corded-palisade-378506-import'
blob_name = 'file.json'
file_path = './file.json'

# Example usage:
update_blob(bucket_name, blob_name, getJsondata())

getJsondata()란 함수를 만들어서 string으로 넣어줬더니 정상 작동한다! 하지만 세 번째 문제 파일이 전부 다 깨져있었다. 딱 봐도 encoding 문제라는 생각이 들었다.

encoding..

encoding을 하는 여러 방법이 있는 건 알겠다. 왜인지 모르겠지만 기존에 알고 있던 방법으로 다 잘 되지 않아서 여러 가지를 시도해 보았다.

  1. dumps() 안에 ensure_ascii = Faslse 넣기 , 이거 적용하니까 아스키코드로 변환이 안되긴 하였지만 원하는 게 아니었다.
  2. encode 함수를 사용해서 encoding을 UTF-8로 적용하였으나 원하는 형태로 바뀌지 않았고 EUC-KR로 변경하니 정상 작동되었다.

그리고 여러 가지를 확인해 보니 dump와 dumps의 차이가 존재함을 확인할 수 있었는데
dumps : Python dict object를 JSON 문자열로 변환할 수 있다.
dump : Json 파일에 write 할 때 사용할 수 있는 차이가 존재했다.

자 이제 airflow에 적용을 해보니 몇 번 시도해도 에러가 나지 않는다!! 이제 다 도달했다..

스케줄로 1시간에 한 번씩 다시 읽고 적재만 해보자!!!

Scheduling

기존의 코드에 scheduler만 바꿔주면 제대로 돌아가겠지 하고 시도했는데 웬일 계속 변경되지 않고 Scheduling에 계속해서 None이라고 나와있는 것이다.

확인해 보니 stack overflow에 default_args에 schedule_interval을 설정하면 되는 줄 알았는데 DAG에 None이라고 명시되어 있기 때문에 안 되는 것이었다. 그래서 보기 론 default_args에 설정하는 것보다 DAG 바로 옆에 property를 우선시하는 거처럼 보인다. 하지만 아니었다..ㅋㅋㅋ;;

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'schedule_interval':'*/20 * * * *',
    'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}

with DAG('facebook_to_gcs_dag', 
         default_args=default_args, 
         schedule_interval='None') as dag:

    t1 = PythonOperator(
        task_id='get_facebook_posts',
        python_callable=get_facebook_posts,
    )

    t2 = PythonOperator(
        task_id='upload_to_gcs',
        python_callable=update_blob,
        op_kwargs={
            'bucket_name': 'corded-palisade-378506-import',
            'blob_name': 'file.json',
            'content': get_facebook_posts()
        }
    )

t1 >> t2

그래서 아래 코드로 수정했다. 이러면 돌아가야지 했지만 scheduled의 날짜가 1 day 00:00:00으로 변경되어 있었다. 아니. 왜 적용이 안되는 거야.. 생각하고 기본적인 Airflow의 상태를 확인하는 곳의 코드를 확인해 보니 schedule_intervaldefault_args가 아닌 DAG 안에 설정되어 었었다

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'schedule_interval':'*/20 * * * *',
    'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}

with DAG('facebook_to_gcs_dag', 
         default_args=default_args, 
         ) as dag:

    t1 = PythonOperator(
        task_id='get_facebook_posts',
        python_callable=get_facebook_posts,
    )

    t2 = PythonOperator(
        task_id='upload_to_gcs',
        python_callable=update_blob,
        op_kwargs={
            'bucket_name': 'corded-palisade-378506-import',
            'blob_name': 'file.json',
            'content': get_facebook_posts()
        }
    )

t1 >> t2

그래서 코드를 다시 수정했다. 이렇게 수정하니까 원하는 데로 스케줄러가 작동하는 것을 확인할 수 있었다. 여기서 주의할 점은 처음에 20분씩 호출했는데, 하루 지나니까 호출할 수 있는 수가 초과돼버렸다.. ㅠㅠ 이런;;;

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}

with DAG('facebook_to_gcs_dag', 
         default_args=default_args, 
         schedule_interval='@daily',
         max_active_runs=2,) as dag:

    t1 = PythonOperator(
        task_id='get_facebook_posts',
        python_callable=get_facebook_posts,
    )

    t2 = PythonOperator(
        task_id='upload_to_gcs',
        python_callable=update_blob,
        op_kwargs={
            'bucket_name': 'corded-palisade-378506-import',
            'blob_name': 'file.json',
            'content': get_facebook_posts()
        }
    )

결론

  1. meta graph api 사용
  2. cloud composer 사용 및 스케줄 등록 (다양한 덱들은 좀 더 진행 예정)
  3. encoding 방법 숙지
  4. cloud storage의 기능에 대해 습득

정도 숙지 된 거 같다. airflow 기반의 composer는 굉장히 강력한 기능인 거 같다. 하지만 scheduling 좀 조심해야 할 거 같고.. 어떻게 하면 더 나이스하게 작성할 수 있을지 고민해 보자!

개선할 점

  1. graph API 가 원하는 정보를 다 가져오지 않는 거 같다. 확인이 필요하다! page에 정보를 가지고 오고 싶은데 안된다.
  2. gcs로 가져오지 않고 바로 bigquery로 전송하는 파이프 라인을 만들어보자!