Source code for etlunit.template_base.sqlalchemy_connector

"""
    This file contains all of the necessary components for the application to connect to a data source via
    SQLAlchemy.
"""

import json
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, MetaData, Table, inspect

class DB_Connector(object):
    """
    Wrapper around SQLAlchemy for one named connection.

    An instance exposes the engine, metadata, inspector, session and an open connection, plus a few
    table-level helpers (count, insert, delete, select).
    """

    def __init__(self, conn_name):
        """
        This function initializes the DB_Connector class by exposing various parts of the SQLAlchemy
        architecture to the class.

        :param conn_name: The name of the connection that we'll be creating from the already read in
            connections array.
        :type conn_name: str.
        :raises SystemExit: If the entry stored for ``conn_name`` is ``None``.
        """
        # NOTE(review): kept as a function-level import; presumably so the connections are only read
        # when a connector is actually built -- confirm.
        from etlunit.template_base.connections_reader import connections

        connection = connections[conn_name]
        if connection is None:
            # Still a SystemExit (as the original ``exit()`` produced), but with a message and a
            # non-zero status so the failure is visible to the caller / shell.
            raise SystemExit("No connection settings found for '{}'".format(conn_name))

        # NOTE(review): user/pass are interpolated verbatim; values containing characters such as
        # '@' or '/' would presumably need URL-escaping -- confirm against the connection files.
        url = '{}://{}:{}@{}:{}/{}'.format(
            connection['dbtype'],
            connection['user'],
            connection['pass'],
            connection['host'],
            connection['port'],
            connection['dbname']
        )
        self.engine = create_engine(url)
        self.meta = MetaData()
        self.insp = inspect(self.engine)

        Session = sessionmaker(bind=self.engine)
        self.session = Session()
        self.conn = self.engine.connect()

    def close(self):
        """
        This method releases the session, the open connection and the engine's pooled connections.

        The connector must not be used after calling it.
        """
        self.session.close()
        self.conn.close()
        self.engine.dispose()

    def getTable(self, table_name):
        """
        This method gets a table given a name through SQLAlchemy's reflection capability.

        :param table_name: Name of the table that we want to instantiate.
        :type table_name: str.
        :returns: SQLAlchemy Table object.
        """
        table = Table(table_name, self.meta)
        # NOTE(review): Inspector.reflecttable is the legacy spelling (reflect_table in SQLAlchemy 1.4+);
        # confirm the pinned SQLAlchemy version before upgrading.
        self.insp.reflecttable(table, None)
        return table

    def getCount(self, table_name):
        """
        This method gets a count of records given a table_name.

        :param table_name: Name of the table that we want to count records from.
        :type table_name: str.
        :returns: SQLAlchemy result of the count.
        """
        table = self.getTable(table_name)
        return self.session.query(table).count()

    def insertInto(self, table_name, records):
        """
        This method inserts given records into a table.

        :param table_name: Name of the table that we want to insert into.
        :type table_name: str.
        :param records: Column name -> value pairs to insert; the module's own usage passes a list of
            such dicts, one per row.
        :type records: list or dict.
        """
        table = self.getTable(table_name)
        ins = table.insert().values(records)
        self.conn.execute(ins)

    def deleteFrom(self, table_name):
        """
        This method deletes all records from a table.

        :param table_name: Name of the table that we want to delete from.
        :type table_name: str.
        """
        table = self.getTable(table_name)
        delete = table.delete()
        self.conn.execute(delete)

    def selectFrom(self, table_name):
        """
        This method selects all from a table.

        :param table_name: Name of the table that we want to select from.
        :type table_name: str.
        :returns: A list with one dict per row, keyed by column name (see :meth:`to_json`).
        """
        table = self.getTable(table_name)
        res = self.session.query(table).all()

        # This commented block will likely be used instead of the query so we can get individual columns
        # from sqlalchemy.sql import select
        # sel = select(columns=['test', 'id'], from_obj=table)
        # res = list(self.conn.execute(sel))

        return self.to_json(res, table)

    def to_json(self, qry_results, table, col_types=None):
        """
        This method turns a SQLAlchemy query result into JSON-friendly Python objects.

        :param qry_results: Results from SQLAlchemy's query call; a single row is treated as a list
            containing that one row.
        :type qry_results: SQLAlchemy ResultSet.
        :param table: Table whose columns decide which attributes are read from every row.
        :param col_types: Optional mapping of column type -> callable used to convert non-null values
            of that type. Defaults to no conversions.
        :type col_types: dict.
        :returns: A list with one dict per row, keyed by column name. ``None`` values become empty
            strings; a failed conversion is recorded as an error string instead of raising.
        """
        if not isinstance(qry_results, list):
            qry_results = [qry_results]
        if col_types is None:
            col_types = {}

        results = []
        for qry_result in qry_results:
            table_json = {}
            for col in table._columns:
                value = getattr(qry_result, col.name)
                if value is None:
                    table_json[col.name] = str()
                elif col.type in col_types:
                    converter = col_types[col.type]
                    try:
                        table_json[col.name] = converter(value)
                    except Exception:
                        # Best effort: keep going and flag the cell. Concatenate so the value is a
                        # string (the original comma produced a tuple).
                        table_json[col.name] = "Error: Failed to convert using " + str(converter)
                else:
                    table_json[col.name] = value
            results.append(table_json)

        # Plain Python objects are returned on purpose; if a JSON string is ever needed, something
        # like json.dumps(results, cls=CustomEncoder) would be required to handle datetimes.
        return results
from datetime import datetime # Who can't jsonify datetimes :P
class CustomEncoder(json.JSONEncoder):
    """
    JSON encoder that can also serialise ``datetime`` objects.

    Only needed when datetimes have to be turned into JSON.
    """

    def default(self, obj):
        """
        Convert an object the stock encoder cannot handle.

        :param obj: The object that we are jsonifying.
        :returns: The ISO-8601 string for a ``datetime``; anything else is delegated to
            ``json.JSONEncoder.default``.
        """
        if not isinstance(obj, datetime):
            # Not ours to handle: let the base class deal with it.
            return super(CustomEncoder, self).default(obj)
        # is iso format ok, or do we want something different?
        return obj.isoformat()
if __name__ == '__main__':
    # Manual smoke test against the 'test conn' connection: seed the 'test' table with one row when
    # it is empty, then dump its contents.
    connector = DB_Connector('test conn')
    if connector.getCount('test') == 0:
        connector.insertInto('test', [{'test': 0}])
    # Parenthesised single-argument form prints identically on Python 2 and is valid on Python 3.
    print(connector.selectFrom('test'))

Project Versions

This Page