Asynchronous DB Operations in Twisted

Twisted is an asynchronous networking framework, but most database API implementations have blocking interfaces.

For this reason, twisted.enterprise.adbapi was created. It is a non-blocking interface to the standard DB-API 2.0, which allows you to access a number of different RDBMSes.

General method for accessing a database through the DB-API:

1) Create a connection to the database.

 db = dbmodule.connect('dbname','user','password')

2) Create a cursor.

 cursor = db.cursor()

3) Run a query and fetch the results.

cursor.execute('SELECT * FROM table WHERE condition=expression')
resultset = cursor.fetchall()
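
As a concrete version of the three steps above, here is a minimal sketch that uses the standard library's sqlite3 module in place of dbmodule. The in-memory database, the table layout and the row are assumptions made up for illustration. In the DB-API, a query is sent with cursor.execute() and the rows are read back with cursor.fetchall().

```python
import sqlite3

# sqlite3 stands in for "dbmodule"; the table and row are made up for illustration.
db = sqlite3.connect(":memory:")      # 1) create a connection
cursor = db.cursor()                  # 2) create a cursor
cursor.execute("CREATE TABLE agentdetails (name TEXT, status TEXT)")
cursor.execute("INSERT INTO agentdetails VALUES ('1011', 'idle')")

# 3) run a query; execute() and fetchall() do not return until the database answers
cursor.execute("SELECT name, status FROM agentdetails WHERE name = '1011'")
resultset = cursor.fetchall()
print(resultset)  # [('1011', 'idle')]
db.close()
```

With a local in-memory database the wait is negligible, but against a remote server each of these calls can hold up the calling thread for the full round trip.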

The cursor blocks until the database responds. Such delays are unacceptable in an asynchronous framework such as Twisted, because nothing else can run while the call is waiting.
To overcome the blocking interface, Twisted provides an asynchronous wrapper around DB-API modules: twisted.enterprise.adbapi.
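
adbapi gets its non-blocking behaviour by running the blocking DB-API calls in a pool of worker threads and handing the outcome back through callbacks. The sketch below is only a standard-library analogy of that idea, built on concurrent.futures and sqlite3; it is not Twisted's implementation, and blocking_query and on_done are hypothetical names.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor

def blocking_query(sql):
    # Ordinary blocking DB-API calls; this function runs in a worker thread.
    db = sqlite3.connect(":memory:")
    try:
        return db.execute(sql).fetchall()
    finally:
        db.close()

results, errors = [], []

def on_done(future):
    # Called once the worker has finished, with either a result or an exception.
    if future.exception() is not None:
        errors.append(future.exception())
    else:
        results.append(future.result())

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately; the caller is free to do other work.
    pool.submit(blocking_query, "SELECT 1 + 1").add_done_callback(on_done)
    pool.submit(blocking_query, "SELECT * FROM missing_table").add_done_callback(on_done)
# Leaving the "with" block waits for the workers, so both callbacks have run.

print(results)                    # [[(2,)]]
print(type(errors[0]).__name__)   # OperationalError
```

The success and failure branches in on_done play roughly the role that a callback and an errback play on a Twisted Deferred.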

Database connection using adbapi.

To use adbapi, import it as below:

 from twisted.enterprise import adbapi

1) Connect to the database using adbapi.ConnectionPool

dbpool = adbapi.ConnectionPool("MySQLdb",db="agentdata",user="root",passwd="<yourpassword>")

Here, we do not need to import the database module directly; we pass its name as the first argument.
The parameters you would pass to dbmodule.connect are passed as extra arguments to adbapi.ConnectionPool's constructor.
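
To make the argument forwarding concrete, here is a minimal sketch of the pattern: import a DB-API module by its name and hand every remaining argument to its connect() function. The helper connect_by_name is hypothetical and uses sqlite3 so that it runs without a database server; it illustrates the idea and is not adbapi's actual code.

```python
import importlib

def connect_by_name(dbapi_name, *args, **kwargs):
    # Hypothetical helper: import the DB-API module by name and forward
    # every remaining argument to its connect() function.
    module = importlib.import_module(dbapi_name)
    return module.connect(*args, **kwargs)

# ":memory:" and timeout=5.0 are passed straight through to sqlite3.connect().
conn = connect_by_name("sqlite3", ":memory:", timeout=5.0)
value = conn.execute("SELECT 42").fetchone()[0]
print(value)  # 42
conn.close()
```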

2) Run a database query

query = "SELECT * FROM agentdetails WHERE name = %s"
return dbpool.runQuery(query, (name,)).addCallback(self.receiveResult).addErrback(self.errorquery)

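Here, I used the '%s' (format) paramstyle of MySQLdb. The values should be passed as a separate argument rather than formatted into the SQL string, so that the driver takes care of the quoting. If you use another database module, you need to use a paramstyle that it supports. For more, see the DB-API specification.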

Twisted doesn’t attempt to offer any sort of magic parameter munging – runQuery(query,params,…) maps directly onto cursor.execute(query,params,…).
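
Since the parameters go straight to cursor.execute, the placeholder syntax is whatever the underlying module defines. The sketch below shows this with sqlite3, whose paramstyle is "qmark" (a ? placeholder), whereas MySQLdb's is "format" (%s). The table and values are assumptions for illustration.

```python
import sqlite3

print(sqlite3.paramstyle)  # qmark

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE agentdetails (name TEXT, status TEXT)")
# The values travel separately from the SQL text, so the driver handles quoting.
db.execute("INSERT INTO agentdetails VALUES (?, ?)", ("O'Brien", "idle"))

name = "O'Brien"  # the embedded quote needs no manual escaping
rows = db.execute(
    "SELECT name, status FROM agentdetails WHERE name = ?", (name,)
).fetchall()
print(rows)  # [("O'Brien", 'idle')]

# A hostile value is compared as plain data; it is not interpreted as SQL.
hostile_rows = db.execute(
    "SELECT name FROM agentdetails WHERE name = ?", ("x' OR '1'='1",)
).fetchall()
print(hostile_rows)  # []
db.close()
```

With adbapi the same tuple of values would be given as the second argument to runQuery, using the placeholder style of the module named in the ConnectionPool.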

runQuery returns a Deferred, which allows arbitrary callbacks to be called upon completion (or failure).

Demo: select, insert and update queries on a database.

"""
Test DB: this file connects to the database and performs basic operations.
"""

import datetime
import logging

from twisted.enterprise import adbapi
from twisted.internet import reactor

log = logging.getLogger("Test DB")

# Everything after the module name is forwarded to MySQLdb.connect().
dbpool = adbapi.ConnectionPool("MySQLdb",db="agentdata",user="root",passwd="<yourpassword>")

class AgentDB():

    def getTime(self):
        log.info("Get Current Time from System.")
        # Current time without microseconds, e.g. 2024-01-31 12:00:00
        return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def doSelect(self,name):
        log.info("Select operation in Database.")
        # Values are passed separately so that the driver does the quoting.
        query = "SELECT * FROM agentdetails WHERE name = %s"
        return dbpool.runQuery(query,(name,)).addCallback(self.receiveResult).addErrback(self.errorquery)

    def receiveResult(self,result):
        # General-purpose callback to receive the result from the Deferred.
        print("Receive Result")
        print(result)
        return result

    def errorquery(self,failure):
        print("error received: %s" % (failure,))
        return failure

    def doInsert(self,name,status,state):
        log.info("Insert operation in Database.")
        query = "INSERT INTO agentdetails (name,state,status,logtime) VALUES (%s,%s,%s,%s)"
        params = (name,state,status,self.getTime())
        # runOperation is meant for statements that return no rows; its Deferred fires with None.
        return dbpool.runOperation(query,params).addCallback(self.receiveResult).addErrback(self.errorquery)

    def doUpdate(self,name,status,state):
        log.info("Update operation in Database.")
        query = "UPDATE agentdetails SET status = %s, state = %s, logtime = %s WHERE name = %s"
        params = (status,state,self.getTime(),name)
        return dbpool.runOperation(query,params).addCallback(self.receiveResult).addErrback(self.errorquery)

    def checkDB(self):
        # Chain the calls so that the update runs only after the insert has finished.
        d = self.doSelect('1011')
        d.addBoth(lambda _: self.doInsert('Test','Test','Test'))
        d.addBoth(lambda _: self.doUpdate('Test','SecondTest','SecondTest'))
        return d


a = AgentDB()
# Stop the reactor once the whole chain has completed, whether it succeeded or failed.
a.checkDB().addBoth(lambda _: reactor.stop())
reactor.run()

Here, I have used the MySQLdb API, with agentdata as the database name and root as the user; put your own password in place of <yourpassword>.
I have also created select, insert and update queries for the select, insert and update operations respectively.
Each adbapi query call returns a Deferred. Add a callback and an errback to it to handle success and failure respectively.