Cómo limpiar una base de datos de emails inválidos

Te enseño a crear un script en Python para limpiar una base de datos de E-Mails incorrectos o inválidos. Este script hace cuatro tipos de limpieza:

  • Sintáctica: Comprueba si está mal escrito el correo electrónico
  • Dominio: Comprueba con una consulta DNS si el dominio del correo existe
  • MX Record: Verifica que el dominio sea de correo chequeando la entrada MX en las DNS de ese dominio
  • SMTP: Verifica si la dirección de email existe consultándole directamente al dominio

Dificultad:

  • Conocimientos previos: Saber ejecutar scripts en Python. No hace falta saber programar en Python.
  • Tiempo de implementación: 10 min
  • Nivel de dificultad: Media

Script de verificación y limpieza de emails

Crea un archivo llamado limpiar-emails.py, e introduce este código en él:

# -------------------------------------------------------------
# Autor: Daniel Pajuelo
# Web: https://danielpajuelo.com
# Versión: 1.0
# -------------------------------------------------------------

import re
import sys
import socket
import smtplib
import time
import dns.resolver
from openpyxl import load_workbook, Workbook
from email_validator import validate_email, EmailNotValidError
import argparse

def load_whitelist():
    with open('whitelist.txt', 'r') as file:
        return [line.strip() for line in file]

def is_valid_email(email):
    try:
        validate_email(email)
        return True
    except EmailNotValidError:
        return False

def domain_exists(domain, whitelist):
    domain = domain.strip().lower()  # elimina espacios en blanco y convierte a minúsculas
    if domain in (w.strip().lower() for w in whitelist):  # elimina espacios en blanco y convierte a minúsculas
        return True
    try:
        socket.gethostbyname(domain)
        return True
    except socket.gaierror:
        return False

def mx_record_exists(domain, whitelist):
    domain = domain.strip().lower()  # elimina espacios en blanco y convierte a minúsculas
    if domain in (w.strip().lower() for w in whitelist):  # elimina espacios en blanco y convierte a minúsculas
        return True
    try:
        records = dns.resolver.resolve(domain, 'MX')
        return len(records) > 0
    except:
        return False

def smtp_verification(email, interval, verbose=False):
    domain = email.split('@')[1]
    try:
        server = smtplib.SMTP()
        server.set_debuglevel(1 if verbose else 0)  # Set debug level to 1 if verbose is True
        server.connect(domain)  # Usar 'domain' en lugar de 'mx_record'
        server.helo(server.local_hostname)
        server.mail('')
        code, message = server.rcpt(email)
        server.quit()
        if verbose:
            print(f'SMTP response code: {code}, message: {message}')
        return code == 250
    except Exception as e:
        if verbose:
            print(f'Error: {e}')
        return False
    finally:
        time.sleep(interval)


def get_last_column(sheet):
    for row in sheet.iter_rows(min_row=1, max_row=1):
        for cell in reversed(row):
            if cell.value is not None:
                return cell.column        

def main(args):
    whitelist = load_whitelist()
    wb = load_workbook(args.input_file)
    ws = wb.active
    new_wb = Workbook()
    new_ws = new_wb.active
    for row in ws.iter_rows(min_row=1, max_row=1):
        new_ws.append([cell.value for cell in row])

    verification_choice = int(input('Ingrese 1 para Sintaxis, 2 para Dominio, 3 para Registro MX, o 4 para SMTP: '))

    last_column = get_last_column(new_ws)
    next_column = last_column + 1 if last_column else 1  # Si no hay ninguna columna con datos, empieza en la columna 1

    header_value = ''
    if verification_choice == 1:
        header_value = 'Sintaxis'
    elif verification_choice == 2:
        header_value = 'Dominio'
    elif verification_choice == 3:
        header_value = 'MX'
    elif verification_choice == 4:
        header_value = 'SMTP'

    new_ws.cell(row=1, column=next_column, value=header_value)

    total_emails_checked = 0
    total_errors = 0

    try:
        for row_idx, row in enumerate(ws.iter_rows(min_row=2), start=2):
            email = row[0].value
            domain = email.split('@')[1]
            total_emails_checked += 1
            row_data = [cell.value for cell in row]

            if verification_choice == 1:
                valid = is_valid_email(email)
                row_data.append('SI' if valid else 'NO')
                if not valid:
                    total_errors += 1
                    print(f'Error producido, {email} afectado, Recuento total de emails chequeados: {total_emails_checked}, Recuento total de fallos: {total_errors}')

            elif verification_choice == 2:
                is_domain_whitelisted = domain in whitelist
                exists = is_domain_whitelisted or domain_exists(domain, whitelist)
                row_data.append('SI' if exists else 'NO')
                if not exists:
                    total_errors += 1
                    print(f'Error producido, {email} afectado, Recuento total de emails chequeados: {total_emails_checked}, Recuento total de fallos: {total_errors}')

            elif verification_choice == 3:
                is_domain_whitelisted = domain in whitelist  # Verificar si el dominio está en la lista blanca
                mx_exists = is_domain_whitelisted or mx_record_exists(domain, whitelist)  # si está en la lista blanca, se considera que existe un registro MX
                row_data.append('SI' if mx_exists else 'NO')
                if not mx_exists:
                    total_errors += 1
                    print(f'Error producido, {email} afectado, Total de emails chequeados: {total_emails_checked}, Total de fallos: {total_errors}')

            elif verification_choice == 4:
                verbose_response = input("¿Desea un reporte detallado del proceso SMTP? (si/no): ")
                verbose = verbose_response.lower() == 'si'
                warning_response = input("La verificación SMTP puede resultar en una IP baneada como spammer. ¿Desea continuar? (si/no): ")
                if warning_response.lower() == 'si':
                    interval = int(input('Ingrese el tiempo de espera en segundos entre cada petición SMTP: '))
                    smtp_valid = smtp_verification(email, interval, verbose)  # Pass verbose to smtp_verification
                    row_data.append('SI' if smtp_valid else 'NO')
                    if not smtp_valid:
                        total_errors += 1
                        print(f'Error producido, {email} afectado, Recuento total de emails chequeados: {total_emails_checked}, Recuento total de fallos: {total_errors}')

            new_ws.append(row_data)

    except KeyboardInterrupt:
        print("\nInterrupción detectada. Guardando el archivo antes de salir...")

    finally:
        new_wb.save(args.output_file)
        print("Archivo guardado. Saliendo del programa.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Script de verificación de email.')
    parser.add_argument('input_file', help='Nombre del archivo de entrada.')
    parser.add_argument('output_file', help='Nombre del archivo de salida.')
    args = parser.parse_args()

    if not args.input_file or not args.output_file:
        print('Uso: python script.py <nombre_del_archivo_de_entrada> <nombre_del_archivo_de_salida>')
        sys.exit(1)

    main(args)

Cómo ejecutar este script en un entorno cerrado

Crear un entorno virtual en la carpeta donde tienes tu script:

python -m venv myenv

Activar el entorno virtual:

myenv\Scripts\activate

Instalar las bibliotecas necesarias en el entorno virtual:

pip install openpyxl dnspython validate_email py3dns email-validator openpyxl

Ejecutar el script:

python limpiar-emails.py

Qué hace este script

A través de la línea de comandos le pasas el archivo en Excel donde tengas los emails, el script considera que están en la segunda columna, y también le pasas el nombre del fichero Excel que quieres que cree con la misma base de datos pero una nueva columna indicando si cada email ha pasado o no la verificación.

A continuación te explico el código en detalle para que puedas hacer tus propias modificaciones:

  1. Importación de Bibliotecas y Módulos:
    • Se importan varios módulos y bibliotecas necesarios para ejecutar diferentes partes del script como re, sys, socket, smtplib, time, dns.resolver, openpyxl y email_validator.
  2. Funciones Auxiliares:
    • load_whitelist: Carga una lista de dominios aprobados desde un archivo llamado ‘whitelist.txt’.
    • is_valid_email: Verifica si un email es válido en términos de sintaxis usando la biblioteca email_validator.
    • domain_exists: Chequea si un dominio existe haciendo una consulta DNS.
    • mx_record_exists: Verifica si existe un registro MX en el dominio.
    • smtp_verification: Realiza una verificación SMTP de una dirección de correo electrónico, lo que incluye conectarse a un servidor SMTP y tratar de validar la dirección de correo electrónico.
    • get_last_column: Obtiene el índice de la última columna con datos en una hoja de cálculo.
  3. Función Principal (main):
    • Carga una hoja de cálculo existente y crea una nueva hoja de cálculo.
    • Pide al usuario que elija un tipo de verificación (sintaxis, dominio, registro MX, SMTP).
    • Itera sobre las filas de la hoja de cálculo de entrada, realiza la verificación seleccionada y agrega los resultados a la nueva hoja de cálculo.
    • Guarda la nueva hoja de cálculo en un archivo de salida.
  4. Ejecución del Script:
    • Se define un argument parser para tomar los nombres de los archivos de entrada y salida desde la línea de comando.
    • Se verifica que los argumentos de entrada y salida estén presentes, y se llama a la función principal.

Nota sobre la verificación SMTP

No recomiendo usar la verificación de SMTP con grandes cantidades de emails, ya que los servidores de correo que consultamos podrían considerarnos atacantes o spammers al realizar tantas peticiones. Usa esta funcionalidad con moderación.

Daniel Pajuelo
Daniel Pajuelo es ingeniero informático y SEO Senior, actualmente trabajando en Guruwalk e impartiendo clases en BIG School (antes BIGSEO Academy). Ver más

Continua leyendo

Herramienta para limpiar de emails peligrosos las listas de correo

Siguiendo estos pasos aprenderás a programar una herramienta para limpiar tu base de datos de direcciones de correo electrónico peligrosas e indeseadas.
Script gratuito para extraer emails de un fichero

Script en Python para extraer emails de un archivo de texto

En esta publicación aprenderás a extraer el listado de emails de un fichero.