🕰️ 작성일 : 2024.10.21
기존의 Streaming platform(Spark Streaming, Flink)를 기반으로 Iceberg의 테이블에 실시간으로 반영하는 것이 아닌, Amazon Firehose를 활용하여 Aurora에서 변경되는 데이터를 Iceberg에 반영하는 아키텍처입니다. Amazon Firehose의 Iceberg 지원은 아직 서울 리전은 지원하지 않습니다. 서울 리전에 대한 확장은 서비스팀에 요청해두었습니다. 아래 테스트는 Virginia 리전에서 진행했습니다.
Transform records
을 활성화하고 아래의 코드로 생성 된 Lambda를 지정합니다. Lambda가 처리할 Buffer Size 및 Buffer Interval을 설정하여 Lambda가 호출되는 주기를 조절 할 수 있습니다.
Destination settings
에서 Iceberg 테이블에 기록하기 위한 Buffer Size 및 Buffer Interval을 설정합니다.Unique key configuration
을 설정해야 합니다. 목적지 Database, Table, Unique Key(컬럼 명)을 입력합니다. 또는 Iceberg의 identifier-filed-ids를 지정할 수도 있습니다.
Kinesis Data Streams에서 전달 받은 Aurora Binlog를 Iceberg에 처리하기 위해 전처리 과정을 진행합니다. 전처리 중 'otfMetadata'
을 지정할 경우 Iceberg에 기록시 database, table, operation에 맞춰 Iceberg 테이블에 기록 할 수 있습니다.
import base64
import json
def lambda_handler(event, context):
firehose_records_output = {}
firehose_records_output['records'] = []
for record in event['records']:
payload = base64.b64decode(record['data']).decode('utf-8')
parsed_data = json.loads(payload)
data = json.dumps(parsed_data['data'])
metadata = parsed_data['metadata']
database = metadata['schema-name']
table = metadata['table-name']
operation = metadata['operation']
firehose_record_output = {}
firehose_record_output['data'] = base64.b64encode(data.encode('utf-8'))
firehose_record_output['recordId'] =record['recordId']
firehose_record_output['result'] = 'Ok'
firehose_record_output['metadata'] = {
'otfMetadata': {
'destinationDatabaseName': database,
'destinationTableName': table,
'operation': operation
}
}
firehose_records_output['records'].append(firehose_record_output)
return firehose_records_output
Iceberg 데이터베이스 및 테이블 생성하기
향후 Amazon Firehoes에서 records routing이 정상적으로 동작하는지 확인하기 위해 두 개의 Database 및 Table을 생성합니다. 아래 쿼리는 Athena에서 수행합니다.
CREATE DATABASE IF NOT EXISTS database1;
CREATE DATABASE IF NOT EXISTS database2;
CREATE TABLE database1.table1(
id int,
name string
)
LOCATION 's3://891377182095-iceberg-realtime/table1/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_compression'='snappy',
'optimize_rewrite_delete_file_threshold'='10'
);
CREATE TABLE database2.table2(
id int,
name string
)
LOCATION 's3://891377182095-iceberg-realtime/table2/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_compression'='snappy',
'optimize_rewrite_delete_file_threshold'='10'
);