Table of contents
- 1. Install lib using Pip
- 2. Connecting to Database
- i. Check the driver
- ii. Configure the connection string
- iii. Create an engine using the create_engine() function, specifying the database URL
- iv. Define a function that checks the connection using connection.execute("SELECT 1") method
- v. Use the check_connection() function to verify the connection before executing any database operations
- 3. Construct the query
- 4. Construct the dboperation function
- 5. Using the execute() method
- 6. Using Pandas read_sql_query() method - DQL
- 7. Bulk insert CSV Files (Data Files) to SQL Server
- Conclusion
- Source: [Link]
- Author: Dheeraj. Yss
- Connect with me:
This article describes the process of connecting to SQL Server from Python using SQLAlchemy, pyodbc, and pandas.
1. Install lib using Pip
!pip install sqlalchemy
!pip install pyodbc
import sqlalchemy, pyodbc, os
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError
os.chdir(r'path\orders')
os.getcwd()
os.listdir()
2. Connecting to Database
i. Check the driver
pyodbc.drivers()
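A quick guard can fail fast if the expected driver is missing. A minimal sketch, assuming "ODBC Driver 18 for SQL Server" (the driver used in the connection string below) is what your installation provides:
required_driver = "ODBC Driver 18 for SQL Server"  # assumption: matches your installed driver
if required_driver not in pyodbc.drivers():
    raise RuntimeError(
        f"'{required_driver}' not found. Installed drivers: {pyodbc.drivers()}"
    )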
ii. Configure the connection string
connection_url = URL.create(
    "mssql+pyodbc",
    username="test",
    password="test",
    host=".",
    port=1433,
    database="test",
    query={
        "driver": "ODBC Driver 18 for SQL Server",
        "TrustServerCertificate": "yes",  # When yes, the transport layer uses SSL to encrypt the channel and skips walking the certificate chain to validate trust. Useful with self-signed certificates or when the chain cannot be validated.
        "authentication": "SqlPassword",  # Use SQL login credentials instead of Windows authentication.
    },
)
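To confirm what was built without leaking the password, the URL object can render itself with the password masked (render_as_string() is part of SQLAlchemy's URL API):
print(connection_url.render_as_string(hide_password=True))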
iii. Create an engine using the create_engine() function, specifying the database URL. Pool sizing belongs here rather than in the connection-string query: pool_size=1 with max_overflow=0 limits the engine to a single pooled connection.
engine = create_engine(connection_url, pool_size=1, max_overflow=0)
iv. Define a function that checks the connection using connection.execute("SELECT 1") method
def check_connection():
    try:
        with engine.connect() as connection:
            connection.execute(text('SELECT 1'))
            connection.commit()  # Important: otherwise the connection is left in an open transaction, which eventually leads to slow-running queries and blocking.
        return True
    except OperationalError as error:
        print("Error:", str(error))
        return False
v. Use the check_connection() function to verify the connection before executing any database operations
- To close the session when you are done, use session.close().
if check_connection():
    # Connection is valid, proceed with database operations
    # Create a session factory
    Session = sessionmaker(bind=engine)
    # Create a session
    session = Session()
    # Perform database operations using the session
    print("Successfully established a connection to the database.")
else:
    # Connection is not valid, handle the error or retry the connection
    print("Error: Unable to establish a connection to the database.")
Successfully established a connection to the database.
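The else branch above mentions retrying; a minimal retry sketch (the attempt count and delay are illustrative assumptions):
import time

for attempt in range(3):  # assumption: three attempts with a 2-second pause
    if check_connection():
        break
    time.sleep(2)
else:
    raise RuntimeError("Could not reach the database after 3 attempts")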
3. Construct the query
# Construct the query
qlist_tables = """
SELECT TOP 10000 *
FROM [db].INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE IN ('BASE TABLE')
ORDER BY TABLE_NAME ASC
"""
qslt_orders = """
SELECT TOP 10000
*
FROM [db].[dbo].[orders]
"""
qcrtab_instcus = """
BEGIN TRY
    BEGIN TRANSACTION T1CRTAB
    IF OBJECT_ID(N'[db].[dbo].[instnwnd Customers]', N'U') IS NOT NULL
        DROP TABLE [db].[dbo].[instnwnd Customers]
    CREATE TABLE [db].[dbo].[instnwnd Customers] (
        "CustomerID" nchar (5) NOT NULL,
        "CompanyName" nvarchar (40) NOT NULL,
        "ContactName" nvarchar (30) NULL,
        "ContactTitle" nvarchar (30) NULL,
        "Address" nvarchar (60) NULL,
        "City" nvarchar (15) NULL,
        "Region" nvarchar (15) NULL,
        "PostalCode" nvarchar (10) NULL,
        "Country" nvarchar (15) NULL,
        "Phone" nvarchar (24) NULL,
        "Fax" nvarchar (24) NULL
    )
    COMMIT TRANSACTION T1CRTAB
END TRY
BEGIN CATCH
    ROLLBACK TRANSACTION T1CRTAB
    PRINT 'Error occurred: ' + ERROR_MESSAGE();
END CATCH
"""
qslt_instcus = """
SELECT TOP 10000
*
FROM [db].[dbo].[instnwnd Customers]
"""
qinst_instcus = """
BEGIN TRY
    BEGIN TRANSACTION T1INSTINCUS
    IF OBJECT_ID(N'[db].[dbo].[instnwnd Customers]', N'U') IS NOT NULL
    BEGIN
        INSERT INTO [db].[dbo].[instnwnd Customers] VALUES('ALFKI','Alfreds Futterkiste','Maria Anders','Sales Representative','Obere Str. 57','Berlin',NULL,'12209','Germany','030-0074321','030-0076545')
        INSERT INTO [db].[dbo].[instnwnd Customers] VALUES('ANATR','Ana Trujillo Emparedados y helados','Ana Trujillo','Owner','Avda. de la Constitución 2222','México D.F.',NULL,'05021','Mexico','(5) 555-4729','(5) 555-3745')
        INSERT INTO [db].[dbo].[instnwnd Customers] VALUES('ANTON','Antonio Moreno Taquería','Antonio Moreno','Owner','Mataderos 2312','México D.F.',NULL,'05023','Mexico','(5) 555-3932',NULL)
    END
    COMMIT TRANSACTION T1INSTINCUS
END TRY
BEGIN CATCH
    ROLLBACK TRANSACTION T1INSTINCUS
    PRINT 'Error occurred: ' + ERROR_MESSAGE();
END CATCH
"""
4. Construct the dboperation function
def dboperation(query, commit='nocommit'):
    try:
        result = session.execute(text(query))  # reuse the session; engine.connect() would open a separate connection
        # Fetch rows before committing; DDL/DML statements return no rows.
        rows, keys = (result.fetchall(), result.keys()) if result.returns_rows else (None, None)
        if commit == 'commit':
            session.commit()  # Important: otherwise the session stays in an open transaction, which eventually leads to slow-running queries and blocking.
        return result, keys, rows
    except Exception as e:
        print(f"An error occurred: {str(e)}")
5. Using the execute() method
i. DQL: Select
# try:
#     result = engine.connect().execute(text(qlist_tables))
#     print(result.all(), result.keys())
# except Exception as e:
#     print(f"An error occurred: {str(e)}")
dboperation(qlist_tables)
try:
    result2 = session.execute(text(qslt_orders))  # engine.connect() would create another session, so reuse the existing session
    print(type(result2), result2.fetchall(), result2.keys())
except Exception as e:
    print(f"An error occurred: {str(e)}")
Convert the result to a pandas DataFrame:
# data = engine.connect().execute(text(qslt_orders))
data = dboperation(qslt_orders)
# data[1] holds the column keys, data[2] the fetched rows
df_result2 = pd.DataFrame(data[2], columns=data[1])
df_result2
ii. DDL: Create
dboperation(qcrtab_instcus, 'commit')
dboperation(qslt_instcus)
iii. DML: Insert
dboperation(qinst_instcus, 'commit')
dboperation(qslt_instcus)
6. Using Pandas read_sql_query() method - DQL
i. DQL: Select
df = pd.read_sql_query(qlist_tables,engine)
df
df2 = pd.read_sql_query(qslt_orders,engine)
df2
ii. DDL: Create
# Create a DataFrame
data = {
    'Name': ['John', 'Jane', 'Mike'],
    'Age': [25, 30, 35],
    'City': ['New York', 'London', 'Paris']
}
df = pd.DataFrame(data)
# Write the DataFrame to a SQL table
table_name = 'tabpdperson'
df.to_sql(table_name, engine, if_exists='replace', index=False)
# Confirm the table creation
query = f"""
SELECT *
FROM {table_name}
"""
result = pd.read_sql(query, engine)
result
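By default to_sql() lets pandas choose the SQL column types. If you want explicit control, to_sql() accepts a dtype mapping of SQLAlchemy types; a sketch, with illustrative lengths:
from sqlalchemy.types import NVARCHAR, Integer

# Pin the SQL Server column types instead of accepting the pandas defaults.
df.to_sql(table_name, engine, if_exists='replace', index=False,
          dtype={'Name': NVARCHAR(50), 'Age': Integer(), 'City': NVARCHAR(50)})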
iii. DML: Insert
insert_data = {
    'Name': ['Mary', 'Peter', 'Mike'],
    'Age': [52, 40, 45],
    'City': ['Colorado', 'Texas', 'Arizona']
}
df = pd.DataFrame(insert_data)
table_name = 'tabpdperson'
df.to_sql(table_name, engine, if_exists = 'append', index = False)
table_name = 'tabpdperson'
query = f"""
SELECT TOP 10000
*
FROM [dballpurpose].[dbo].[tabpdperson]
"""
df = pd.read_sql_query(query, engine)
df
Error:
Observed long-running queries and multiple sessions with multiple open transactions.
Wait type: LCK_M_S.
Retrieved the running SQL text from the sql_handle, checked DBCC OPENTRAN, and killed all open sessions.
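A hedged sketch of how such sessions can be inspected from Python, using standard SQL Server DMVs (requires VIEW SERVER STATE permission):
# List other sessions' active requests, their wait types, and the SQL text behind each sql_handle.
qdiag = """
SELECT r.session_id, r.status, r.wait_type, t.text AS sql_text
FROM sys.dm_exec_requests r
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) t
WHERE r.session_id <> @@SPID
"""
print(pd.read_sql_query(qdiag, engine))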
7. Bulk insert CSV Files (Data Files) to SQL Server
Change the current working directory to the specified path where order files are located.
os.chdir(r'path\orders')
List all files and directories in the current working directory.
os.listdir()
Retrieve and return the current working directory of the process.
os.getcwd()
Create a list of tuples containing file information for all CSV files in the current directory.
# Create a list of tuples containing file information for all CSV files in the current directory.
# Each entry pairs a formatted label with the absolute path to the file.
data_files = [
    [
        f'{os.path.basename(os.getcwd())} {fn.split(".")[0]}',  # directory name plus the file name without extension
        os.path.join(os.getcwd(), fn)  # full path to the file
    ]
    for fn in os.listdir(os.getcwd())  # list all files in the current directory
    if fn.endswith('.csv')  # keep only files ending with '.csv'
]
data_files # This line evaluates to the list of data_files, which can be used for further processing or output.
i. Read CSV data into pandas dataframe
orderSchema = {
    "SalesOrderNumber": str,
    "SalesOrderLineNumber": int,
    "OrderDate": 'category',  # parsed to datetime via parse_dates below
    "CustomerName": str,
    "Email": str,
    "Item": str,
    "Quantity": int,
    "UnitPrice": float,
    "Tax": float
}
df = pd.DataFrame()  # initialize an empty DataFrame to concatenate into
for path in data_files:
    newdata = pd.read_csv(path[1], header=None, names=orderSchema.keys(),
                          dtype=orderSchema, parse_dates=["OrderDate"])
    df = pd.concat([df, newdata], ignore_index=True)
df
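The same load can be written as a single concat over a generator, which avoids re-copying the growing DataFrame on every iteration (a sketch under the same schema assumptions):
df = pd.concat(
    (pd.read_csv(path, header=None, names=orderSchema.keys(),
                 dtype=orderSchema, parse_dates=["OrderDate"])
     for _, path in data_files),  # each data_files entry is [label, path]
    ignore_index=True,
)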
Group the DataFrame 'df' by the year part of the 'OrderDate' column.
df.groupby(pd.to_datetime(df['OrderDate']).dt.year).count()
df[['OrderDate', 'SalesOrderNumber']].groupby(pd.to_datetime(df['OrderDate']).dt.year)['SalesOrderNumber'].count().reset_index(name='SalesOrderNumber')
df.groupby(pd.to_datetime(df['OrderDate']).dt.year).agg({
    'SalesOrderNumber': 'count', 'SalesOrderLineNumber': 'count',
    'CustomerName': 'count', 'Email': 'count',
    'Item': 'count', 'Quantity': 'count',
    'UnitPrice': 'count', 'Tax': 'count'
}).reset_index()
ii. Clean df
Find duplicates in the SalesOrderNumber column:
df['SalesOrderNumber'].duplicated().sum()
Check duplicates in each year:
df_subset = df[['OrderDate', 'SalesOrderNumber']].loc[
    pd.to_datetime(df['OrderDate']).dt.year == 2019
]
df_subset['SalesOrderNumber'].duplicated().sum()
df.groupby(pd.to_datetime(df['OrderDate']).dt.year)['SalesOrderNumber'].apply(
    lambda x: x.duplicated().sum()).reset_index(name='DuplicatedSalesOrderNumber')

df_subset = df.groupby(pd.to_datetime(df['OrderDate']).dt.year).agg({
    'SalesOrderNumber': 'count', 'SalesOrderLineNumber': 'count',
    'CustomerName': 'count', 'Email': 'count',
    'Item': 'count', 'Quantity': 'count',
    'UnitPrice': 'count', 'Tax': 'count'
}).reset_index()
df_subset['DuplicatedSalesOrderNumber'] = df.groupby(
    pd.to_datetime(df['OrderDate']).dt.year)['SalesOrderNumber'].apply(
    lambda x: x.duplicated().sum()).to_list()
df_subset
Plot to visualize the duplicates:
import seaborn as sns
import matplotlib.pyplot as plt

sns.set_theme(style="ticks")
fig, ax = plt.subplots()
# Set the width of each bar
bar_width = 0.35
# Calculate the position of each bar on the x-axis
r1 = df_subset['OrderDate'].to_list() # [2019, 2020, 2021]
r2 = [x + bar_width for x in r1]
# Plot the Sales Order Count
ax.bar(r1, df_subset['SalesOrderNumber'], color='steelblue', width=bar_width, label='Sales Order Count')
# Plot the Duplicated Sales Order Count
ax.bar(r2, df_subset['DuplicatedSalesOrderNumber'], color='red', width=bar_width, label='Duplicated Sales Order Count')
# Set labels and title
ax.set_xlabel('Year')
ax.set_ylabel('Count')
ax.set_title('Sales Order Count and Duplicated Sales Order Count by Year')
# Set the x-axis tick labels
ax.set_xticks([r + bar_width/2 for r in r1])
ax.set_xticklabels(df_subset['OrderDate'])
# Add legend
ax.legend()
# Display the plot
plt.show()
Using sns.barplot:
sns.set_theme(style="ticks")
fig, axs = plt.subplots(1, 2, figsize = (10,5), sharey= True)
# Flatten the array of axes (subplots) for easier iteration
axs = axs.flatten()
# Plot the Sales Order Count
sns.barplot(data=df_subset, x='OrderDate', y='SalesOrderNumber', ax=axs[0])
axs[0].set_xlabel('Year')
axs[0].set_ylabel('')
# Plot the Duplicated Sales Order Count
sns.barplot(data=df_subset, x='OrderDate', y='DuplicatedSalesOrderNumber', ax=axs[1], color='black')
axs[1].set_xlabel('Year')
axs[1].set_ylabel('Duplicated Sales Order Count')
# Add a legend to the figure for both bar plots
fig.legend(labels=['Sales Order Count', 'Duplicated Sales Order Count'])
# Display the plot
plt.show()
Using sns.barplot with adjacent bars:
sns.set_style("whitegrid")
df_subset = df_subset[['OrderDate', 'SalesOrderNumber', 'DuplicatedSalesOrderNumber']].melt(id_vars='OrderDate', var_name='Variable', value_name='Count')
sns.barplot(x='OrderDate', y='Count', hue='Variable', data=df_subset)
plt.xlabel('Order Date')
plt.ylabel('Count')
plt.title('Sales Order Numbers vs Duplicated Sales Order Numbers')
plt.legend(title='Variable')
plt.show()
Drop duplicates:
df_clean = df.drop_duplicates(subset = 'SalesOrderNumber')
df_clean.duplicated().sum()
df_clean.shape
Check for nulls:
df_clean.isnull().sum(axis=1).plot()
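A per-column count may be easier to read than the row-wise plot:
df_clean.isnull().sum()  # nulls per column; all zeros means nothing to impute or drop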
Rebuild the index:
df_clean = df.drop_duplicates(subset = 'SalesOrderNumber')
df_clean.reset_index(drop=True, inplace=True)
df_clean.duplicated().sum()
df_clean.shape
iii. Send data to SQL Server using pandas to_sql() method
table_name = 'Sales orders'  # target table for the cleaned orders
df_clean.to_sql(table_name, engine, if_exists='replace', index=False)
df_clean
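For larger files, one hedged option is pyodbc's fast_executemany together with chunked writes (fast_executemany is a documented mssql+pyodbc flag on create_engine(); the chunk size here is illustrative):
# A bulk-friendly engine; rows are sent to the server in batches of 1,000.
bulk_engine = create_engine(connection_url, fast_executemany=True)
df_clean.to_sql(table_name, bulk_engine, if_exists='replace', index=False,
                chunksize=1000)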
iv. Create data for orders year 2022
Generate random number function:
import random

def genran(type='int'):
    if type == 'int':
        ran = int((random.randint(1, 9) * random.random() * random.random()) * 1000)
    else:
        ran = round((random.randint(1, 9) * random.random() * random.random()) * 1000, random.randint(2, 4))
    return ran if ran < df['SalesOrderNumber'].count() else None  # None when the value exceeds the valid index range
Generate date function:
import datetime

strtdate = datetime.date(2022, 1, 1)
enddate = datetime.date(2022, 12, 31)
delta = datetime.timedelta(days=1)
datevar = strtdate

def get_date():
    global datevar
    if datevar <= enddate and datevar != strtdate:
        storedate = datevar
        datevar = datevar + delta
        return storedate
    else:
        datevar = datevar + delta
        return strtdate
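A quick check of both helpers (output varies because genran() is random, and get_date() advances one day per call):
print([get_date() for _ in range(3)])  # first three 2022 dates
print([genran() for _ in range(3)])    # three random indexes below the row count (or None)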
Generating sales order data with incrementing sales order numbers:
sta_son = int(df_clean['SalesOrderNumber'].str[2:].max()) + 1  # next number after the current maximum 'SOxxxxx'
key_list = df.columns.to_list()
dict_data = {key: [] for key in key_list}
for row in range(0, 1000):
    dict_data['SalesOrderNumber'].append('SO' + str(sta_son))
    dict_data['SalesOrderLineNumber'].append(random.randint(1, 7))
    dict_data['OrderDate'].append(get_date())
    cusema = df_clean[['CustomerName', 'Email']].loc[df_clean.index == genran()]
    dict_data['CustomerName'].append(cusema.values[0][0])
    dict_data['Email'].append(cusema.values[0][1])
    dict_data['Item'].append(df_clean['Item'].loc[df_clean.index == genran()].values[0])
    dict_data['Quantity'].append(1)
    dict_data['UnitPrice'].append(genran('round'))
    dict_data['Tax'].append(genran('round'))
    sta_son += 1
dict_data
Create dataframe from the dict data
df_gendata = pd.DataFrame(dict_data)
df_gendata
v. Add the generated orders data for year 2022 to df_clean
16459 old + 2000 added = 18459 rows
df_clean2 = pd.concat([df_clean, pd.DataFrame(dict_data)], ignore_index=True)
df_clean2
df_clean2.groupby(pd.to_datetime(df_clean2['OrderDate']).dt.year).count() #[['OrderDate','CustomerName']]
Data for year 2022 in df_clean2:
df_clean2.loc[
    pd.to_datetime(df_clean2['OrderDate']).dt.year == 2022,
    ['CustomerName', 'OrderDate', 'Item', 'Quantity', 'UnitPrice', 'Tax']
]
Check the data from the SQL table:
table_name = 'Sales orders'
query = f"""
SELECT TOP 10000
    YEAR(OrderDate) AS year,
    COUNT(SalesOrderNumber) AS [No of records],
    SUM(COUNT(SalesOrderNumber)) OVER (ORDER BY YEAR(OrderDate)) AS [Running records count]
FROM [dballpurpose].[dbo].[{table_name}]
GROUP BY YEAR(OrderDate)
ORDER BY YEAR(OrderDate)
"""
df_salesorders = pd.read_sql_query(query, engine)
df_salesorders
vi. Send data for orders year 2022 to SQL Server
table_name = 'Sales orders'
df_clean2.to_sql(table_name, engine, if_exists= 'append', index=False)
Errors:
- TypeError: the dtype datetime64[ns] is not supported for parsing, pass this column using parse_dates instead.
  Fix: keep "OrderDate": 'category' in the dtype map and specify parse_dates=["OrderDate"].
- TypeError: cannot concatenate object of type '<class 'list'>'; only Series and DataFrame objs are valid.
  Fix: initialize an empty DataFrame first: df = pd.DataFrame().
- ValueError: If using all scalar values, you must pass an index.
  Fix: pass an index when creating the DataFrame.
- IndexError: index 0 is out of bounds for axis 0 with size 0.
  Fix: rebuild the index after dropping duplicates: df_clean.reset_index(drop=True, inplace=True).
- IndexError: Boolean index has wrong length: 11038 instead of 18459.
- MemoryError.
Conclusion
Learning objectives:
- Python with the sqlalchemy, pandas, and seaborn libraries
- Connect to SQL Server using Python
- Verify the connection
- Perform DQL (SELECT), DDL (CREATE), and DML (INSERT) operations using Python
- Use the Pandas read_sql_query() method for DQL
- Bulk insert CSV files
- Find duplicates, check for nulls, clean data, and rebuild indexes
- Generate data for orders year 2022
- Send data for orders year 2022 to SQL Server