pySpark(파일(csv)에서 PostgreSQL로)


개발 서버에서 Spark를 사용하여 위의 파이프라인 데모를 구성했습니다.

Spark(PySpark)를 통해 특정 파일 확장자(CSV, JSON 등)를 RDB에 저장합니다.

> 샘플 파일


file_to_postgres

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
ip = "10.65.41.141"
port = 5432
user = "isharkk"
passwd = "rplinux"
db = "testt"
spark = SparkSession \
        .builder \
        .appName('SparkByExamples.com') \
        .config("spark.driver.extraClassPath", "/root/spark-3.2.2-bin-hadoop3/jars/postgresql-42.5.4.jar") \
        .getOrCreate()
df = spark.read.csv("file:///root/self.csv")
df2 = spark.read.option("header",True) \
     .csv("file:///root/self.csv")
df_with_schema.printSchema()
df2.write.option("header",True) \
 .csv("/tmp/spark_output/self.csv")
query1 = df2.write.format("jdbc")\
          .option("url","jdbc:postgresql://{0}:{1}/{2}".format(ip, port, db)) \
          .option("driver", "org.postgresql.Driver") \
          .option("dbtable", "ishark.self") \
          .option("user", user) \
          .option("password", passwd) \
          .save()

> CSV 파일의 해당 스키마 속성


> 적절한 CSV 파일 헤더 형식 추가


> PostgreSQL에 로드된 CSV 데이터