A ideia que eu tive foi de utilizar o try/except para verificar se havia a música no DataFrame e então retornar uma string explicando que a música não foi encontrada. E também coletei o cluster e os componentes da música de uma vez, ao invés de ter que buscar os componentes de novo realizando exatamente a mesma consulta
def recomendador(nome_musica):
# Colentando número no cluster e os componentes PCA da música base
try: # Verificando se a música base está na base de dados
cluster, componentes_musica = projection_kmeans\
.where(projection_kmeans['artists_song'] == nome_musica)\
.select('cluster_pca', 'pca_features')\
.collect()[0]
except:
return "Música não encontrada"
# Selecionando todas as músicas que estão no cluster acima
musicas_recomendadadas = projection_kmeans\
.where(projection_kmeans['cluster_pca'] == cluster)\
.select('artists_song', 'id', 'pca_features')
# Criando função para calcular a distância euclidiana
def calcula_distancia(value):
return euclidean(componentes_musica, value)
# Transformando a função acima de forma que o pyspark consiga utilizá-la
udf_calcula_distancia = f.udf(calcula_distancia, FloatType())
# Calculando a distância euclidiana de cada música no cluster em relação a música base
musicas_recomendadadas_dist = musicas_recomendadadas\
.withColumn('dist', udf_calcula_distancia('pca_features'))\
.where(musicas_recomendadadas['artists_song'] != nome_musica) # Retirando a música base da seleção
# Ordenando as musicas recomendadas e selecionando as 10 primeiras e apenas as colunas necessárias
recomendadas = musicas_recomendadadas_dist.sort('Dist').limit(10).select(['artists_song', 'id', 'Dist'])
# Mostrando o artista e o nome de cada música recomendada
recomendadas.select('artists_song').show(truncate=False)