Data Quality em arquivos CSV

Lucas Misael
10 min readJan 31, 2023

--

O que é exatamente big data?

A definição de big data são dados que contêm maior variedade, chegando em volumes crescentes e com mais velocidade. Isso também é conhecido como os três Vs.

Simplificando, big data é um conjunto de dados maior e mais complexo, especialmente de novas fontes de dados. Esses conjuntos de dados são tão volumosos que o software tradicional de processamento de dados simplesmente não consegue gerenciá-los. No entanto, esses grandes volumes de dados podem ser usados para resolver problemas de negócios que você não conseguiria resolver antes. Mas para não aprofundarmos muito nesse assunto, eu escrevi um pouco melhor sobre isso aqui nesse artigo.

Agora você vai me perguntar, Tá Lucas, se não vamos falar sobre Big Data, porque iniciamos falando sobre isso nesse post?

Porque em um dos V’s de Big Data temos um ponto muito interessante para o nosso post, que é a variedade de dados. E isso não quer dizer apenas que seus dados vão vir com textos e formas diferentes, mas sim que no mundo de Big data, não sabemos ao certo como será nossa fonte de dados, e isso basicamente implica que podemos ter dados vindo de um banco relacional, banco não relacional, (até ai tudo bem né?!), mas podemos ter algumas outras fontes de dados, como um áudio, vídeo, e até mesmo arquivos, que muitas das vezes, tendem a não ter um padrão.

E quando falamos em arquivos já posso imaginar os engenheiros de dados assim:

Eu te entendo amigo, eu também não gostava muito de trabalhar com arquivos manuais (sejam eles csv, xlsx, txt…) rsrsr

Mas quando comecei a trabalhar aqui no Boticário, comecei a ter outra visão em se trabalhar com arquivos manuais, onde não os vejo mais como algo ruim, sem padrão e que sempre vão dar problemas ao pipeline, e sim como uma oportunidade de iniciarmos o projeto, ou darmos continuidade, conseguindo levar dados de valor ao time de negócio, e no final das contas é isso que buscamos né? Levar valor aos nossos stakeholders com nossos lindos Pipelines.

Bom, mas é inegável que muitas das vezes podemos sim ter problemas com arquivos manuais, e normalmente isso acontece pois por algum motivo o arquivo não vem mais como foi pensado, planejado e incorporado no inicio do projeto, e isso muitas das vezes ocasionará em erros em nosso pipeline.

E acredite, já passamos bastante por isso aqui, e foi por esse motivo que o time de engenharia de dados do Boticário criou formas para antecipar esses problemas, com data quality de 1° ordem, analisando os dados de origem no momento de utiliza-los. Porém ainda sim tínhamos bastante problemas com os dados lá na origem, pois como dito, muitas das vezes o arquivo enviado na pasta raiz vinha com campos faltantes, ou com formatos não esperados pela aplicação. Pensando nisso, criei uma aplicação utilizando Cloud Functions no GCP (Google Cloud Platform) em Python, para tentar auxiliar com esses problemas, onde pode ser usada em qualquer lugar, apenas passando por parâmetro algumas informações, e enviando uma requisição https, para os que deseja validar. Sabemos que já existem várias bibliotecas que fazem validações para data quality, porém decidimos ir por esse caminho para termos uma maior flexibilidade.

Ps.: Para facilitar nosso desenvolvimento, vamos construir esse dataquality sem subir para as cloud functions, mas com a mesma estrutura.

Então para darmos início, quais as validações que buscaremos resolver com essa aplicação e como iremos dividir nossas classes?

Para facilitar o desenvolvimento, e até mesmo a manutenção do nosso código, iremos dividir nossa aplicação em módulos, assim facilitará muito a visualização e responsabilidade de cada classe.

A primeira classe que iremos criar chamaremos de ModuleEncode, ela será responsável, por validar o encode de um arquivo CSV. Para esse caso em específico, estaremos lendo um arquivo de um bucket (Cloud Storage do Google Cloud Platform).

Falando em bucket, bora criar um arquivo CSV para guardarmos em nosso bucket?

Criei esse arquivo com apenas 3 linhas, contendo um campo inteiro e um campo de data, e salvaremos esse arquivo em uma pasta dentro do nosso bucket. o arquivo ficara assim:

Buckets/seu_bucket/teste_dataquality/arquivo_teste.csv

Para funcionar nosso método receberá por parâmetro as informações pertinentes ao arquivo, como, qual é o bucket, o nome do arquivo, projeto que o bucket foi criado, e qual o encode esperado para o arquivo que esta sendo validado.

def check_encode(bucket_name: str, file_name: str, project: str, expected_encoding: str)-> dict:

O primeiro passo dessa função, será criar uma equivalência de tipos de encode, ou seja, quando for esperado um arquivo com o encode utf-8, os tipos aceitos serão (ascii, utf-8-sig…), e assim você pode criar as equivalências que desejar.

file_equivalence = {
'utf-8': ['ascii', 'utf-8-sig'],
'utf-8-sig': ['ascii', 'utf-8']
}

Feito isso, precisaremos ler efetivamente o arquivo, para que possamos continuar, para isso estaremos usando o blob do Google, que é uma lib que nos permite fazer a leitura de arquivos direto do Storage. (Documentação do client aqui).

storage_client = storage.Client(project=project)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
raw_data = blob.open("rb").read()

Para a validação do encode do arquivo, utilizaremos uma biblioteca chamada Chardet, que é uma biblioteca open source onde apenas passamos o objeto com o arquivo aberto, e ele detecta o encode do arquivo, utilizando a função detect da lib, quando executado, será retornado uma lista, portanto se acessarmos result[“encode”], conseguiremos recuperar essa informação facilmente.

result = chardet.detect(raw_data)
print(result['encoding'])

E por ultimo, faremos uma validação, para verificarmos se o arquivo esta com o encode esperado. Caso esteja, será retornado o Status de Sucesso para a classe principal, e caso contrário, será retornado um Status Erro, e qual o erro encontrado na validação. O nosso primeiro módulo ficou assim:


import chardet
from google.cloud import storage


class ModuleEncode:

@staticmethod
def check_encode(bucket_name: str, file_name: str, project: str, expected_encoding: str)-> dict:
file_equivalence = {
'utf-8': ['ascii', 'utf-8-sig'],
'utf-8-sig': ['ascii', 'utf-8']
}

storage_client = storage.Client(project=project)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)

raw_data = blob.open("rb").read()

result = chardet.detect(raw_data)

if result['encoding'].lower() == expected_encoding.lower() or (
expected_encoding.lower() in file_equivalence
and result['encoding'].lower() in file_equivalence[expected_encoding.lower()]
):
return {"Status": "Sucesso", "msg": []}
else:
return {"Status": "Erro", "msg": f"O arquivo não esta com o encode esperado ({expected_encoding}), encode do arquivo:{result['encoding']}"}

Vale lembrar que a biblioteca Chardet tem várias outras funções bem interessantes, que vale a pena visitar para conhecer. (Documentação Chardet).

Perfeito agora que o módulo foi criado, vamos construir um teste unitário para ver esse módulo funcionando ?

dentro da pasta raiz, vamos criar uma pasta de testes -> tests/src/dataquality_test.py

dentro da nossa classe de unittest, iremos criar vários testes durante esse post. Portanto para facilitar, vamos criar uma função chamada get_data que simplesmente irá retornar um dicionário com os parâmetros que usaremos em todas as classes. E em seguida criaremos o nosso primeiro teste de execução, e o chamaremos de test_encode_execution. Esse teste unitário, irá executar o módulo de encode, e validará se a resposta for igual a que esperamos (status : Sucesso). E de igual modo, faremos um teste unitário para testar a falha de execução, e o chamaremos de test_encode_execution_failed.

import unittest

class TestCase(unittest.TestCase):
@classmethod
def get_data(cls):
data = {
"bucket_project_id" : "ID_PROJETO",
"bucket_name": "ID_BUCKET",
"bucket_folder": "teste_dataquality",
"file_name": "arquivo_teste.csv",
"delimiter": ",",
"encoding": "UTF-8",

}

return data

def test_encode_execution(self):

params = self.get_data()
result = ModuleEncode.check_encode(params["bucket_name"], f'{params["bucket_folder"]}/{params["file_name"]}',
params["bucket_project_id"], params["encoding"])

expect = {'Status': "Sucesso", 'msg': ''}
self.assertEqual(expect["Status"], result["Status"])


def test_encode_execution_failed(self):

params = self.get_data()
params["encoding"] = "utf-error"

result = ModuleEncode.check_encode(params["bucket_name"], f'{params["bucket_folder"]}/{params["file_name"]}',
params["bucket_project_id"], params["encoding"])

expect = {'Status': "Erro", 'msg': ''}
self.assertEqual(expect["Status"], result["Status"])

if __name__ == '__main__':
unittest.main()

Pronto, com isso terminamos a nossa primeira classe de validação!

O segundo módulo que iremos construir será responsável por checar se o arquivo enviado possuí a quantidade de colunas esperado. Para isso utilizaremos o pandas, onde será passado por parâmetro um Data Frame, e a quantidade de colunas esperadas, e com isso, iremos checar se as quantidades de colunas estão idênticas, se sim, será retornado uma mensagem positiva, caso contrario, retornamos uma mensagem informando que as quantidades de colunas não estão como a esperada.

import pandas as pd


class ModuleSchema:

@staticmethod
def check_columns_quantity(df: pd.DataFrame, expected_csv_columns: int) -> dict:

if len(df.columns)==expected_csv_columns:
return {"Status": True, "msg": ""}
else:
return {"Status": False, "msg": f'A quantidade de colunas esperada é de {expected_csv_columns} e a quantidade recebida foi de {len(df.columns)}.'}

O próximo passo será criar um teste unitário para esse modulo também.

A função abaixo irá testar se a quantidade de colunas de um Data Frame se encontra como o esperado, e após uma função para testar a falha do mesmo.

    def test_column_quantity(self):

data = [ ["abc", "def", "ghi"], [ "mno", "pqr", "asd"]]
df = pd.DataFrame(data=data, columns=["coluna1", "coluna2" , "coluna3"] )
result = ModuleSchema.check_columns_quantity(df,3)
expect = {'Status': True, 'msg': ''}
self.assertEqual(expect, result)

def test_column_quantity_failed(self):

data = [ ["abc", "def", "ghi"], [ "mno", "pqr", "asd"]]
df = pd.DataFrame(data=data, columns=["coluna1", "coluna2" , "coluna3"] )
result = ModuleSchema.check_columns_quantity(df,4)
expect = {'Status': False, 'msg': ''}
self.assertEqual(expect["Status"], result["Status"])

Ao executar os testes unitários você deverá ver uma mensagem como essa, e caso receba algum diferente, pode ser que seus testes estejam falhando.

O terceiro módulo que iremos criar, terá um pouco mais de funcionalidades, dentro desse módulo é que criaremos funções como por exemplo, validar se todos os dados de uma coluna estão como inteiros, ou um formato de data específico, etc.

a primeira função que criaremos será a função de validar se todos os dados de uma coluna estão vindo como Inteiros

import pandas as pd

class ModuleFields():

def check_int_values(df: object, params: object) -> dict:
df_without_nan = pd.DataFrame()
df_without_nan[params['name']] = df[params['name']].dropna()
try:
df_without_nan[params['name']].astype(int)
return {"Status": True, "msg": ""}
except:
return {"Status": False, "msg": f"Tipagem incorreta para coluna {params['name']},Tipagem esperada:Integer"}

O primeiro passo dentro dessa função, será remover os registros nulos, pois eles não são importantes para nosso teste, e em seguida tentaremos converter a coluna para inteiro. Caso o pandas consiga converter todos os dados da coluna com sucesso, será retornado a mensagem de sucesso, e caso contrário, uma mensagem de tipagem incorreta para a coluna em questão.

A segunda função que iremos criar, irá validar valores do tipo Float, e faremos as mesmas transformações e validações que fizemos para o campo do tipo inteiro.

    def check_float_values(df: object, params: object) -> dict:

df_without_nan = pd.DataFrame()
df_without_nan[params['name']] = df[params['name']].dropna()
try:
df_without_nan[params['name']].astype(float)
return {"Status": True, "msg": ""}
except:
return {"Status": False, "msg": f"Tipagem incorreta para coluna {params['name']},Tipagem esperada:Float"}

A próxima função será responsável por validar datas, e para esse caso como sabemos, temos vários formatos de dados existentes, portanto, para resolver essa questão, será passado por parâmetro qual o formato de data que espera validar, e o pandas.to_datetime irá tentar converter esse dado com o formato esperado.

    def check_date_format(df:object, params:object)->dict:

try:
pd.to_datetime(df[params['name']], format=params["date_format"], errors='raise')
return {"Status": True, "msg":""}
except ValueError:
return {"Status": False, "msg": f"Formato incorreto para coluna {params['name']}, Formato esperado: {params['date_format']}"}

A próxima função irá validar um campo que não pode ter valores nulos, ele varrerá todos testando se são nulos ou não, quando nulo, será True, e caso contrário False, e com isso validamos no final, se houver algum registro nulo, retornamos a mensagem que há dados nulos na coluna, e caso contrário, será retornado Sucesso.

    def check_is_not_null(df:object, params:object)->dict:

result = df[params['name']].isnull()
if not True in result.values:
return {"Status": True, "msg":""}
else:
return {"Status": False, "msg": f"Coluna {params['name']} esta vazia."}

Agora que nossa classe esta basicamente pronta, bora criar nossos testes unitários?

Na mesma classe de teste que já criamos alguns testes, vamos adicionar dois testes para cada função, um que vai testar o sucesso e outro para testar a falha.

O primeiro teste será referente aos valores Inteiros, iremos criar um Data Frame com alguns dados inteiros, e nulos para testarmos, ao final esperamos que o resultado seja de Status: Sucesso.

E o segundo teste, será para testarmos a falha do mesmo, para isso passaremos um valor diferente de Inteiro para ele validar (“12a”), e esperamos que ele retorne Status: False.

def test_int_values(self):

params = {'name': 'coluna_int', 'type': 'integer'}
data = [[1234], ["1234"], [], [None], [154]]
df = pd.DataFrame(data=data, columns=["coluna_int"] )
result = ModuleFields.check_int_values(df,params)
expect = {'Status': True, 'msg': ''}
self.assertEqual(expect, result)


def test_int_values_failed(self):

params = {'name': 'coluna_int', 'type': 'integer'}
data = [["12.5a"],[202205], [], [202207],[]]
df = pd.DataFrame(data=data, columns=["coluna_int"] )
result = ModuleFields.check_int_values(df,params)
expect = {'Status': False, 'msg': ''}
self.assertEqual(expect["Status"], result["Status"])

Ao executar os testes, você deverá ver uma mensagem como essa:

Testes unitários para valores Float seguirão os mesmos testes dos valores inteiros, onde testaremos dados que rodarão com sucesso, e para falhar colocaremos um valor indevido com letras.

def test_float_values(self):

params = {'name': 'teste_numerico'}
data = [[12], [12.5],["4.5"], [], [4.7], [None]]
df = pd.DataFrame(data=data, columns=["teste_numerico"])
result = ModuleFields.check_float_values(df,params)
expect = {'Status': True, 'msg': ''}
self.assertEqual(expect, result)

def test_float_values_failed(self):

params = {'name': 'teste_numerico'}
data = [[12], [12.5],["asd"], [4.7]]
df = pd.DataFrame(data=data, columns=["teste_numerico"])
result = ModuleFields.check_float_values(df,params)
expect = {'Status': False, 'msg': ''}
self.assertEqual(expect["Status"], result["Status"])

Os próximos testes serão para a função de datas, para esse caso, também utilizaremos a mesma estratégia dos anteriores.

def test_date_yyyy_mm_dd(self):

params = {"name":"data_labs" "date_format":"%Y%m%d"}
data = [[20170213],[20170214], [20170213], [20170213],[20170213]]
df = pd.DataFrame(data=data, columns=["data_labs"] )
result = ModuleFields.check_date_format(df,params)
expect = {'Status': True, 'msg': ''}
self.assertEqual(expect, result)

def test_date_yyyy_mm_dd_failed(self):

params = {"name":"data_labs" "date_format":"%Y-%m-%d"}
data = [["10/11/2022"],["20/12/2014"]]
df = pd.DataFrame(data=data, columns=["data_labs"] )
result = ModuleFields.check_date_format(df,params)
expect = {'Status': False, 'msg': ''}
self.assertEqual(expect["Status"], result["Status"])

E por ultimo mas não menos importante, validaremos os dados nulos.

def test_value_is_not_null(self):

params = {'name': 'teste_numerico'}
data = [[12], [125],[123], [4.7]]
df = pd.DataFrame(data=data, columns=["teste_numerico"])
result = ModuleFields.check_is_not_null(df,params)
expect = {'Status': True, 'msg': ''}
self.assertEqual(expect["Status"], result["Status"])

def test_value_is_not_null_failed(self):
"""_summary_ test_int_values
Check if a column have only integer (don't consider null values)
"""
params = {'name': 'teste_numerico'}
data = [[12], [],[None], [4.7]]
df = pd.DataFrame(data=data, columns=["teste_numerico"])
result = ModuleFields.check_is_not_null(df,params)
expect = {'Status': False, 'msg': ''}
self.assertEqual(expect["Status"], result["Status"])

Então é isso pessoal, essa é a primeira versão dessa aplicação e já estamos construindo algumas melhorias, espero que essas funcionalidades possam trazer ganhos para vocês assim como tem nos ajudado.

Caso queira conversar mais sobre o assunto pode me chamar no Linkedin ;)

https://www.linkedin.com/in/lucasmisael/

--

--