Perform read & write on SQL Server using Python

This article walks through the process of connecting to SQL Server and performing read and write operations using Python.

1. Install libraries using pip

  • !pip install sqlalchemy

  • !pip install pyodbc

  • !pip install pandas seaborn matplotlib

import os
import random
import datetime
import pyodbc
import sqlalchemy
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError

os.chdir(r'path\orders')  # switch to the folder that holds the order CSV files
os.getcwd()
os.listdir()

2. Connecting to the database

i. Check the installed ODBC drivers

pyodbc.drivers()
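
A quick sanity check (a minimal sketch; the driver name must match whatever is actually installed on the machine) to confirm that the driver referenced in the connection string below is available:

# List the installed ODBC drivers and confirm the one we plan to use is present
drivers = pyodbc.drivers()
print(drivers)
if "ODBC Driver 18 for SQL Server" not in drivers:
    print("Install the Microsoft ODBC Driver 18 for SQL Server before continuing.")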

ii. Configure the connection string

connection_url = URL.create(
    "mssql+pyodbc",
    username="test",
    password="test",
    host=".",
    port=1433,
    database="test",
    query={
        "driver": "ODBC Driver 18 for SQL Server",
        "TrustServerCertificate": "yes",  # encrypt the channel with SSL but skip validating the certificate chain; useful with self-signed certificates or when the chain cannot be validated
        "authentication": "SqlPassword",  # use SQL login credentials instead of Windows authentication
    },
)
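
To double-check what will be sent to the driver, the URL can be rendered as a string (a small sketch, assuming SQLAlchemy 1.4+; hide_password=False reveals the password, so only use it for local debugging):

print(connection_url)  # the password is masked by default
print(connection_url.render_as_string(hide_password=False))  # full connection URL, including the password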

iii. Create an engine using the create_engine() function, specifying the database URL

engine = create_engine(connection_url, pool_size=1)  # pool_size=1 limits the connection pool to a single session

iv. Define a function that checks the connection by executing SELECT 1

def check_connection():
    try:
        with engine.connect() as connection:
            connection.execute(text('SELECT 1'))
            connection.commit()  # important: otherwise the connection is left in an open transaction, which eventually leads to slow-running queries and blocking
        return True
    except OperationalError as error:
        print("Error:", str(error))
        return False

v. Use the check_connection() function to verify the connection before executing any database operations

  • To close the session when you are done, use session.close().

if check_connection():
    # Connection is valid, proceed with database operations
    # Create a session factory
    Session = sessionmaker(bind=engine)

    # Create a session
    session = Session()
    # Perform database operations using the session
    print(" Successfully established a connection to the database.")
else:
    # Connection is not valid, handle the error or retry the connection
    print("Error: Unable to establish a connection to the database.")

Successfully established a connection to the database.
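
If the server is occasionally slow to respond, a retry wrapper around check_connection() can help (a hedged sketch; connect_with_retry and its retries/delay parameters are hypothetical and not part of the article's code):

import time

def connect_with_retry(retries=3, delay=5):
    # Call check_connection() a few times, pausing between attempts, before giving up
    for attempt in range(1, retries + 1):
        if check_connection():
            return True
        print(f"Attempt {attempt} failed; retrying in {delay} seconds...")
        time.sleep(delay)
    return False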

3. Construct the queries

# Construct the query
qlist_tables = """
    SELECT TOP 10000 *
    FROM [db].INFORMATION_SCHEMA.TABLES
    WHERE TABLE_TYPE IN ('BASE TABLE')
    ORDER BY TABLE_NAME ASC
"""
qslt_orders = """
SELECT TOP 10000 
*
FROM [db].[dbo].[orders]
"""

qcrtab_instcus = """
BEGIN TRY
    BEGIN TRANSACTION T1CRTAB

    IF OBJECT_ID(N'[db].[dbo].[instnwnd Customers]', N'U') IS NOT NULL
        DROP TABLE [db].[dbo].[instnwnd Customers]

    CREATE TABLE [db].[dbo].[instnwnd Customers] (
        "CustomerID" nchar (5) NOT NULL ,
        "CompanyName" nvarchar (40) NOT NULL ,
        "ContactName" nvarchar (30) NULL ,
        "ContactTitle" nvarchar (30) NULL ,
        "Address" nvarchar (60) NULL ,
        "City" nvarchar (15) NULL ,
        "Region" nvarchar (15) NULL ,
        "PostalCode" nvarchar (10) NULL ,
        "Country" nvarchar (15) NULL ,
        "Phone" nvarchar (24) NULL ,
        "Fax" nvarchar (24) NULL ,
    )
    COMMIT TRANSACTION T1CRTAB
END TRY

BEGIN CATCH
    ROLLBACK TRANSACTION T1CRTAB
    PRINT 'Error occurred: ' + ERROR_MESSAGE();
END CATCH
"""

qslt_instcus = """
SELECT TOP 10000 
*
FROM [db].[dbo].[instnwnd Customers]
"""


qinst_instcus = """
BEGIN TRY
    BEGIN TRANSACTION T1INSTINCUS

    IF OBJECT_ID(N'[db].[dbo].[instnwnd Customers]', N'U') IS NOT NULL
    BEGIN
        INSERT INTO [db].[dbo].[instnwnd Customers] VALUES('ALFKI','Alfreds Futterkiste','Maria Anders','Sales Representative','Obere Str. 57','Berlin',NULL,'12209','Germany','030-0074321','030-0076545')
        INSERT INTO [db].[dbo].[instnwnd Customers] VALUES('ANATR','Ana Trujillo Emparedados y helados','Ana Trujillo','Owner','Avda. de la Constitución 2222','México D.F.',NULL,'05021','Mexico','(5) 555-4729','(5) 555-3745')
        INSERT INTO [db].[dbo].[instnwnd Customers] VALUES('ANTON','Antonio Moreno Taquería','Antonio Moreno','Owner','Mataderos  2312','México D.F.',NULL,'05023','Mexico','(5) 555-3932',NULL)
    END
    COMMIT TRANSACTION T1INSTINCUS
END TRY

BEGIN CATCH
    ROLLBACK TRANSACTION T1INSTINCUS
    PRINT 'Error occurred: ' + ERROR_MESSAGE();
END CATCH
"""

4. Construct the dboperation function

def dboperation(query, commit='nocommit'):
    try:
        result = session.execute(text(query))  # reuse the session; engine.connect() would open another connection
        if commit == 'commit':
            session.commit()  # important: otherwise the session stays in an open transaction, which eventually leads to slow-running queries and blocking
        if result.returns_rows:
            return result, result.keys(), result.fetchall()
        return result, None, None
    except Exception as e:
        print(f"An error occurred: {str(e)}")

5. Using the execute() method

i. DQL: Select

# try:
#     result = engine.connect().execute(text(qlist_tables))
#     print(result.all(), result.keys())
# except Exception as e:
#     print(f"An error occurred: {str(e)}")

dboperation(qlist_tables)

try:
    result2 = session.execute(text(qslt_orders))  # reuse the existing session; engine.connect() would open another connection
    print(type(result2), result2.fetchall(), result2.keys())
except Exception as e:
    print(f"An error occurred: {str(e)}")

Convert the result to a pandas DataFrame:

# data = engine.connect().execute(text(qslt_orders))

data = dboperation(qslt_orders)

# Convert the result to a pandas DataFrame
#df_result2 = pd.DataFrame(data.fetchall(), columns=data.keys())

df_result2 = pd.DataFrame(data[2], columns=data[1])
df_result2

ii. DDL: Create

dboperation(qcrtab_instcus, 'commit')
dboperation(qslt_instcus)

iii. DML: Insert

dboperation(qinst_instcus, 'commit')
dboperation(qslt_instcus)

6. Using the pandas read_sql_query() method - DQL

i. DQL: Select

df = pd.read_sql_query(qlist_tables,engine)
df
df2 = pd.read_sql_query(qslt_orders,engine)
df2

ii. DDL: Create

# Create a DataFrame
data = {
    'Name': ['John', 'Jane', 'Mike'],
    'Age': [25, 30, 35],
    'City': ['New York', 'London', 'Paris']
}
df = pd.DataFrame(data)

# Write the DataFrame to a SQL table
table_name = 'tabpdperson'
df.to_sql(table_name, engine, if_exists='replace', index=False)
# Confirm the table creation
query = f"""
SELECT * 
FROM {table_name}
"""
result = pd.read_sql(query, engine)
result
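
By default to_sql lets pandas pick the SQL column types; a hedged sketch of pinning them explicitly with SQLAlchemy types (the column lengths here are arbitrary choices, not requirements):

from sqlalchemy.types import NVARCHAR, Integer

# Same call as above, but with an explicit dtype mapping for the generated columns
df.to_sql(table_name, engine, if_exists='replace', index=False,
          dtype={'Name': NVARCHAR(50), 'Age': Integer(), 'City': NVARCHAR(50)})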

iii. DML: Insert

insert_data = {
    'Name': ['Mary', 'Peter', 'Mike'],
    'Age': [52, 40, 45],
    'City': ['Colorado', 'Texas', 'Arizona']
}

df = pd.DataFrame(insert_data)

table_name = 'tabpdperson'

df.to_sql(table_name, engine, if_exists = 'append', index = False)

table_name = 'tabpdperson'

query = f"""
SELECT TOP 10000 
*
FROM [dballpurpose].[dbo].[tabpdperson]
"""

df = pd.read_sql_query(query, engine)
df


Error:

While testing, long-running queries with multiple sessions and multiple open transactions were observed, with wait type LCK_M_S. The running statement was retrieved from the SQL handle, DBCC OPENTRAN confirmed the open transaction, and the open sessions were killed.
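
A hedged sketch of the kind of diagnostic query used here (assuming VIEW SERVER STATE permission): sys.dm_exec_requests exposes the wait type and blocking session, and sys.dm_exec_sql_text() resolves the SQL handle to the statement text.

qdiag = """
SELECT r.session_id, r.status, r.wait_type, r.blocking_session_id, t.text AS running_sql
FROM sys.dm_exec_requests AS r
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE r.session_id <> @@SPID
"""
pd.read_sql_query(qdiag, engine)  # inspect what each active session is running and what it is waiting on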

7. Bulk insert CSV Files (Data Files) to SQL Server

Change the current working directory to the specified path where order files are located.

os.chdir(r'path\orders')

List all files and directories in the current working directory.

os.listdir()

Retrieve and return the current working directory of the process.

os.getcwd()

Create a list of tuples containing file information for all CSV files in the current directory.

# Create a list of tuples containing file information for all CSV files in the current directory.
# Each tuple will contain a formatted string and the absolute path to the file.
data_files = [
    [
        f'{os.path.basename(os.getcwd())} {fn.split(".")[0]}',  # Create a formatted string with the directory name and the file name (without extension)
        os.path.join(os.getcwd(), fn)                           # Generate the full path to the file
    ] 
    for fn in os.listdir(os.getcwd())                           # List all files in the current directory
    if fn.endswith('.csv')                                      # Filter the list to include only files that end with '.csv'
]

data_files  # This line evaluates to the list of data_files, which can be used for further processing or output.

i. Read CSV data into pandas dataframe

orderSchema = {
    "SalesOrderNumber": str,
    "SalesOrderLineNumber": int,
    "OrderDate": 'category',
    "CustomerName": str,
    "Email": str,
    "Item": str,
    "Quantity": int,
    "UnitPrice": float,
    "Tax": float
}
df = pd.DataFrame()
for path in data_files:
    newdata = pd.read_csv(path[1], header= None, names=orderSchema.keys(), dtype=orderSchema, parse_dates=["OrderDate"])
    df = pd.concat([df, newdata], ignore_index=True)
df
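
Growing a DataFrame inside a loop re-copies the data on every pd.concat; an equivalent (and usually faster) sketch reads every file first and concatenates once:

frames = [
    pd.read_csv(path, header=None, names=orderSchema.keys(), dtype=orderSchema, parse_dates=["OrderDate"])
    for _, path in data_files  # data_files holds [label, full_path] pairs
]
df = pd.concat(frames, ignore_index=True)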

Group the DataFrame 'df' by the year part of the 'OrderDate' column.

df.groupby(pd.to_datetime(df['OrderDate']).dt.year).count()

df[['OrderDate', 'SalesOrderNumber']].groupby(pd.to_datetime(df['OrderDate']).dt.year)['SalesOrderNumber'].count().reset_index(name='SalesOrderNumber')

df.groupby(pd.to_datetime(df['OrderDate']).dt.year).agg({
    'SalesOrderNumber':'count','SalesOrderLineNumber':'count',
    'CustomerName':'count','Email':'count',
    'Item':'count', 'Quantity':'count',
    'UnitPrice':'count', 'Tax':'count'
    }).reset_index()

ii. Clean df

Find duplicates in SalesOrderNumber column:

df['SalesOrderNumber'].duplicated().sum()
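
To look at the duplicated rows themselves rather than just the count, a quick sketch:

dup_mask = df['SalesOrderNumber'].duplicated(keep=False)  # keep=False flags every occurrence of a repeated order number
df.loc[dup_mask].sort_values('SalesOrderNumber').head(10)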

Check duplicates in each year:

df_subset = df[['OrderDate','SalesOrderNumber']].loc[
    pd.to_datetime(df['OrderDate']).dt.year == 2019
]
df_subset['SalesOrderNumber'].duplicated().sum()
df.groupby(pd.to_datetime(df['OrderDate']).dt.year)['SalesOrderNumber'].apply(lambda x: x.duplicated().sum()).reset_index(name='DuplicatedSalesOrderNumber')

df_subset = df.groupby(pd.to_datetime(df['OrderDate']).dt.year).agg({
    'SalesOrderNumber':'count','SalesOrderLineNumber':'count',
    'CustomerName':'count','Email':'count',
    'Item':'count', 'Quantity':'count',
    'UnitPrice':'count', 'Tax':'count'
    }).reset_index()
df_subset['DuplicatedSalesOrderNumber'] = df.groupby(pd.to_datetime(df['OrderDate']).dt.year)['SalesOrderNumber'].apply(lambda x: x.duplicated().sum()).to_list()
df_subset

Plot to visualize the duplicates:

sns.set_theme(style="ticks")
fig, ax = plt.subplots()

# Set the width of each bar
bar_width = 0.35

# Calculate the position of each bar on the x-axis
r1 = df_subset['OrderDate'].to_list() # [2019, 2020, 2021]
r2 = [x + bar_width for x in r1]

# Plot the Sales Order Count
ax.bar(r1, df_subset['SalesOrderNumber'], color='steelblue', width=bar_width, label='Sales Order Count')

# Plot the Duplicated Sales Order Count
ax.bar(r2, df_subset['DuplicatedSalesOrderNumber'], color='red', width=bar_width, label='Duplicated Sales Order Count')

# Set labels and title
ax.set_xlabel('Year')
ax.set_ylabel('Count')
ax.set_title('Sales Order Count and Duplicated Sales Order Count by Year')

# Set the x-axis tick labels
ax.set_xticks([r + bar_width/2 for r in r1])
ax.set_xticklabels(df_subset['OrderDate'])

# Add legend
ax.legend()

# Display the plot
plt.show()

Using sns barplot:

sns.set_theme(style="ticks")
fig, axs = plt.subplots(1, 2, figsize = (10,5), sharey= True)

# Flatten the array of axes (subplots) for easier iteration
axs = axs.flatten()

# Plot the Sales Order Count
sns.barplot(data=df_subset, x='OrderDate', y='SalesOrderNumber', ax=axs[0])
axs[0].set_xlabel('Year')
axs[0].set_ylabel('')

# Plot the Duplicated Sales Order Count
sns.barplot(data=df_subset, x='OrderDate', y='DuplicatedSalesOrderNumber', ax=axs[1], color='black')
axs[1].set_xlabel('Year')
axs[1].set_ylabel('Duplicated Sales Order Count')

# Add a legend to the figure with the labels 'Sales Order Count' and 'Duplicated Sales Order Count'
fig.legend(labels = ['Sales Order Count', 'Duplicated Sales Order Count'])

# Display the plot
plt.show()

Using sns barplot with adjacent bars:

sns.set_style("whitegrid")

df_subset = df_subset[['OrderDate', 'SalesOrderNumber', 'DuplicatedSalesOrderNumber']].melt(id_vars='OrderDate', var_name='Variable', value_name='Count')

sns.barplot(x='OrderDate', y='Count', hue='Variable', data=df_subset)

plt.xlabel('Order Date')
plt.ylabel('Count')
plt.title('Sales Order Numbers vs Duplicated Sales Order Numbers')
plt.legend(title='Variable')
plt.show()

Drop duplicates:

df_clean = df.drop_duplicates(subset = 'SalesOrderNumber')
df_clean.duplicated().sum()
df_clean.shape

Check nulls:

df_clean.isnull().sum(axis=1).plot()

Rebuild (reset) the DataFrame index:

df_clean = df.drop_duplicates(subset = 'SalesOrderNumber')
df_clean.reset_index(drop=True, inplace=True)
df_clean.duplicated().sum()
df_clean.shape

iii. Send data to SQL Server using pandas to_sql() method

table_name = 'Sales orders'
df_clean.to_sql(table_name, engine, if_exists='replace', index=False)

df_clean
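
For larger DataFrames the insert can be sped up considerably; a hedged sketch (assuming the mssql+pyodbc dialect from the connection_url above) that enables pyodbc's fast_executemany and writes in chunks:

fast_engine = create_engine(connection_url, fast_executemany=True)  # batches parameterized INSERTs at the driver level
df_clean.to_sql(table_name, fast_engine, if_exists='replace', index=False, chunksize=10000)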

iv. Create data for orders year 2022

Generate random number function

def genran(type = 'int'):
    if type == 'int':
        ran = int((random.randint(1,9)*random.random()*random.random())*1000) 
    else:
        ran = round((random.randint(1,9)*random.random()*random.random())*1000, random.randint(2,4))

    return ran if ran < df['SalesOrderNumber'].count() else None

Generate date function

strtdate = datetime.date(2022,1,1)
enddate = datetime.date(2022,12,31)
delta = datetime.timedelta(days=1)
datevar = strtdate

def get_date():
    global datevar
    if datevar <= enddate and datevar != strtdate:
        storedate = datevar
        datevar = datevar + delta
        return storedate
    else:
        datevar = datevar + delta
        return strtdate

Generating Sales Order Data with Incrementing Sales Order Numbers

sta_son = int(df_clean['SalesOrderNumber'].str[2:].max()) + 1

key_list = df.columns.to_list()
dict_data = {key: [] for key in key_list}

for row in range(0,1000):

    dict_data['SalesOrderNumber'].append('SO' + str(sta_son))

    dict_data['SalesOrderLineNumber'].append(random.randint(1,7))

    dict_data['OrderDate'].append(get_date())

    cusema = df_clean[['CustomerName', 'Email']].loc[
    df_clean.index == genran()]

    dict_data['CustomerName'].append(cusema.values[0][0])
    dict_data['Email'].append(cusema.values[0][1])

    dict_data['Item'].append(df_clean['Item'].loc[
    df_clean.index == genran()].values[0])

    dict_data['Quantity'].append(1)

    dict_data['UnitPrice'].append(genran('round'))

    dict_data['Tax'].append(genran('round'))

    sta_son += 1

dict_data

Create dataframe from the dict data

df_gendata = pd.DataFrame(dict_data)
df_gendata

v. Add the generated orders data for year 2022 to df_clean

16459 old + 2000 added = 18459 rows

df_clean2 = pd.concat([df_clean, pd.DataFrame(dict_data)], ignore_index=True)
df_clean2

df_clean2.groupby(pd.to_datetime(df_clean2['OrderDate']).dt.year).count() #[['OrderDate','CustomerName']]

Data in df_clean2.

df_clean2.loc[
    pd.to_datetime(df_clean2['OrderDate']).dt.year == 2022,
    ['CustomerName', 'OrderDate', 'Item', 'Quantity', 'UnitPrice', 'Tax']
]

Check the data from the SQL table:

table_name = 'Sales orders'

query = f"""
SELECT TOP 10000 
YEAR(OrderDate) AS year,
COUNT(SalesOrderNumber) AS [No of records],
SUM(COUNT(SalesOrderNumber)) OVER(ORDER BY YEAR(OrderDate)) AS [Running records count]
FROM [dballpurpose].[dbo].[Sales orders]
GROUP BY YEAR(OrderDate)
ORDER BY YEAR(OrderDate)
"""

df_salesorders = pd.read_sql_query(query, engine)
df_salesorders

vi. Send data for orders year 2022 to SQL Server

table_name = 'Sales orders'
df_clean2.to_sql(table_name, engine, if_exists= 'append', index=False)
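
Note that df_clean2 also contains the earlier years; if those rows are already in the table, appending the whole frame will duplicate them. A hedged alternative is to append only the generated 2022 slice:

mask_2022 = pd.to_datetime(df_clean2['OrderDate']).dt.year == 2022
df_clean2.loc[mask_2022].to_sql(table_name, engine, if_exists='append', index=False)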

Errors:

  1. TypeError: the dtype datetime64[ns] is not supported for parsing, pass this column using parse_dates instead.

    • Keep "OrderDate": 'category' in the dtype mapping and pass parse_dates=["OrderDate"] to read_csv.

  2. TypeError: cannot concatenate object of type '<class 'list'>'; only Series and DataFrame objs are valid

    • Initialize an empty DataFrame first: df = pd.DataFrame().

  3. ValueError: If using all scalar values, you must pass an index

    • Pass an index when creating the DataFrame from scalar values.

  4. IndexError: index 0 is out of bounds for axis 0 with size 0

    • Rebuild the index after dropping duplicates: df_clean.reset_index(drop=True, inplace=True).

  5. IndexError: Boolean index has wrong length: 11038 instead of 18459

  6. MemoryError:

Conclusion

Learning objectives covered:

  1. Python with the sqlalchemy, pandas and seaborn libraries.

  2. Connect to SQL Server using Python.

  3. Verify the connection.

  4. Perform DQL (SELECT), DDL (CREATE) and DML (INSERT) operations using Python.

  5. Use the pandas read_sql_query() method for DQL.

  6. Bulk insert CSV files.

  7. Find duplicates, check nulls, clean the data and rebuild indexes.

  8. Generate data for orders year 2022.

  9. Send data for orders year 2022 to SQL Server.

Source: [Link]

Author: Dheeraj. Yss
