Links: https://delta.io/learn/getting-started/ https://www.datacamp.com/pt/tutorial/delta-lake
https://datawaybr.medium.com/como-sair-do-zero-no-delta-lake-em-apenas-uma-aula-d152688a4cc8
Procedimento inicial para utilizar o delta lake com MinIO
-
O que é Delta Lake
Delta Lake é um framework de armazenamento transacional construído sobre o Apache Spark e projetado para melhorar a confiabilidade, qualidade e desempenho do Data Lake. Ele resolve problemas comuns em lagos de dados tradicionais, como arquivos corrompidos, falta de transações ACID e gerenciamento de versões dos dados (viagem no tempo).
-
O que é MinIO
é um armazenamento compatível com a API S3 da AWS, o que permite simular um ambiente similar ao que rodaria na nuvem
-
Passo 1
Criar uma pasta para desenvolver o projetinho Ex: dev/spark-docker
Para facilitar é possível abrir essa pasta diretamente no VsCode, em seguida criar o arquivo: docker-compose.yml e colocar o seguinte código:
version: "3.9"
services:
spark-master:
image: bitnami/spark:latest
container_name: spark-master
hostname: spark-master
ports:
- "8080:8080" # UI do Spark Master
- "7077:7077" # Porta do Spark Master
user: root
environment:
- SPARK_MODE=master
- SPARK_MASTER_HOST=spark-master
volumes:
- ./app:/opt/spark/app:rw
- ./app/libs:/opt/spark/libs # Volume para os JARs
networks:
- spark-network
spark-worker:
image: bitnami/spark:latest
container_name: spark-worker
hostname: spark-worker
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
networks:
- spark-network
minio:
image: minio/minio
container_name: minio
ports:
- "9000:9000"
- "9090:9090"
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: admin123
command: server /data --console-address ":9090"
volumes:
- minio-data:/data
networks:
- spark-network
python:
build: .
container_name: python
hostname: python
working_dir: /app
volumes:
- ./app:/app
networks:
- spark-network
depends_on:
- spark-master
- minio
entrypoint: ["tail", "-f", "/dev/null"]
volumes:
minio-data:
networks:
spark-network:
driver: bridge
Com isso será possível criar os container do MinIO, Spark e Python com suas respectivas imagens. Um adendo para o Python, é que ele não está instalado localmente, assim ele usará build do Dockerfile para instalar o pySpark toda vez que o container iniciar, pois ele usa a imagem bitnami/spark:latest que já vem com o Java e Spark instalados:
FROM bitnami/spark:latest
USER root
RUN pip install pyspark==3.5.3 delta-spark==3.3.0
WORKDIR /app
VOLUME ["/app"]
CMD ["python", "/app/teste_delta.py"]
Se atentar com a compatibilidade das versões do PySpark e do Delta lake, pois a não compatibilidade pode causar diversos erros na execução das bibliotecas.
Outro adendo é com relação a seguinte parte do código:
networks:
spark-network:
driver: bridge
Essa parte define a network dos serviços e para evitar erro todos devem utilizar a mesma network que é spark-network. abaixo estão alguns comandos para testar e realizar verificações da network Verificar se todos os contêineres estão na mesma rede:
docker network inspect spark-network
Recriar os containers e a rede:
O primeiro comando vai remover os contêineres existentes, volumes e órfãos (caso haja contêineres antigos que não estão mais sendo utilizados), e o segundo vai iniciar os contêineres novamente, criando a rede spark-network.
docker-compose down --volumes --remove-orphans
Criar rede manualmente
docker network create spark-network
Verificar redes existentes:
docker network ls
Remover rede:
docker network rm spark-network
verificar em quais redes os contêineres estão conectados:
docker ps --format '{{.Names}}: {{.Networks}}'
Após criar a rede corretamente é necessário subir os containers novamente:
docker-compose up -d
Com esse comando os containers definidos no arquivo docker serão iniciados.
Caso os comandos e alterações na rede não funcione pode ser necessário reiniciar o docker
sudo systemctl restart docker
docker-compose down
docker-compose up
Quando realizar uma alteração no docker compose execute os seguintes comandos para reiniciar os containers para aplicar as alterações:
docker-compose down
docker-compose up --build -d
Reconstruir a imagem
docker build -t spark-docker .
Com a configurações feitas, vamos testar com um arquivo de teste utilizando pySpark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeMinIO") \
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.master", "spark://spark-master:7077") \
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
.config("spark.hadoop.fs.s3a.access.key", "admin") \
.config("spark.hadoop.fs.s3a.secret.key", "admin123") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.getOrCreate()
#spark.sparkContext.setLogLevel("DEBUG")
try:
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
data.write.format("delta").mode("overwrite").save("s3a://meu-bucket/delta-table")
print("✅ Dados salvos com sucesso no Delta Lake no MinIO!")
except Exception as e:
print(f"Erro ao salvar no Delta Lake: {e}")
finally:
spark.stop()
A parte inicial do código irá importar o pySpark em seguida define o Delta lake o endereço de comunicação com o MinIO e as credenciais de acesso, para acessar a simulação da S3 da Aws o MinIO acesse: http://localhost:9090.
O código realiza a criação de um data frame com nome e id e em seguida formata para o delta lake e salva no caminho s3a://meu-bucket/delta-table
Dica: No MinIO, é necessário criar o bucket manualmente antes de salvar os dados.
Então como ele esta configurando para salvar no “meu-bucket” é preciso criar o esse bucket no MinIO (talvez definir o acesso para público).
Para executar o código pyhon é necessário entrar no container
docker exec -it python bash
Em seguida executar o script
python /app/teste_delta.py
Adicionando novos dados
delta_table_path = "s3a://meu-bucket/delta-table"
delta_table = DeltaTable.forPath(spark, delta_table_path)
new_data = spark.createDataFrame([(11, "Ana Clara"), (2, "Bob Antonio Fernandes")], ["id", "name"]).dropDuplicates(["id"])
if not DeltaTable.isDeltaTable(spark, delta_table_path):
new_data.write.format("delta").mode("overwrite").save(delta_table_path)
else:
print("✅ A tabela Delta já existe. Prosseguindo com o merge...")
print("📊 Dados atuais antes do merge:")
delta_table.toDF().show()
Aqui utilizando o merge do delta ele irá fazer uma comparação de acordo com a chave id dos dados já inseridos com os novos. Caso seja dados novos ele irá adicionar, caso a chave id corresponda a algum id da tabela dado ele irá fazer o update desse dado nesse sendo de Alice para Ana Alice e Bob Antonio para Bob Antonio Fernandes. Já o id 5 com o José será um novo dado introduzido na tabela.
delta_table = DeltaTable.forPath(spark, delta_table_path)
(
delta_table.alias("dados_atuais")
.merge(
new_data.alias("novos_dados"),
"dados_atuais.id = novos_dados.id"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
O seguinte código utiliza a função history do delta que apresenta o histórico de tudo o que foi feito na tabela (como: WRITE, DELETE, MERGE) em cada versão da tabela. Sendo possível acessar versões anteriores de uma tabela (Conceito de viagem no tempo).
df_delta = delta_table.history()
(
df_delta
.select("version", "timestamp", "operation", "operationMetrics")
.show(truncate=False, vertical=True)
)
Viagem no tempo - visualizando tabelas de diferentes versões Ex com Versão 0:
(
spark
.read
.format("delta")
.option("versionAsOf", 8)
.load("s3a://meu-bucket/delta-table")
.show()
)