Boa tarde, tudo bem?
Estou tentando fazer um buscador semântico com o PySpark no Colab. Tentei de diversas formas, mas não consegui de nenhuma. A idéia seria indexar um dataframe pyspark com faiss ou o elastic search. Será que alguém poderia me ajudar?
Esse é o código:
Instalações:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark[sql]
!pip install pyspark[pandas_on_spark] plotly
!pip install -q findspark
!pip install -q pyspark==3.3.0 spark-nlp==4.2.0
!pip install elasticsearch
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
A partir dessa parte, acredito que eu precise colocar algum tipo de config próprio do elasticsearch na SparkSession, mas não sei como fazer isso:
findspark.init()
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"
spark = SparkSession.builder \
.master('local[*]') \
.appName("Iniciando com Spark") \
.config('spark.ui.port', '4050') \
.getOrCreate()
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels
E, a partir daqui, preciso que o elastic search rode no google colab, mas também não sei como fazer essa configuração.
Depois disso, acredito que seria algo assim para criar a indexação:
assembler = DocumentAssembler()\
.setInputCol("value")\
.setOutputCol("document")\
tokenizer = Tokenizer()\
.setInputCols(['document'])\
.setOutputCol('tokens')
lemmatizer = LemmatizerModel.pretrained()\
.setInputCols(['tokens'])\
.setOutputCol('lemmas')
normalizer = Normalizer()\
.setCleanupPatterns([
'[^a-zA-Z.-]+',
'^[^a-zA-Z]+',
'[^a-zA-Z]+$',
])\
.setInputCols(['lemmas'])\
.setOutputCol('normalized')\
.setLowercase(True)
finisher = Finisher()\
.setInputCols(['normalized'])\
.setOutputCols(['normalized'])
nlp_pipeline = Pipeline().setStages([assembler, tokenizer,lemmatizer, normalizer, finisher]).fit(text_data)
processed = nlp_pipeline.transform(text_data)
Por fim, a query precisa passar pela mesma pipeline que o texto?
Obrigado