0
respostas

Exportar dados Spark

Estou tendo um problema no código após tentar utilizar o comando: df.coalesce(1).write.csv(destino, header=True), quando utilizo o abaixo, ele exporta porém não fica a alteração feita pelo programa: df_pandas = df.select('*').toPandas() df_pandas.to_csv(destino, index=False, encoding="utf-8",line_terminator = '\n',sep = ';')

fazendo alguns testes, é alterado o tipo de dado da coluna corretamente, mas quando tento exportar o dado do programa, ele não efetua o processo. não consigo anexar todo o código, mas ele em suma é isto, Código:

import os
import re
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col,udf
import pyspark.sql.types as T
from pyspark.sql.types import DataType 
import pyspark.pandas as ps
import pyspark.sql.functions as F
import findspark
import pandas as pd
from datetime import date, datetime , timedelta
from pathlib import *
findspark.init()

class funcoes:
    def deal_intS(val):
        if isinstance(val, str):
            int_norm = re.compile(r'(-?\d+)([\,,\.]0)?')
            match = re.search(int_norm, val)
            if match is not None:
                return int(match.group(1))
            else:
                return None
        elif isinstance(val, int):
            return val
        elif isinstance(val, float) and not pd.isnull(val):
            return int(val)
        elif isinstance(val, bool):
            if val is True:
                return 1
            elif val is False:
                return 0
            else:
                return None
        else:
            return None


    deal_int = udf(deal_intS, T.IntegerType())

today = date.today()
strdia = today.strftime('%d-%m-%Y')
sc = SparkContext('local','First')
spark = SparkSession(sc).builder\
    .appName("basedados") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.debug.maxToStringFields", 1000) \
    .config("spark.memory.offHeap.enabled","true") \
    .config("spark.memory.offHeap.size","12g")\
    .getOrCreate()

def Ajustadados(nomearq, tipoArq):
    destino = ('C:/tmp'+ nomearq + strdia + 'norm.csv') 
    path = ('C:/tmp/normalizacao/'+ nomearq + '-' + strdia)

    df = spark.read.csv((path + '.' + tipoArq),inferSchema=True,header=True,sep=';')

    campos_int =['SERASA_SEVERIDADE_EXTERNA',    
        'SCR_SEVERIDADE_EXTERNA', 
        'SCPC_SEVERIDADE_EXTERNA', 
        'REC_FEDERAL_SEVERIDADE_EXTERNA' 
    ]
    campos_float = ['VLR_SPC_NACIONAL',
        'VLR_VENC_SCR',
        'DAT_BASE',
        'VLR_SEVERIDADE_INTERNA'
    ]
    campos_string = ['RAIZ']
    campos_timestamp = []
    campos_bool = []
    for campo in campos_int:
        if(df.columns.__contains__(campo)):
            df = df.withColumn(campo, funcoes.deal_int(campo))
        else:
            pass    
    for campo in campos_float:
        if(df.columns.__contains__(campo)):
            df = df.withColumn(campo, funcoes.deal_float(campo))
        else:
            pass   
    for campo in campos_string:
        if(df.columns.__contains__(campo)):
            df = df.withColumn(campo, funcoes.deal_string(campo))
        else:
            pass
    for campo in campos_timestamp:
        if(df.columns.__contains__(campo)):
           df = df.withColumn(campo, funcoes.deal_timestamp(campo))
        else:
            pass
    for campo in campos_bool:
        if(df.columns.__contains__(campo)):
            df = df.withColumn(campo, funcoes.deal_bool(campo))
        else:
            pass


    print(df.dtypes)
    df.coalesce(1).write.csv(destino, header=True)
    df_pandas = df.select('*').toPandas() #erro no envio para csv ter=star no notebook particular 
    print(df_pandas.dtypes)
    df_pandas.to_csv(destino, index=False, encoding="utf-8",line_terminator = '\n',sep = ';')


Ajustadados('arquivo','csv')'
erro:
An error occurred while calling o289.csv.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:638)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at ....