Te enseño a crear un script en Python para limpiar una base de datos de E-Mails incorrectos o inválidos. Este script hace cuatro tipos de limpieza:
- Sintáctica: Comprueba si está mal escrito el correo electrónico
- Dominio: Comprueba con una consulta DNS si el dominio del correo existe
- MX Record: Verifica que el dominio sea de correo chequeando la entrada MX en las DNS de ese dominio
- SMTP: Verifica si la dirección de email existe consultándole directamente al dominio
Dificultad:
- Conocimientos previos: Saber ejecutar scripts en Python. No hace falta saber programar en Python.
- Tiempo de implementación: 10 min
- Nivel de dificultad: Media
Script de verificación y limpieza de emails
Crea un archivo llamado limpiar-emails.py, e introduce este código en él:
# -------------------------------------------------------------
# Autor: Daniel Pajuelo
# Web: https://danielpajuelo.com
# Versión: 1.0
# -------------------------------------------------------------
import re
import sys
import socket
import smtplib
import time
import dns.resolver
from openpyxl import load_workbook, Workbook
from email_validator import validate_email, EmailNotValidError
import argparse
def load_whitelist():
with open('whitelist.txt', 'r') as file:
return [line.strip() for line in file]
def is_valid_email(email):
try:
validate_email(email)
return True
except EmailNotValidError:
return False
def domain_exists(domain, whitelist):
domain = domain.strip().lower() # elimina espacios en blanco y convierte a minúsculas
if domain in (w.strip().lower() for w in whitelist): # elimina espacios en blanco y convierte a minúsculas
return True
try:
socket.gethostbyname(domain)
return True
except socket.gaierror:
return False
def mx_record_exists(domain, whitelist):
domain = domain.strip().lower() # elimina espacios en blanco y convierte a minúsculas
if domain in (w.strip().lower() for w in whitelist): # elimina espacios en blanco y convierte a minúsculas
return True
try:
records = dns.resolver.resolve(domain, 'MX')
return len(records) > 0
except:
return False
def smtp_verification(email, interval, verbose=False):
domain = email.split('@')[1]
try:
server = smtplib.SMTP()
server.set_debuglevel(1 if verbose else 0) # Set debug level to 1 if verbose is True
server.connect(domain) # Usar 'domain' en lugar de 'mx_record'
server.helo(server.local_hostname)
server.mail('')
code, message = server.rcpt(email)
server.quit()
if verbose:
print(f'SMTP response code: {code}, message: {message}')
return code == 250
except Exception as e:
if verbose:
print(f'Error: {e}')
return False
finally:
time.sleep(interval)
def get_last_column(sheet):
for row in sheet.iter_rows(min_row=1, max_row=1):
for cell in reversed(row):
if cell.value is not None:
return cell.column
def main(args):
whitelist = load_whitelist()
wb = load_workbook(args.input_file)
ws = wb.active
new_wb = Workbook()
new_ws = new_wb.active
for row in ws.iter_rows(min_row=1, max_row=1):
new_ws.append([cell.value for cell in row])
verification_choice = int(input('Ingrese 1 para Sintaxis, 2 para Dominio, 3 para Registro MX, o 4 para SMTP: '))
last_column = get_last_column(new_ws)
next_column = last_column + 1 if last_column else 1 # Si no hay ninguna columna con datos, empieza en la columna 1
header_value = ''
if verification_choice == 1:
header_value = 'Sintaxis'
elif verification_choice == 2:
header_value = 'Dominio'
elif verification_choice == 3:
header_value = 'MX'
elif verification_choice == 4:
header_value = 'SMTP'
new_ws.cell(row=1, column=next_column, value=header_value)
total_emails_checked = 0
total_errors = 0
try:
for row_idx, row in enumerate(ws.iter_rows(min_row=2), start=2):
email = row[0].value
domain = email.split('@')[1]
total_emails_checked += 1
row_data = [cell.value for cell in row]
if verification_choice == 1:
valid = is_valid_email(email)
row_data.append('SI' if valid else 'NO')
if not valid:
total_errors += 1
print(f'Error producido, {email} afectado, Recuento total de emails chequeados: {total_emails_checked}, Recuento total de fallos: {total_errors}')
elif verification_choice == 2:
is_domain_whitelisted = domain in whitelist
exists = is_domain_whitelisted or domain_exists(domain, whitelist)
row_data.append('SI' if exists else 'NO')
if not exists:
total_errors += 1
print(f'Error producido, {email} afectado, Recuento total de emails chequeados: {total_emails_checked}, Recuento total de fallos: {total_errors}')
elif verification_choice == 3:
is_domain_whitelisted = domain in whitelist # Verificar si el dominio está en la lista blanca
mx_exists = is_domain_whitelisted or mx_record_exists(domain, whitelist) # si está en la lista blanca, se considera que existe un registro MX
row_data.append('SI' if mx_exists else 'NO')
if not mx_exists:
total_errors += 1
print(f'Error producido, {email} afectado, Total de emails chequeados: {total_emails_checked}, Total de fallos: {total_errors}')
elif verification_choice == 4:
verbose_response = input("¿Desea un reporte detallado del proceso SMTP? (si/no): ")
verbose = verbose_response.lower() == 'si'
warning_response = input("La verificación SMTP puede resultar en una IP baneada como spammer. ¿Desea continuar? (si/no): ")
if warning_response.lower() == 'si':
interval = int(input('Ingrese el tiempo de espera en segundos entre cada petición SMTP: '))
smtp_valid = smtp_verification(email, interval, verbose) # Pass verbose to smtp_verification
row_data.append('SI' if smtp_valid else 'NO')
if not smtp_valid:
total_errors += 1
print(f'Error producido, {email} afectado, Recuento total de emails chequeados: {total_emails_checked}, Recuento total de fallos: {total_errors}')
new_ws.append(row_data)
except KeyboardInterrupt:
print("\nInterrupción detectada. Guardando el archivo antes de salir...")
finally:
new_wb.save(args.output_file)
print("Archivo guardado. Saliendo del programa.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Script de verificación de email.')
parser.add_argument('input_file', help='Nombre del archivo de entrada.')
parser.add_argument('output_file', help='Nombre del archivo de salida.')
args = parser.parse_args()
if not args.input_file or not args.output_file:
print('Uso: python script.py <nombre_del_archivo_de_entrada> <nombre_del_archivo_de_salida>')
sys.exit(1)
main(args)
Cómo ejecutar este script en un entorno cerrado
Crear un entorno virtual en la carpeta donde tienes tu script:
python -m venv myenv
Activar el entorno virtual:
myenv\Scripts\activate
Instalar las bibliotecas necesarias en el entorno virtual:
pip install openpyxl dnspython validate_email py3dns email-validator openpyxl
Ejecutar el script:
python limpiar-emails.py
Qué hace este script
A través de la línea de comandos le pasas el archivo en Excel donde tengas los emails, el script considera que están en la segunda columna, y también le pasas el nombre del fichero Excel que quieres que cree con la misma base de datos pero una nueva columna indicando si cada email ha pasado o no la verificación.
A continuación te explico el código en detalle para que puedas hacer tus propias modificaciones:
- Importación de Bibliotecas y Módulos:
- Se importan varios módulos y bibliotecas necesarios para ejecutar diferentes partes del script como
re
,sys
,socket
,smtplib
,time
,dns.resolver
,openpyxl
yemail_validator
.
- Se importan varios módulos y bibliotecas necesarios para ejecutar diferentes partes del script como
- Funciones Auxiliares:
load_whitelist
: Carga una lista de dominios aprobados desde un archivo llamado ‘whitelist.txt’.is_valid_email
: Verifica si un email es válido en términos de sintaxis usando la bibliotecaemail_validator
.domain_exists
: Chequea si un dominio existe haciendo una consulta DNS.mx_record_exists
: Verifica si existe un registro MX en el dominio.smtp_verification
: Realiza una verificación SMTP de una dirección de correo electrónico, lo que incluye conectarse a un servidor SMTP y tratar de validar la dirección de correo electrónico.get_last_column
: Obtiene el índice de la última columna con datos en una hoja de cálculo.
- Función Principal (
main
):- Carga una hoja de cálculo existente y crea una nueva hoja de cálculo.
- Pide al usuario que elija un tipo de verificación (sintaxis, dominio, registro MX, SMTP).
- Itera sobre las filas de la hoja de cálculo de entrada, realiza la verificación seleccionada y agrega los resultados a la nueva hoja de cálculo.
- Guarda la nueva hoja de cálculo en un archivo de salida.
- Ejecución del Script:
- Se define un argument parser para tomar los nombres de los archivos de entrada y salida desde la línea de comando.
- Se verifica que los argumentos de entrada y salida estén presentes, y se llama a la función principal.
Nota sobre la verificación SMTP
No recomiendo usar la verificación de SMTP con grandes cantidades de emails, ya que los servidores de correo que consultamos podrían considerarnos atacantes o spammers al realizar tantas peticiones. Usa esta funcionalidad con moderación.