
Python lleva el “keyword research” a la era de la ciencia de datos: hoy podemos convertir miles de consultas en puntos de un espacio vectorial, agruparlas por similitud semántica y etiquetarlas según la intención de búsqueda con apenas unas líneas de código. El núcleo es sencillo: obtenemos embeddings (BERT, Word2Vec, OpenAI, etc.), aplicamos un algoritmo de clustering (K-Means, HDBSCAN, BERTopic…) o un clasificador supervisado, y el resultado son clústeres temáticos listos para diseñar silos de contenido o mapas de intención.
1. De palabras sueltas a vectores semánticos
Las redes de lenguaje modernas (BERT, RoBERTa, Universal Sentence Encoder…) generan vectores que capturan la cercanía conceptual entre frases; la librería sentence-transformers facilita ese paso en Python con una única llamada.
Una vez vectorizadas, técnicas de clustering como K-Means (muy rápida y disponible en scikit-learn) permiten detectar grupos coherentes de keywords. Artículos de la comunidad SEO demuestran cómo BERT + clustering descubre agrupaciones más naturales que la coincidencia de prefijos clásica.
Para grandes volúmenes, los embeddings comerciales de OpenAI son otra opción (0,13 $/M tokens para text-embedding-3-large, mayo 2025) .
Cuando el objetivo es clasificar intención de búsqueda, basta entrenar un modelo supervisado (Logistic Regression, SVM, e incluso BERT afinado) sobre ejemplos etiquetados como “informacional”, “navegacional” o “transaccional”.
2. Prompt #1 — Clustering de keywords con embeddings + K-MeansPrompt curado
Prompt:
Eres un **Ingeniero SEO** experto en procesamiento de lenguaje natural y análisis de datos. Tu tarea es generar un **script Python de un solo fichero** que haga lo siguiente:
1. **Lectura de datos**
- Abra un archivo `keywords.csv` que contenga una columna `kw` con una lista de keywords.
- Valide la existencia del fichero y lance un error amigable si no existe o está mal formateado.
2. **Cálculo de embeddings**
- Utilice el modelo `all-mpnet-base-v2` de `sentence-transformers`.
- Procese las keywords en lote para optimizar memoria y tiempos.
- Maneje posibles errores de conexión o carga del modelo.
3. **Clustering**
- Aplique **K-Means** (de `scikit-learn`) sobre los embeddings.
- Permita configurar el número de clusters (`n_clusters`) vía argumento de línea de comandos, con un valor por defecto razonable.
- Nombre cada cluster usando la keyword más frecuente dentro de ese grupo (o, en caso de empate, la de menor longitud).
4. **Exportación de resultados**
- Genere un CSV `clusters.csv` con las columnas:
- `kw` (keyword original)
- `cluster` (índice numérico del cluster)
- `label` (keyword que nombra al cluster)
- Aplique el formateo correcto y codificación UTF-8.
5. **Buenas prácticas**
- Incluya:
- **Shebang** y encoding al inicio.
- **Docstrings** y comentarios claros en funciones y clases.
- **Type hints** en todas las funciones.
- **Bloque `if __name__ == "__main__":`** para ejecución directa.
- **Manejo de excepciones** con mensajes útiles.
- **Logging** configurable en nivel INFO/DEBUG.
- Un **argument parser** para parámetros:
- `--input` (ruta a keywords.csv)
- `--output` (ruta a clusters.csv)
- `--n_clusters` (número de clusters)
- `--batch_size` (tamaño de lote para embeddings)
- Optimice memoria y rendimiento evitando recargas innecesarias.
6. **Salida esperada**
- Al ejecutarlo, el script imprimirá en consola un resumen:
- Número de keywords leídas.
- Número de clusters creados.
- Tiempo total de ejecución.
Genera el código completo en un único fichero `.py`, siguiendo este prompt al pie de la letra, listo para usarse en producción.```
Script:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script para leer keywords desde un CSV, calcular sus embeddings con
sentence-transformers, agruparlas con K-Means y exportar los resultados.
Uso:
python cluster_keywords.py \
--input keywords.csv \
--output clusters.csv \
--n_clusters 5 \
--batch_size 64 \
[--log_level DEBUG]
"""
import os
import sys
import time
import logging
import argparse
from typing import List, Tuple, Dict
import pandas as pd
from sentence_transformers import SentenceTransformer, LoggingHandler
from sklearn.cluster import KMeans
from collections import Counter
def load_keywords(input_path: str) -> List[str]:
"""
Carga keywords desde un archivo CSV que tenga columna 'kw'.
Args:
input_path: Ruta al archivo CSV de entrada.
Returns:
Lista de keywords como strings.
Raises:
FileNotFoundError: Si el archivo no existe.
ValueError: Si el CSV está mal formateado o no contiene la columna 'kw'.
"""
if not os.path.isfile(input_path):
raise FileNotFoundError(f"El fichero '{input_path}' no existe.")
try:
df = pd.read_csv(input_path, usecols=['kw'])
except Exception as e:
raise ValueError(f"Error leyendo '{input_path}': {e}")
return df['kw'].astype(str).tolist()
def compute_embeddings(keywords: List[str],
model_name: str,
batch_size: int,
logger: logging.Logger) -> List[List[float]]:
"""
Calcula embeddings para una lista de keywords usando SentenceTransformer.
Args:
keywords: Lista de strings a embedir.
model_name: Nombre del modelo de sentence-transformers.
batch_size: Tamaño de lote para procesamiento.
logger: Logger configurado.
Returns:
Lista de embeddings (listas de floats).
"""
try:
logger.info(f"Cargando modelo '{model_name}'")
model = SentenceTransformer(model_name)
except Exception as e:
logger.error(f"No se pudo cargar el modelo '{model_name}': {e}")
sys.exit(1)
embeddings = []
total = len(keywords)
for start in range(0, total, batch_size):
end = min(start + batch_size, total)
batch = keywords[start:end]
logger.debug(f"Procesando batch {start}:{end}")
try:
emb = model.encode(batch, show_progress_bar=False)
except Exception as e:
logger.error(f"Error al calcular embeddings: {e}")
sys.exit(1)
embeddings.extend(emb.tolist() if hasattr(emb, 'tolist') else emb)
return embeddings # type: ignore
def cluster_embeddings(embeddings: List[List[float]],
n_clusters: int,
logger: logging.Logger) -> List[int]:
"""
Aplica K-Means a los embeddings.
Args:
embeddings: Lista de vectores de embedding.
n_clusters: Número de clusters a generar.
logger: Logger configurado.
Returns:
Lista de etiquetas de cluster para cada embedding.
"""
logger.info(f"Iniciando K-Means con {n_clusters} clusters")
try:
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
labels = kmeans.fit_predict(embeddings)
except Exception as e:
logger.error(f"Error durante clustering: {e}")
sys.exit(1)
return labels.tolist()
def name_clusters(keywords: List[str],
labels: List[int],
logger: logging.Logger) -> Dict[int, str]:
"""
Nombra cada cluster usando la keyword más frecuente o, en caso de empate,
la de menor longitud.
Args:
keywords: Lista original de keywords.
labels: Etiqueta de cluster para cada keyword.
logger: Logger configurado.
Returns:
Diccionario mapping cluster -> nombre (keyword).
"""
cluster_map: Dict[int, List[str]] = {}
for kw, lbl in zip(keywords, labels):
cluster_map.setdefault(lbl, []).append(kw)
name_map: Dict[int, str] = {}
for lbl, kws in cluster_map.items():
counter = Counter(kws)
most_common = counter.most_common()
max_freq = most_common[0][1]
# Filtrar por frecuencia máxima
candidates = [kw for kw, cnt in most_common if cnt == max_freq]
# Elegir la de menor longitud
label_kw = min(candidates, key=len)
name_map[lbl] = label_kw
logger.debug(f"Cluster {lbl}: label='{label_kw}' (freq={max_freq})")
return name_map
def export_results(keywords: List[str],
labels: List[int],
name_map: Dict[int, str],
output_path: str,
logger: logging.Logger) -> None:
"""
Exporta los resultados a un CSV con columnas 'kw', 'cluster', 'label'.
Args:
keywords: Lista de keywords originales.
labels: Etiquetas de cluster para cada keyword.
name_map: Mapeo cluster -> nombre de cluster.
output_path: Ruta de salida para el CSV.
logger: Logger configurado.
"""
logger.info(f"Exportando resultados a '{output_path}'")
df_out = pd.DataFrame({
'kw': keywords,
'cluster': labels,
'label': [name_map[lbl] for lbl in labels]
})
try:
df_out.to_csv(output_path, index=False, encoding='utf-8')
except Exception as e:
logger.error(f"Error escribiendo '{output_path}': {e}")
sys.exit(1)
def parse_args() -> argparse.Namespace:
"""
Define y parsea los argumentos de línea de comandos.
Returns:
Namespace con los argumentos.
"""
parser = argparse.ArgumentParser(
description="Clustering de keywords usando embeddings y K-Means"
)
parser.add_argument(
"--input", "-i", type=str, default="keywords.csv",
help="Ruta al CSV de entrada con columna 'kw'"
)
parser.add_argument(
"--output", "-o", type=str, default="clusters.csv",
help="Ruta al CSV de salida con clusters"
)
parser.add_argument(
"--n_clusters", "-k", type=int, default=5,
help="Número de clusters para K-Means"
)
parser.add_argument(
"--batch_size", "-b", type=int, default=64,
help="Tamaño de lote para cálculo de embeddings"
)
parser.add_argument(
"--log_level", "-l", type=str, default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Nivel de logging"
)
return parser.parse_args()
def main() -> None:
"""
Función principal: orquesta la carga, cálculo de embeddings,
clustering y exportación de resultados.
"""
args = parse_args()
# Configurar logging
logging.basicConfig(
level=getattr(logging, args.log_level),
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(), LoggingHandler()]
)
logger = logging.getLogger(__name__)
start_time = time.time()
try:
keywords = load_keywords(args.input)
except Exception as e:
logger.error(e)
sys.exit(1)
num_keywords = len(keywords)
logger.info(f"Leídas {num_keywords} keywords")
embeddings = compute_embeddings(
keywords,
model_name="all-mpnet-base-v2",
batch_size=args.batch_size,
logger=logger
)
labels = cluster_embeddings(
embeddings,
n_clusters=args.n_clusters,
logger=logger
)
name_map = name_clusters(keywords, labels, logger)
export_results(keywords, labels, name_map, args.output, logger)
elapsed = time.time() - start_time
print("\nResumen de ejecución:")
print(f"- Keywords leídas: {num_keywords}")
print(f"- Clusters creados: {args.n_clusters}")
print(f"- Tiempo total: {elapsed:.2f} segundos")
if __name__ == "__main__":
main()
Por qué funciona
- all-mpnet-base-v2 obtiene > 0,80 Spearman en STS Benchmark, ideal para similitud corta.
- K-Means es muy rápido y escalable; repetir inicializaciones (n_init) evita mínimos locales.
- Nombrar clústeres con las 2 palabras más frecuentes genera etiquetas comprensibles para el equipo editorial.
3. Prompt #2 — Clasificación de intención con Logistic RegressionPrompt curado
Prompt:
**Rol:** Data Scientist / Analista de NLP con experiencia en machine learning.
**Objetivo:** Generar un único script Python (`script_intencion.py`) que:
1. **Carga y validación de datos**
- Lea `train.csv` (columnas: `kw`, `label`) y `keywords.csv` (columna: `kw`).
- Compruebe la existencia de los archivos y valide que no estén vacíos.
2. **Preprocesamiento**
- Limpie y normalice las cadenas de texto (minúsculas, eliminación de signos, tokens especiales).
- Transforme el texto a características numéricas usando `TfidfVectorizer` (o `CountVectorizer`).
3. **Construcción de pipeline y entrenamiento**
- Defina un pipeline de sklearn que incluya vectorización y `LogisticRegression` multiclas.
- Divida datos en entrenamiento y prueba (por ejemplo, 80/20) usando `train_test_split` con semilla fija.
4. **Ajuste de hiperparámetros**
- Emplee `GridSearchCV` (o `RandomizedSearchCV`) para optimizar al menos el parámetro `C` de la regresión.
- Use validación cruzada (por ejemplo, CV=5) y fije `random_state` para reproducibilidad.
5. **Evaluación**
- Calcule y muestre métricas clave: Accuracy, Precision, Recall, F1-score y matriz de confusión.
- Genere un reporte (por consola o archivo de texto).
6. **Guardado del modelo entrenado**
- Serialice el pipeline final a disco (por ejemplo, `joblib.dump(..., "model.joblib")`).
7. **Predicción de nuevas keywords**
- Cargue `keywords.csv`, aplique el pipeline entrenado y genere un DataFrame con columnas `kw` y `predicted_label`.
- Exporte a `intent.csv`.
8. **Estructura y robustez**
- Organice el código en funciones con docstrings estilo Google.
- Use `argparse` para parametrizar rutas de entrada/salida.
- Incorpore el módulo `logging` para registro de eventos e información de debug.
- Añada manejo de excepciones donde corresponda.
9. **Reproducibilidad y estilo**
- Fije `random_state` en todas las operaciones aleatorias.
- Aplique buenas prácticas de código (PEP8, control de versiones de dependencias).
- Termine con un bloque:
```python
if __name__ == "__main__":
main()
Script:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
script_intencion.py
Carga datos de entrenamiento, entrena un modelo de intención basado en LogisticRegression,
ajusta hiperparámetros con GridSearchCV, evalúa el modelo, lo guarda en disco y
predice nuevas keywords.
"""
import os
import re
import sys
import argparse
import logging
import pandas as pd
import joblib
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
classification_report, confusion_matrix
)
# Constante para reproducibilidad
RANDOM_STATE = 42
def setup_logging(level: str = "INFO"):
"""Configura el módulo de logging."""
numeric_level = getattr(logging, level.upper(), None)
logging.basicConfig(
level=numeric_level,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def load_data(file_path: str) -> pd.DataFrame:
"""
Carga y valida un CSV.
Args:
file_path (str): Ruta al archivo CSV.
Returns:
pd.DataFrame: Datos cargados.
Raises:
FileNotFoundError: Si el archivo no existe.
ValueError: Si el archivo está vacío.
"""
if not os.path.exists(file_path):
logging.error(f"El archivo no existe: {file_path}")
raise FileNotFoundError(f"No se encontró: {file_path}")
df = pd.read_csv(file_path)
if df.empty:
logging.error(f"El archivo está vacío: {file_path}")
raise ValueError(f"El archivo está vacío: {file_path}")
logging.info(f"Datos cargados correctamente desde: {file_path} (filas={len(df)})")
return df
def clean_text(text: str) -> str:
"""
Limpia y normaliza una cadena de texto.
- Pasa a minúsculas.
- Elimina signos de puntuación y caracteres no alfanuméricos.
Args:
text (str): Texto original.
Returns:
str: Texto limpio.
"""
text = text.lower()
# eliminar todo lo que no sea letra, número o espacio
text = re.sub(r'[^a-z0-9áéíóúüñ ]+', ' ', text)
# normalizar espacios
text = re.sub(r'\s+', ' ', text).strip()
return text
def train_model(
X: pd.Series, y: pd.Series
) -> GridSearchCV:
"""
Construye pipeline con TfidfVectorizer + LogisticRegression y ajusta hiperparámetros.
Args:
X (pd.Series): Serie de textos.
y (pd.Series): Serie de etiquetas.
Returns:
GridSearchCV: Objeto ajustado con el mejor estimador.
"""
pipeline = Pipeline([
("vectorizer", TfidfVectorizer(preprocessor=clean_text)),
("clf", LogisticRegression(multi_class="multinomial", solver="lbfgs",
random_state=RANDOM_STATE, max_iter=1000)),
])
param_grid = {
"clf__C": [0.01, 0.1, 1, 10, 100],
}
grid = GridSearchCV(
estimator=pipeline,
param_grid=param_grid,
cv=5,
n_jobs=-1,
verbose=1,
)
grid.fit(X, y)
logging.info(f"Mejor parámetro C: {grid.best_params_['clf__C']}")
return grid
def evaluate_model(
model, X_test: pd.Series, y_test: pd.Series, report_path: str = None
):
"""
Evalúa el modelo calculando métricas y matriz de confusión.
Args:
model: Estimador entrenado.
X_test (pd.Series): Datos de prueba.
y_test (pd.Series): Etiquetas verdaderas.
report_path (str, opcional): Ruta para guardar reporte de texto. Si no, solo imprime.
"""
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
prec = precision_score(y_test, y_pred, average="macro", zero_division=0)
rec = recall_score(y_test, y_pred, average="macro", zero_division=0)
f1 = f1_score(y_test, y_pred, average="macro", zero_division=0)
cm = confusion_matrix(y_test, y_pred)
report = (
f"Accuracy: {acc:.4f}\n"
f"Precision: {prec:.4f}\n"
f"Recall: {rec:.4f}\n"
f"F1-score: {f1:.4f}\n\n"
"Classification Report:\n"
f"{classification_report(y_test, y_pred, zero_division=0)}\n"
"Matriz de confusión:\n"
f"{cm}\n"
)
if report_path:
with open(report_path, "w", encoding="utf-8") as f:
f.write(report)
logging.info(f"Reporte de evaluación guardado en: {report_path}")
else:
print(report)
def save_model(model, output_path: str):
"""
Serializa el pipeline entrenado a disco.
Args:
model: Estimador entrenado.
output_path (str): Ruta de salida (.joblib).
"""
joblib.dump(model, output_path)
logging.info(f"Modelo guardado en: {output_path}")
def predict_keywords(
model, keywords_df: pd.DataFrame, output_path: str
):
"""
Aplica el modelo a nuevas keywords y guarda resultados.
Args:
model: Estimador entrenado.
keywords_df (pd.DataFrame): DataFrame con columna 'kw'.
output_path (str): Ruta de salida CSV.
"""
if "kw" not in keywords_df.columns:
logging.error("keywords.csv debe contener columna 'kw'")
raise ValueError("Columna 'kw' no encontrada en keywords.csv")
kws = keywords_df["kw"].astype(str)
preds = model.predict(kws)
out_df = pd.DataFrame({"kw": kws, "predicted_label": preds})
out_df.to_csv(output_path, index=False, encoding="utf-8-sig")
logging.info(f"Predicciones guardadas en: {output_path}")
def parse_args():
"""Define y parsea los argumentos de línea de comandos."""
parser = argparse.ArgumentParser(
description="Entrena y aplica un modelo de intención sobre keywords."
)
parser.add_argument(
"--train_path", type=str, default="train.csv",
help="Ruta al CSV con datos de entrenamiento (kw,label)."
)
parser.add_argument(
"--keywords_path", type=str, default="keywords.csv",
help="Ruta al CSV con nuevas keywords (kw)."
)
parser.add_argument(
"--model_output", type=str, default="model.joblib",
help="Ruta donde guardar el modelo entrenado."
)
parser.add_argument(
"--intent_output", type=str, default="intent.csv",
help="Ruta donde guardar las predicciones."
)
parser.add_argument(
"--report_output", type=str, default=None,
help="Ruta para salvar reporte de evaluación (opcional)."
)
parser.add_argument(
"--log_level", type=str, default="INFO",
help="Nivel de logging: DEBUG, INFO, WARNING, ERROR, CRITICAL."
)
return parser.parse_args()
def main():
"""Función principal."""
args = parse_args()
setup_logging(args.log_level)
try:
df_train = load_data(args.train_path)
df_keywords = load_data(args.keywords_path)
except (FileNotFoundError, ValueError) as e:
logging.critical(f"Error al cargar datos: {e}")
sys.exit(1)
# Separar X, y
X = df_train["kw"].astype(str)
y = df_train["label"]
# Split train/test
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y
)
# Entrenamiento y ajuste de hiperparámetros
grid = train_model(X_train, y_train)
# Evaluación
evaluate_model(grid.best_estimator_, X_test, y_test, args.report_output)
# Guardar modelo
save_model(grid.best_estimator_, args.model_output)
# Predicción sobre nuevas keywords
predict_keywords(grid.best_estimator_, df_keywords, args.intent_output)
if __name__ == "__main__":
main()
Instrucciones de uso:
- Asegúrate de tener instaladas las dependencias mínimas:
pip install pandas scikit-learn joblib
- Coloca
train.csv
(con columnaskw,label
) ykeywords.csv
(columnakw
) en el mismo directorio, o indícalo con--train_path
y--keywords_path
. - Ejecuta:
python script_intencion.py --train_path=train.csv --keywords_path=keywords.csv \ --model_output=model.joblib --intent_output=intent.csv --report_output=report.txt
- Obtendrás:
model.joblib
: pipeline serializado.intent.csv
: predicciones de las nuevas keywords.report.txt
(si especificas): métricas y matriz de confusión.
Buenas prácticas
- El TF-IDF bigrama captura frases tipo “comprar + producto” claves para intención transaccional .
- Logistic Regression es rápida y alcanza resultados competitivos con buen feature engineering .
- Para mayor precisión se puede afinar un modelo BERT en Hugging Face siguiendo guías de intent classification .
4. Tabla comparativa de enfoques y costes
Modelo / Algoritmo | Tipo | Complejidad | Coste aprox.* | Pros | Contras |
---|---|---|---|---|---|
SBERT + K-Means | OSS local | Media | Gratis | Embeddings SOTA, muy rápido | Elige k manualmente |
SBERT + HDBSCAN | OSS local | Media | Gratis | Detecta densidades, k automático | Parámetros sensibles |
BERTopic | OSS local | Media | Gratis | Clusters + etiquetas automáticas | RAM alta > 6 GB |
Word2Vec / FastText + K-Means | OSS | Media | Gratis | Embeddings entrenables in-house | Menor contexto frase |
OpenAI embeddings + K-Means | SaaS | Baja | 0.13 $/M tok | Calidad top, sin GPU | Coste variable; datos nube |
Semrush Keyword Strategy Builder | SaaS | Baja | 20–200 €/mes | UI lista, AI clustering | Caja negra, $$ |
LogisticReg TF-IDF | OSS | Baja | Gratis | <1 s entrenamiento, interpretable | Solo superficie léxica |
BERT fine-tune | OSS / SaaS | Alta | GPU o 0.06 $/1k tok HF | Máxima precisión | Hardware / coste |
Rule-based intent labels | Manual | Baja | 0 | Control total | Poca escalabilidad |
AutoML Vertex AI | SaaS | Baja | 2 $–3 $/h training | Sin código, métricas | Precio; lock-in |
*Precios estimados mayo 2025.
5. Conclusiones
El machine learning permite automatizar tareas que, a mano, resultan tediosas o a veces inviables:
- Clustering semántico crea grupos de keywords coherentes para planificar silos y evitar canibalizaciones. SBERT + K-Means cubre el 90 % de los casos en segundos.
- Clasificación de intención orienta el tipo de contenido a producir (guía informacional, landing transaccional, etc.) y se entrena con unas decenas de ejemplos etiquetados.
- Herramientas comerciales (Semrush, Ahrefs) ofrecen estas funciones, pero dominar el stack Python brinda transparencia, ahorro y la opción de adaptar parámetros ad hoc.
Combina ambos enfoques: clúster + intención = mapa completo de lo que pide el usuario y cómo servirlo. Con los prompts y alternativas anteriores, ya tienes un laboratorio de ML listo para convertir listados de keywords en estrategias de contenido basadas en datos. ¡A entrenar y a escalar tu autoridad semántica!