Estou tendo um problema no código após tentar utilizar o comando:
df.coalesce(1).write.csv(destino, header=True)
, quando utilizo o abaixo, ele exporta porém não fica a alteração feita pelo programa:
df_pandas = df.select('*').toPandas()
df_pandas.to_csv(destino, index=False, encoding="utf-8",line_terminator = '\n',sep = ';')
fazendo alguns testes, é alterado o tipo de dado da coluna corretamente, mas quando tento exportar o dado do programa, ele não efetua o processo. não consigo anexar todo o código, mas ele em suma é isto, Código:
import os
import re
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col,udf
import pyspark.sql.types as T
from pyspark.sql.types import DataType
import pyspark.pandas as ps
import pyspark.sql.functions as F
import findspark
import pandas as pd
from datetime import date, datetime , timedelta
from pathlib import *
findspark.init()
class funcoes:
def deal_intS(val):
if isinstance(val, str):
int_norm = re.compile(r'(-?\d+)([\,,\.]0)?')
match = re.search(int_norm, val)
if match is not None:
return int(match.group(1))
else:
return None
elif isinstance(val, int):
return val
elif isinstance(val, float) and not pd.isnull(val):
return int(val)
elif isinstance(val, bool):
if val is True:
return 1
elif val is False:
return 0
else:
return None
else:
return None
deal_int = udf(deal_intS, T.IntegerType())
today = date.today()
strdia = today.strftime('%d-%m-%Y')
sc = SparkContext('local','First')
spark = SparkSession(sc).builder\
.appName("basedados") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.config("spark.sql.debug.maxToStringFields", 1000) \
.config("spark.memory.offHeap.enabled","true") \
.config("spark.memory.offHeap.size","12g")\
.getOrCreate()
def Ajustadados(nomearq, tipoArq):
destino = ('C:/tmp'+ nomearq + strdia + 'norm.csv')
path = ('C:/tmp/normalizacao/'+ nomearq + '-' + strdia)
df = spark.read.csv((path + '.' + tipoArq),inferSchema=True,header=True,sep=';')
campos_int =['SERASA_SEVERIDADE_EXTERNA',
'SCR_SEVERIDADE_EXTERNA',
'SCPC_SEVERIDADE_EXTERNA',
'REC_FEDERAL_SEVERIDADE_EXTERNA'
]
campos_float = ['VLR_SPC_NACIONAL',
'VLR_VENC_SCR',
'DAT_BASE',
'VLR_SEVERIDADE_INTERNA'
]
campos_string = ['RAIZ']
campos_timestamp = []
campos_bool = []
for campo in campos_int:
if(df.columns.__contains__(campo)):
df = df.withColumn(campo, funcoes.deal_int(campo))
else:
pass
for campo in campos_float:
if(df.columns.__contains__(campo)):
df = df.withColumn(campo, funcoes.deal_float(campo))
else:
pass
for campo in campos_string:
if(df.columns.__contains__(campo)):
df = df.withColumn(campo, funcoes.deal_string(campo))
else:
pass
for campo in campos_timestamp:
if(df.columns.__contains__(campo)):
df = df.withColumn(campo, funcoes.deal_timestamp(campo))
else:
pass
for campo in campos_bool:
if(df.columns.__contains__(campo)):
df = df.withColumn(campo, funcoes.deal_bool(campo))
else:
pass
print(df.dtypes)
df.coalesce(1).write.csv(destino, header=True)
df_pandas = df.select('*').toPandas() #erro no envio para csv ter=star no notebook particular
print(df_pandas.dtypes)
df_pandas.to_csv(destino, index=False, encoding="utf-8",line_terminator = '\n',sep = ';')
Ajustadados('arquivo','csv')'
erro:
An error occurred while calling o289.csv.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:638)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at ....