How To Insert A Pandas Dataframe Into An Existing Hive External Table Using Python (without Pyspark)?
Solution 1:
It seems that you are trying to read a Hive table into a pandas dataframe, apply some transformation, and save the result back to a Hive external table. Please refer to the code below as a sample. Here I read a Hive table into a pandas dataframe and add a date column to it. Then I use the subprocess module to run a shell command that loads the data into a Hive table partitioned on a date column.
from pyhive import hive
import pandas as pd
import datetime
import subprocess
import sys

# HiveServer2 connection; host, port and username are placeholders
conn = hive.Connection(host="yourhost.com", port=10000, username="vikct001")
query = "select user_id,country from test_dev_db.test_data"
start_time = datetime.datetime.now()
output_file = '/home/vikct001/user/vikrant/python/test_data.csv'

# Read the Hive table into a dataframe and add the load date as a column
data = pd.read_sql(query, conn)
data['current_date'] = datetime.date.today().strftime("%Y-%m-%d")
print(data)

# sep must match the table's field delimiter; no index column, and no header row
# (assumes the table does not set skip.header.line.count, otherwise the header
# line would be loaded as a data row)
data.to_csv(output_file, sep='|', encoding='utf-8', index=False, header=False)


def save_to_hdfs(output_file):
    # The shell expands $(date ...); Hive itself substitutes ${hivevar:loaded_date}
    hivequery = ('hive --hivevar loaded_date=$(date +"%Y-%m-%d") -e '
                 '\'LOAD DATA LOCAL INPATH "' + output_file + '" '
                 'INTO TABLE test_dev_db.test_data_external '
                 'PARTITION (loaded_date="${hivevar:loaded_date}")\'')
    p = subprocess.Popen(hivequery, shell=True, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        print(stderr.decode())
        sys.exit(1)


save_to_hdfs(output_file)
end_time = datetime.datetime.now()
print('processing ends', (end_time - start_time).total_seconds() / 60.0, 'minutes')
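If you would rather not build one shell string and run it with shell=True, here is a minimal sketch that computes the partition date in Python and passes an argument list to subprocess.run, so no shell quoting is involved. The helper names build_hive_load_command and run_hive_load are hypothetical, and the sketch assumes the hive CLI is on the PATH of the machine that holds the CSV file.

```python
import datetime
import subprocess
from typing import List, Optional


def build_hive_load_command(csv_path: str, table: str,
                            load_date: Optional[str] = None) -> List[str]:
    """Return the argv list for a hive CLI call that loads csv_path into one partition."""
    if load_date is None:
        # Same value the shell version gets from $(date +"%Y-%m-%d")
        load_date = datetime.date.today().strftime("%Y-%m-%d")
    statement = (
        f"LOAD DATA LOCAL INPATH '{csv_path}' "
        f"INTO TABLE {table} PARTITION (loaded_date='{load_date}')"
    )
    # No shell is involved, so the statement is passed as a single argument as-is
    return ["hive", "-e", statement]


def run_hive_load(csv_path: str, table: str, load_date: Optional[str] = None) -> str:
    """Run the load (hypothetical helper) and raise with Hive's stderr if it fails."""
    cmd = build_hive_load_command(csv_path, table, load_date)
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(result.stderr)
    return result.stdout
```

Usage would be along the lines of run_hive_load(output_file, "test_dev_db.test_data_external"). Raising an exception instead of calling sys.exit keeps the helper reusable from other scripts.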
Table description:
hive (test_dev_db)> desc test_dev_db.test_data_external;
OK
id                      int
country                 string
input_date              date
loaded_date             string

# Partition Information
# col_name              data_type               comment

loaded_date             string
You can see that the data has been loaded and a partition has been created for the current date.
hive (test_dev_db)> show partitions test_dev_db.test_data_external;
OK
loaded_date=2019-08-21
hive (test_dev_db)> select * from test_dev_db.test_data_external;
OK
1       India     2019-08-21      2019-08-21
2       Ukraine   2019-08-21      2019-08-21
1       India     2019-08-21      2019-08-21
2       Ukraine   2019-08-21      2019-08-21
1       India     2019-08-21      2019-08-21
2       Ukraine   2019-08-21      2019-08-21
1       India     2019-08-21      2019-08-21
Solution 2:
A few pointers before I get to the exact answer.
Partitioning matters a lot for Hive tables on HDFS: without it, every query has to scan all of the table's files. In your case you haven't defined any partition, and leaving the table unpartitioned is rarely a good idea. It is your data and you know best how it should be partitioned, so add a proper PARTITIONED BY clause.
Let us suppose LOAD_DATE is the column you want to partition on. Dump the dataframe in Parquet format, and point the table's location at the same HDFS path the Parquet files are written to. Then proceed as below:
CREATE EXTERNAL TABLE IF NOT EXISTS school_db.student_credits
(
NAME_STUDENT_INITIAL STRING,
CREDITS_INITIAL STRING,
NAME_STUDENT_FINAL STRING,
CREDITS_FINAL STRING
)
partitioned by (LOAD_DATE STRING)
-- Native Parquet support (Hive 0.13 and later); older setups used the
-- parquet.hive.* SerDe and input/output format classes from the separate parquet-hive bundle
STORED AS PARQUET
location '/user/gradebook/student_credits';

set hive.msck.path.validation=ignore;
msck repair table school_db.student_credits;
The MSCK REPAIR TABLE command is what you need to run every day: it adds the newly created partition directories to the metastore, so Hive picks up the new partitions and the data in them.
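MSCK REPAIR TABLE scans the whole table location each time it runs. If you already know which partition was added today, a lighter option is to register just that partition with ALTER TABLE ... ADD IF NOT EXISTS PARTITION. The sketch below only builds that HiveQL string; the helper name add_partition_sql is hypothetical, and it assumes each day's files sit under <location>/LOAD_DATE=<yyyy-mm-dd>.

```python
import datetime
from typing import Optional


def add_partition_sql(table: str, location_root: str,
                      load_date: Optional[datetime.date] = None) -> str:
    """Build an ALTER TABLE statement that registers a single LOAD_DATE partition."""
    if load_date is None:
        load_date = datetime.date.today()
    value = load_date.isoformat()  # yyyy-mm-dd, e.g. '2019-08-21'
    # Point the partition explicitly at its directory under the table location
    path = f"{location_root.rstrip('/')}/LOAD_DATE={value}"
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (LOAD_DATE='{value}') LOCATION '{path}'"
    )
```

You could run the returned statement through a pyhive cursor (cursor.execute(sql)) once that day's Parquet files are in place. Giving the LOCATION explicitly means Hive does not have to infer the partition from a directory scan.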
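On how to dump the dataframe into Parquet files: the line below is PySpark DataFrame syntax (a pandas DataFrame has no .write attribute), so it applies only if df_student_credits is a Spark DataFrame:
df_student_credits.write.mode("append").partitionBy("LOAD_DATE").parquet("/user/gradebook/student_credits")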
Write the Parquet files first, then create the external table. Let me know if this solves your problem.