생각지도 못했던 오류들!
gcp에서는 덮어쓰기 기능이 없는듯하다.. 412 오류가 발생해서 이래저래 검색을 해보니, 똑같은 이름의 디렉터리에 파일명은 허용하지 않는 것 마냥. 오류가 계속해서 발생했다. 아니 왜 덮어쓰기가 안된단 말인가..😢😢😢 그래서 document도 뒤져보고 여러 가지를 해보았다. 나의 삽질을 기록해 본다.
나의 삽질들!
document를 보면서 오지게 삽질을 했는데! 여러 가지 함수들을 찾았다. 그리고 물론 모르는 것들도 다수 발견했다.
- 덮어쓰기 하는 방법 찾기
- 단위테스트 중요도를 너무 느끼게 되었다.
- json encoding이 깨지는 문제를 발견했다.
- scheduler 사용에 문제점 발견
덮어쓰기 하는 방법 찾기
GCS는 기본적으로 덮어쓰기를 금지하는 거 같은 느낌이 든다.. Object형태로 저장되기 때문일까..? 기본적으로 제공되는 update
라는 함수와 update_storage_class라는
함수가 있지만 이건 storage_class를 변경하는 함수들이었다. 그리고 update
함수는 빌링 관련 업데이트 같고..
몇 가지를 찾아보니 upload_from_string이라는
함수가 있었다. 이것은 blob
파일 객체를 string의 형태로 만들어서 넣어주는데, 저장 및 변경이 가능하다고 한다.
다른 한 가지 방법으로는 blob.exists()라는
함수를 활용하면 그 해당 blob
명이 존재할 경우 boolean
값을 반환해 주는 함수가 존재했는데, 이 함수를 활용해서 적재하기 전에 내가 처음에 저장한 파일이 존재하는지 존재한다면 해당 파일을 지우고 다시 파일을 적재하는 코드를 짜볼까 했지만,, 빅데이터 관점으로 봤을 때 별로 효율적이지 못하다는 생각이 들었다.
하지만 string을 update 하는 건 효율적인가에 대해서 생각해 봤을 때 이것 역시 효율적이지 못하다고 생각은 변하지 않는다.. 방법을 알고 싶다 ㅠㅠ
unit test!
airflow도 unittest 하는 게 분명히 존재할 것이다. 하지만 나는 아직 그 정도는 못 도달했고, 함수 하나하나 테스트해보면서 하기로 결정했다. 왜냐하면 덮어쓰기 하는 방법들을 하나하나 적용하고 dag을 돌려보고 하기에는.. 너무 비효율적이었기 때문에..
local에서 테스트할 때는 IAM에 등록해야 되고 뭐 이래 저래 설정할게 많았다. 하지만 구글 코랩에서는 간단한 코드 몇 줄이면 바로 적용이 되어서 코랩으로 진행하였다.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')
위에 코드를 돌리고 실행하면 인증 완료다.!
json 파일을 받아오는 코드를 돌려보았다.
update 하는 파일을 두고 테스트해보았다. gcs에는 해당파일이 존재한다. 원래 다시 파일을 업로드하면 오류가 발생하였다. upload_from_string
함수를 사용하고 서는 오류가 발생하지 않았다.
from google.cloud import storage
def update_blob(bucket_name, blob_name, content):
# Instantiate a client
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.upload_from_string(json.dumps(content, ensure_ascii = False).encode(encoding="EUC-KR"))
print(f"Blob {blob_name} in {bucket_name} was updated with new content: {content}.")
bucket_name = 'corded-palisade-378506-import'
blob_name = 'file.json'
file_path = './file.json'
# Example usage:
update_blob(bucket_name, blob_name, getJsondata())
getJsondata()
란 함수를 만들어서 string으로 넣어줬더니 정상 작동한다! 하지만 세 번째 문제 파일이 전부 다 깨져있었다. 딱 봐도 encoding 문제라는 생각이 들었다.
encoding..
encoding을 하는 여러 방법이 있는 건 알겠다. 왜인지 모르겠지만 기존에 알고 있던 방법으로 다 잘 되지 않아서 여러 가지를 시도해 보았다.
dumps() 안에
ensure_ascii = Faslse 넣기 , 이거 적용하니까 아스키코드로 변환이 안되긴 하였지만 원하는 게 아니었다.- encode 함수를 사용해서 encoding을 UTF-8로 적용하였으나 원하는 형태로 바뀌지 않았고
EUC-KR
로 변경하니 정상 작동되었다.
그리고 여러 가지를 확인해 보니 dump와 dumps의 차이가 존재함을 확인할 수 있었는데
dumps : Python dict object를 JSON 문자열로 변환할 수 있다.
dump : Json 파일에 write 할 때 사용할 수 있는 차이가 존재했다.
자 이제 airflow에 적용을 해보니 몇 번 시도해도 에러가 나지 않는다!! 이제 다 도달했다..
스케줄로 1시간에 한 번씩 다시 읽고 적재만 해보자!!!
Scheduling
기존의 코드에 scheduler만 바꿔주면 제대로 돌아가겠지 하고 시도했는데 웬일 계속 변경되지 않고 Scheduling에 계속해서 None이라고 나와있는 것이다.
확인해 보니 stack overflow에 default_args에 schedule_interval을 설정하면 되는 줄 알았는데 DAG에 None이라고
명시되어 있기 때문에 안 되는 것이었다. 그래서 보기 론 default_args에 설정하는 것보다 DAG 바로 옆에 property를 우선시하는 거처럼 보인다. 하지만 아니었다..ㅋㅋㅋ;;
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'schedule_interval':'*/20 * * * *',
'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}
with DAG('facebook_to_gcs_dag',
default_args=default_args,
schedule_interval='None') as dag:
t1 = PythonOperator(
task_id='get_facebook_posts',
python_callable=get_facebook_posts,
)
t2 = PythonOperator(
task_id='upload_to_gcs',
python_callable=update_blob,
op_kwargs={
'bucket_name': 'corded-palisade-378506-import',
'blob_name': 'file.json',
'content': get_facebook_posts()
}
)
t1 >> t2
그래서 아래 코드로 수정했다. 이러면 돌아가야지 했지만 scheduled의 날짜가 1 day 00:00:00
으로 변경되어 있었다. 아니. 왜 적용이 안되는 거야.. 생각하고 기본적인 Airflow의 상태를 확인하는 곳의 코드를 확인해 보니 schedule_interval
이 default_args
가 아닌 DAG 안에 설정되어 었었다
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'schedule_interval':'*/20 * * * *',
'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}
with DAG('facebook_to_gcs_dag',
default_args=default_args,
) as dag:
t1 = PythonOperator(
task_id='get_facebook_posts',
python_callable=get_facebook_posts,
)
t2 = PythonOperator(
task_id='upload_to_gcs',
python_callable=update_blob,
op_kwargs={
'bucket_name': 'corded-palisade-378506-import',
'blob_name': 'file.json',
'content': get_facebook_posts()
}
)
t1 >> t2
그래서 코드를 다시 수정했다. 이렇게 수정하니까 원하는 데로 스케줄러가 작동하는 것을 확인할 수 있었다. 여기서 주의할 점은 처음에 20분씩 호출했는데, 하루 지나니까 호출할 수 있는 수가 초과돼버렸다.. ㅠㅠ 이런;;;
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime(2023, 3, 7, tzinfo=local_tz)
}
with DAG('facebook_to_gcs_dag',
default_args=default_args,
schedule_interval='@daily',
max_active_runs=2,) as dag:
t1 = PythonOperator(
task_id='get_facebook_posts',
python_callable=get_facebook_posts,
)
t2 = PythonOperator(
task_id='upload_to_gcs',
python_callable=update_blob,
op_kwargs={
'bucket_name': 'corded-palisade-378506-import',
'blob_name': 'file.json',
'content': get_facebook_posts()
}
)
결론
- meta graph api 사용
- cloud composer 사용 및 스케줄 등록 (다양한 덱들은 좀 더 진행 예정)
- encoding 방법 숙지
- cloud storage의 기능에 대해 습득
정도 숙지 된 거 같다. airflow 기반의 composer는 굉장히 강력한 기능인 거 같다. 하지만 scheduling 좀 조심해야 할 거 같고.. 어떻게 하면 더 나이스하게 작성할 수 있을지 고민해 보자!
개선할 점
- graph API 가 원하는 정보를 다 가져오지 않는 거 같다. 확인이 필요하다! page에 정보를 가지고 오고 싶은데 안된다.
- gcs로 가져오지 않고 바로 bigquery로 전송하는 파이프 라인을 만들어보자!