🕰️ 작성일 : 2024.10.21

Amazon Firehose를 활용한 아키텍처

기존의 Streaming platform(Spark Streaming, Flink)를 기반으로 Iceberg의 테이블에 실시간으로 반영하는 것이 아닌, Amazon Firehose를 활용하여 Aurora에서 변경되는 데이터를 Iceberg에 반영하는 아키텍처입니다. Amazon Firehose의 Iceberg 지원은 아직 서울 리전은 지원하지 않습니다. 서울 리전에 대한 확장은 서비스팀에 요청해두었습니다. 아래 테스트는 Virginia 리전에서 진행했습니다.

image.png

  1. Aurora의 binlog를 활성화시키고 DMS의 CDC 기능을 활용하여 Aurora의 변경 분을 추적합니다.
  2. DMS Replication 인스턴스가 Private Subnet에 위치 할 경우 Kinesis용 VPC Endpoint를 생성하고 Binlog를 Kinesis Data Streams에 적재합니다. 트랜잭션의 순서를 보장하기 위해 Shard의 수는 1개로 고정합니다. 샤드의 수를 늘릴 경우 향후 트랜잭션 순서 보장을 위한 후처리 과정이 필요합니다.
  3. binlog의 포맷을 Amazon Firehose의 Iceberg 기록을 위한 포맷으로 변경하며 Lambda를 기반으로 데이터를 전처리한 후 다시 Amazon Firehose에 기록합니다.
  4. 최종 변경 분을 Glue Data Catalog에 등록된 Iceberg 테이블에 기록합니다. 이 과정에서 Insert, Update, Delete를 처리 할 수 있으며 Binlog의 Metadata를 참고하여 Database, Table, Operation을 자동 구분하여 처리하게 됩니다.
  5. Iceberg의 특성으로 트랜잭션이 발생할 때 마다 Snapshot이 발생하며, 많은 스냅샷은 스토리지 비용의 향상과 읽기 쿼리의 성능 저하를 가져옵니다. Glue Catalog의 Compaction 기능을 활용하여 작은 파일을 큰 파일로 합치는 작업을 진행합니다. 이 외에도 Snapshot 삭제 및 참조하지 않는 파일 등 Iceberg 운영을 위한 작업을 Glue Data Catalog에서 제공합니다.

Amazon Firehose 구성

Lambda 코드

Kinesis Data Streams에서 전달 받은 Aurora Binlog를 Iceberg에 처리하기 위해 전처리 과정을 진행합니다. 전처리 중 'otfMetadata' 을 지정할 경우 Iceberg에 기록시 database, table, operation에 맞춰 Iceberg 테이블에 기록 할 수 있습니다.

import base64
import json

def lambda_handler(event, context):
    
    firehose_records_output = {}
    firehose_records_output['records'] = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        parsed_data = json.loads(payload)
        
        data = json.dumps(parsed_data['data'])
        metadata = parsed_data['metadata']
        
        database = metadata['schema-name']
        table = metadata['table-name']
        operation = metadata['operation']
        
        firehose_record_output = {}
        
        firehose_record_output['data'] = base64.b64encode(data.encode('utf-8'))
        firehose_record_output['recordId'] =record['recordId']
        firehose_record_output['result'] =  'Ok'
        firehose_record_output['metadata'] = {
            'otfMetadata': {
                'destinationDatabaseName': database,
                'destinationTableName': table,
                'operation': operation
                }
            }
            
        firehose_records_output['records'].append(firehose_record_output)
        
    return firehose_records_output

테스트 하기

Iceberg 데이터베이스 및 테이블 생성하기

향후 Amazon Firehoes에서 records routing이 정상적으로 동작하는지 확인하기 위해 두 개의 Database 및 Table을 생성합니다. 아래 쿼리는 Athena에서 수행합니다.

CREATE DATABASE IF NOT EXISTS database1;
CREATE DATABASE IF NOT EXISTS database2;

CREATE TABLE database1.table1(
id int,
name string
)
LOCATION 's3://891377182095-iceberg-realtime/table1/'
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_compression'='snappy',
  'optimize_rewrite_delete_file_threshold'='10'
);

CREATE TABLE database2.table2(
id int,
name string
)
LOCATION 's3://891377182095-iceberg-realtime/table2/'
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_compression'='snappy',
  'optimize_rewrite_delete_file_threshold'='10'
);