
How To Insert A Pandas Dataframe Into An Existing Hive External Table Using Python (without Pyspark)?

I'm creating a connection string to Hive and running some SELECT queries on the Hive tables over that connection. After performing some transformations on the retrieved data, I'm trying to insert the resulting pandas DataFrame back into an existing Hive external table.

Solution 1:

It seems that you are reading from a Hive table into a pandas DataFrame, applying some transformations, and saving the result back to a Hive external table. Please refer to the code below as a sample. Here I read from a Hive table into a pandas DataFrame and added a date column to it. Then I used the subprocess module to execute a shell command that loads the data into a Hive table partitioned on that date column.

from pyhive import hive
import pandas as pd
import sqlalchemy
from sqlalchemy.engine import create_engine
import datetime
from subprocess import PIPE, Popen
import subprocess
import sys

# open a HiveServer2 connection via PyHive
conn = hive.Connection(host="yourhost.com", port=10000, username="vikct001")
cursor = conn.cursor()

query = "select user_id,country from test_dev_db.test_data"

start_time = datetime.datetime.now()

output_file = '/home/vikct001/user/vikrant/python/test_data.csv'

# read the Hive table into a pandas DataFrame and add a date column
data = pd.read_sql(query, conn)
data['current_date'] = datetime.datetime.today().strftime("%Y-%m-%d")  # pd.datetime is deprecated
print(data)

# dump the DataFrame to a local pipe-delimited file for LOAD DATA
data.to_csv(output_file, sep='|', encoding='utf-8', index=None)

# hive CLI command that loads the CSV into a partition keyed on today's date
hivequery = """hive --hivevar loaded_date=$(date +"%Y-%m-%d") -e 'LOAD DATA LOCAL INPATH "/home/vikct001/user/vikrant/python/test_data.csv" INTO TABLE test_dev_db.test_data_external PARTITION (loaded_date="${hivevar:loaded_date}")';"""

def save_to_hdfs(output_file):
    print("I am here")
    p = subprocess.Popen(hivequery, shell=True, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        print(stderr)
        sys.exit(1)


save_to_hdfs(output_file)
end_time = datetime.datetime.now()

print('processing ends', (end_time - start_time).seconds / 60.0, ' minutes')

Table description:

hive (test_dev_db)> desc test_dev_db.test_data_external;
OK
id                      int
country                 string
input_date              date
loaded_date             string

# Partition Information
# col_name              data_type               comment

loaded_date             string

You can see that the data has been loaded and a partition has been created with the current date.

hive (test_dev_db)> show partitions test_dev_db.test_data_external;
OK
loaded_date=2019-08-21

hive (test_dev_db)> select * from test_dev_db.test_data_external;
OK
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21
2       Ukraine 2019-08-21      2019-08-21
1       India   2019-08-21      2019-08-21
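As an aside, since the script already holds a PyHive connection, the same LOAD DATA statement could also be issued straight from Python instead of shelling out to the hive CLI. This is only a sketch reusing the conn and output_file defined above; note that LOCAL INPATH through HiveServer2 reads from the server host's filesystem, so the CSV has to be visible there.

# alternative to save_to_hdfs(): run the LOAD DATA statement through
# the existing PyHive cursor instead of spawning the hive CLI
loaded_date = datetime.datetime.today().strftime("%Y-%m-%d")
cursor.execute(
    'LOAD DATA LOCAL INPATH "{path}" '
    'INTO TABLE test_dev_db.test_data_external '
    'PARTITION (loaded_date="{dt}")'.format(path=output_file, dt=loaded_date)
)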

Solution 2:

Some pointers before I get to the exact answer.

Data on HDFS is of little use without partitions. In your case you haven't defined any partition, and leaving the table unpartitioned by default is never a good idea. It is your data, and you should know how to partition it, so add a proper PARTITIONED BY clause.

Let us suppose LOAD_DATE is the column on which you want to partition. Dump the DataFrame into Parquet format, keeping the Parquet output path the same as the table's HDFS location. Then create the table like below:

CREATE EXTERNAL TABLE IF NOT EXISTS school_db.student_credits
(
NAME_STUDENT_INITIAL STRING,
CREDITS_INITIAL STRING,
NAME_STUDENT_FINAL STRING,
CREDITS_FINAL STRING
)
PARTITIONED BY (LOAD_DATE STRING)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
  STORED AS
    INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
    OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
LOCATION '/user/gradebook/student_credits';

SET hive.msck.path.validation=ignore;
MSCK REPAIR TABLE school_db.student_credits;

From here on, the MSCK REPAIR command is what you need to run every day. It refreshes the metadata and registers any new partitions as well.
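If you would rather trigger that daily refresh from Python than from a scheduled shell job, a minimal sketch using PyHive (as in Solution 1; host, port and user are assumptions) could look like this:

from pyhive import hive

# assumed connection details, mirroring Solution 1
conn = hive.Connection(host="yourhost.com", port=10000, username="vikct001")
cursor = conn.cursor()

# tolerate partition directories Hive did not create itself, then
# register any new LOAD_DATE partitions found under the table location
cursor.execute("SET hive.msck.path.validation=ignore")
cursor.execute("MSCK REPAIR TABLE school_db.student_credits")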

To dump the DataFrame into a Parquet file, use the below:

df_student_credits.write.mode("append").partitionBy("LOAD_DATE").parquet("/user/gradebook/student_credits")
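Note that the line above uses the Spark DataFrame writer. Since the question asks for a route without PySpark, a roughly equivalent partitioned Parquet write can be done with pandas itself (pyarrow engine); the sketch below assumes the DataFrame's columns and uses a hypothetical local staging directory that would still need to be pushed to the table's HDFS location:

import pandas as pd

# hypothetical DataFrame with the same columns as the external table
df_student_credits = pd.DataFrame({
    "NAME_STUDENT_INITIAL": ["A", "B"],
    "CREDITS_INITIAL": ["10", "12"],
    "NAME_STUDENT_FINAL": ["A", "B"],
    "CREDITS_FINAL": ["12", "15"],
    "LOAD_DATE": ["2019-08-21", "2019-08-21"],
})

# writes LOAD_DATE=<value>/ subdirectories, matching Hive's partition layout;
# the output then has to be copied to /user/gradebook/student_credits on HDFS
df_student_credits.to_parquet(
    "student_credits",              # local staging directory (assumption)
    engine="pyarrow",
    partition_cols=["LOAD_DATE"],
    index=False,
)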

First create the Parquet file and then the external table. Let me know if this solves your problem.
