How to build a Telegram bot that retrieves real-time prices from a DEX in Python
Requirements
Python libraries
You will need to install the web3 and python-telegram-bot Python libraries:
pip install web3 python-telegram-bot --upgrade
Creating the bot on Telegram
You will first need to contact BotFather on Telegram (https://t.me/botfather) and follow the instructions.
Don't forget to note your token for later!
Coding the bot
Basic configuration
Now let's start coding our bot: we need to import web3 and some python-telegram-bot modules, and set up some basic configuration.
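A minimal sketch of that configuration, assuming the bot token and the RPC endpoint are read from environment variables. The variable names TELEGRAM_BOT_TOKEN and AVAX_RPC_URL and the helper loadSettings are hypothetical, and the default URL (Avalanche's public C-Chain endpoint) is an assumption, since the article does not say which node it connects to.

```python
import logging
import os

# Public Avalanche C-Chain RPC endpoint, used here as an assumed default.
DEFAULT_RPC_URL = "https://api.avax.network/ext/bc/C/rpc"


def loadSettings(env=os.environ):
    """Read the bot settings from a mapping (the process environment by default)."""
    token = env.get("TELEGRAM_BOT_TOKEN")
    if not token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN is not set")
    return {
        "token": token,
        "rpcUrl": env.get("AVAX_RPC_URL", DEFAULT_RPC_URL),
    }


# Basic logging so that errors raised inside handlers show up in the console.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
```

With web3 installed, the connection would then typically be created with Web3(Web3.HTTPProvider(settings["rpcUrl"])).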
Some variables we need
We will need a few variables, such as the factory addresses and some ABIs.
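As a rough illustration, these variables could look like the sketch below. It assumes the DEXs are Uniswap V2 forks, which matches the function names used later in this article (allPairsLength, allPairs, token0, token1); getReserves is an additional assumption. The address is a placeholder, not a real factory, and viewFunction is a hypothetical helper that only shortens the ABI definitions.

```python
# Placeholder (name, factory address) tuples: replace the zero address with
# the real factory address of each DEX you want to query.
dexesAddresses = [
    ("ExampleDex", "0x0000000000000000000000000000000000000000"),
]


def viewFunction(name, inputs, outputs):
    """Build the ABI entry of a read-only function (hypothetical helper)."""
    return {
        "type": "function",
        "name": name,
        "stateMutability": "view",
        "inputs": [{"name": "", "type": t} for t in inputs],
        "outputs": [{"name": "", "type": t} for t in outputs],
    }


# Only the functions the bot calls, following the Uniswap V2 interfaces.
factoryABI = [
    viewFunction("allPairsLength", [], ["uint256"]),
    viewFunction("allPairs", ["uint256"], ["address"]),
]
poolABI = [
    viewFunction("token0", [], ["address"]),
    viewFunction("token1", [], ["address"]),
    viewFunction("getReserves", [], ["uint112", "uint112", "uint32"]),
]
ERC20ABI = [
    viewFunction("symbol", [], ["string"]),
    viewFunction("decimals", [], ["uint8"]),
    viewFunction("name", [], ["string"]),
]
```

Keeping the ABIs down to the handful of functions actually called makes them easy to read; a full ABI copied from a block explorer would work just as well.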
Initializing DEXs and pools
Next we need to retrieve pool information from the DEXs. To do so, we will call the factory contract of each DEX and list all available pairs.
We will first define Dex, Token and Pair classes.
We also define a method to update the liquidity of a pair, which will be useful later.
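The Token and Pair classes could be sketched as follows. The constructor arguments are inferred from how the classes are called elsewhere in this article; the body of updateLiquidity is an assumption based on Uniswap V2's getReserves(), and the Fake* classes are hypothetical stand-ins for a web3 contract so the example runs without a node.

```python
class Token:
    def __init__(self, contract, address, symbol, decimals, name):
        self.contract = contract
        self.address = address
        self.symbol = symbol
        self.decimals = decimals
        self.name = name


class Pair:
    def __init__(self, contract, token0, token1):
        self.contract = contract
        self.token0 = token0
        self.token1 = token1
        self.token0Liquidity = 0
        self.token1Liquidity = 0

    def updateLiquidity(self):
        # Assumption: the pair exposes Uniswap V2's getReserves(), which
        # returns (reserve0, reserve1, blockTimestampLast) in raw units.
        reserve0, reserve1, _ = self.contract.functions.getReserves().call()
        # Scale the raw integer reserves to whole-token amounts.
        self.token0Liquidity = reserve0 / 10 ** self.token0.decimals
        self.token1Liquidity = reserve1 / 10 ** self.token1.decimals


# --- Stand-ins for a web3 contract, only to exercise the class offline ---
class FakeCall:
    def __init__(self, value):
        self.value = value

    def call(self):
        return self.value


class FakeFunctions:
    def __init__(self, reserves):
        self.reserves = reserves

    def getReserves(self):
        return FakeCall(self.reserves)


class FakeContract:
    def __init__(self, reserves):
        self.functions = FakeFunctions(reserves)


wavax = Token(None, "0xA", "WAVAX", 18, "Wrapped AVAX")
usdt = Token(None, "0xB", "USDT", 6, "Tether USD")
pair = Pair(FakeContract((5 * 10**18, 100 * 10**6, 0)), wavax, usdt)
pair.updateLiquidity()
print(pair.token0Liquidity, pair.token1Liquidity)  # 5.0 100.0
```

Dividing by 10 ** decimals matters because the two tokens of a pair often use different decimals (18 and 6 in this example), so raw reserves are not directly comparable.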
Once we have done that, we can populate a list of DEXs.
We now have our list of DEXs, but there are no pairs associated with them, so we need to write a method to retrieve them.
That's good, but there is still a problem: we create a new token for each pair, even if we already know it.
To avoid this, we will create a dict containing all the tokens we already know about.
Then we update our code.
Calculate price
Now we need to decide which pair to calculate the price from: we will try all DEXs and keep the pair with the highest AVAX (or USDT) liquidity.
For each token we will calculate two prices: one in AVAX and one in USDT.
Once we have found the pair, we need to calculate the price.
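One way to compute the price from a pair is to divide the liquidity of the quote token (WAVAX or USDT) by the liquidity of the token itself. The sketch below assumes the Pair attributes used in this article (token0, token1, token0Liquidity, token1Liquidity); calculatePrice is a hypothetical name, and the result is a spot price that ignores swap fees and slippage. SimpleNamespace objects stand in for real Pair and Token instances.

```python
from types import SimpleNamespace


def calculatePrice(pair, tokenSymbol):
    """Return the price of tokenSymbol expressed in the pair's other token."""
    if pair.token0.symbol == tokenSymbol:
        tokenLiquidity, otherLiquidity = pair.token0Liquidity, pair.token1Liquidity
    elif pair.token1.symbol == tokenSymbol:
        tokenLiquidity, otherLiquidity = pair.token1Liquidity, pair.token0Liquidity
    else:
        raise ValueError(f"{tokenSymbol} is not part of this pair")
    if tokenLiquidity == 0:
        return None  # empty pool: no meaningful price
    return otherLiquidity / tokenLiquidity


# Illustrative pool holding 2000 TKN and 50 WAVAX (TKN is a made-up symbol).
pair = SimpleNamespace(
    token0=SimpleNamespace(symbol="TKN"),
    token1=SimpleNamespace(symbol="WAVAX"),
    token0Liquidity=2000.0,
    token1Liquidity=50.0,
)
print(calculatePrice(pair, "TKN"))    # 0.025 (WAVAX per TKN)
print(calculatePrice(pair, "WAVAX"))  # 40.0 (TKN per WAVAX)
```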
Telegram function
Let's code the Telegram function now.
If we find an AVAX or USDT pair for this token, we send a message with the price from the most liquid pair. (We convert AVAX to WAVAX, since pairs are denominated in WAVAX.)
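The text-handling part of that function can be sketched independently of the Telegram library. The helper names normalizeSymbol and buildPriceMessage, the (price, dexName) tuple shape and the message wording are all assumptions made for illustration; in a python-telegram-bot command handler, the symbol would typically come from context.args and the result would be sent with update.message.reply_text.

```python
def normalizeSymbol(symbol):
    """Upper-case the user's input and map AVAX to WAVAX, since pairs hold WAVAX."""
    symbol = symbol.strip().upper()
    return "WAVAX" if symbol == "AVAX" else symbol


def buildPriceMessage(symbol, avaxQuote=None, usdtQuote=None):
    """Format the reply; each quote is a (price, dexName) tuple or None."""
    lines = []
    if avaxQuote is not None:
        price, dexName = avaxQuote
        lines.append(f"{symbol}: {price:.6f} AVAX on {dexName}")
    if usdtQuote is not None:
        price, dexName = usdtQuote
        lines.append(f"{symbol}: {price:.6f} USDT on {dexName}")
    if not lines:
        return f"No AVAX or USDT pair found for {symbol}."
    return "\n".join(lines)


print(buildPriceMessage(normalizeSymbol("avax"), usdtQuote=(20.5, "ExampleDex")))
# WAVAX: 20.500000 USDT on ExampleDex
```

Keeping the formatting separate from the handler makes it possible to check the bot's replies without a Telegram connection.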
Final code
What's next?
There are still some problems.
If someone creates a new pair on a DEX, we can't check its price without restarting the bot. We could fix this easily by adding a command that refreshes the pair list.
Since we only update liquidity on request, and only after we have found which pair has the highest liquidity, the selected pair can never change. Here as well, we could add a command to refresh the liquidity, or a timer to do it automatically.
We could also improve the output format for coins with very high or very low prices.
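For the output format, one possible approach is to show a fixed number of significant digits instead of a fixed number of decimals, so that very small prices are not displayed as 0.00. The function formatPrice below is a hypothetical helper, not part of the bot as written.

```python
import math


def formatPrice(price, significantDigits=4):
    """Format a price with a fixed number of significant digits, no exponent."""
    if price == 0:
        return "0"
    # Position of the leading digit: 3 for 1234.5, -5 for 0.00001234.
    magnitude = math.floor(math.log10(abs(price)))
    # Small prices need more decimals; large prices need none.
    decimals = max(significantDigits - 1 - magnitude, 0)
    return f"{price:,.{decimals}f}"


print(formatPrice(0.00001234))   # 0.00001234
print(formatPrice(1.5))          # 1.500
print(formatPrice(1234567.891))  # 1,234,568
```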
dexes = list()
for dex in dexesAddresses:
    dexName = dex[0]
    dexAddress = dex[1]
    # We instantiate the dex contract
    # (toChecksumAddress is the web3.py v5 name; v6+ uses to_checksum_address)
    dexContract = w3.eth.contract(address=w3.toChecksumAddress(dexAddress), abi=factoryABI)
    # We instantiate a new dex object and add it to our list
    newDex = Dex(dexContract, dexName)
    newDex.updatePairs()
    dexes.append(newDex)
class Dex():
    def __init__(self, factoryContract, name):
        self.factoryContract = factoryContract
        self.name = name
        self.pairs = list()

    def updatePairs(self):
        # We get the number of pairs on this dex
        pairsOnDex = self.factoryContract.functions.allPairsLength().call()
        # And we want to retrieve the pairs we don't know yet
        for i in range(len(self.pairs), pairsOnDex):
            # We ask for pair number i and get its address back
            newPairAddress = self.factoryContract.functions.allPairs(i).call()
            # Now we instantiate a pair contract for this pair
            newPairContract = w3.eth.contract(address=newPairAddress, abi=poolABI)
            token0Address = newPairContract.functions.token0().call()
            token1Address = newPairContract.functions.token1().call()
            # Now we instantiate a contract for each token
            newToken0Contract = w3.eth.contract(address=token0Address, abi=ERC20ABI)
            newToken1Contract = w3.eth.contract(address=token1Address, abi=ERC20ABI)
            # And a Token object for each of them
            newToken0Symbol = newToken0Contract.functions.symbol().call()
            newToken0Decimals = newToken0Contract.functions.decimals().call()
            newToken0Name = newToken0Contract.functions.name().call()
            newToken0 = Token(newToken0Contract, token0Address, newToken0Symbol, newToken0Decimals, newToken0Name)
            newToken1Symbol = newToken1Contract.functions.symbol().call()
            newToken1Decimals = newToken1Contract.functions.decimals().call()
            newToken1Name = newToken1Contract.functions.name().call()
            newToken1 = Token(newToken1Contract, token1Address, newToken1Symbol, newToken1Decimals, newToken1Name)
            # We can create the Pair object
            newPair = Pair(newPairContract, newToken0, newToken1)
            # And we update its liquidity
            #newPair.updateLiquidity()
            self.pairs.append(newPair)
# Tokens we already know about, keyed by address
tokensWeKnow = dict()

# Updated loop of Dex.updatePairs()
for i in range(len(self.pairs), pairsOnDex):
    # We ask for pair number i and get its address back
    newPairAddress = self.factoryContract.functions.allPairs(i).call()
    # Now we instantiate a pair contract for this pair
    newPairContract = w3.eth.contract(address=newPairAddress, abi=poolABI)
    token0Address = newPairContract.functions.token0().call()
    token1Address = newPairContract.functions.token1().call()
    # We check if we already know token0
    if token0Address in tokensWeKnow:
        newToken0 = tokensWeKnow[token0Address]
    else:
        newToken0Contract = w3.eth.contract(address=token0Address, abi=ERC20ABI)
        # And a Token object
        newToken0Symbol = newToken0Contract.functions.symbol().call()
        newToken0Decimals = newToken0Contract.functions.decimals().call()
        newToken0Name = newToken0Contract.functions.name().call()
        newToken0 = Token(newToken0Contract, token0Address, newToken0Symbol, newToken0Decimals, newToken0Name)
        # We remember it for the next pairs
        tokensWeKnow[token0Address] = newToken0
    # Same check for token1
    if token1Address in tokensWeKnow:
        newToken1 = tokensWeKnow[token1Address]
    else:
        newToken1Contract = w3.eth.contract(address=token1Address, abi=ERC20ABI)
        newToken1Symbol = newToken1Contract.functions.symbol().call()
        newToken1Decimals = newToken1Contract.functions.decimals().call()
        newToken1Name = newToken1Contract.functions.name().call()
        newToken1 = Token(newToken1Contract, token1Address, newToken1Symbol, newToken1Decimals, newToken1Name)
        tokensWeKnow[token1Address] = newToken1
    # We can create the Pair object
    newPair = Pair(newPairContract, newToken0, newToken1)
    # And we update its liquidity
    newPair.updateLiquidity()
    self.pairs.append(newPair)
def findPair(tokenSymbol, otherSymbol, dexes):
    # Several pairs can share the same symbols, so we keep the most liquid one
    liquidity = 0
    currentPair = None
    currentDex = None
    for dex in dexes:
        for pair in dex.pairs:
            if (pair.token0.symbol == tokenSymbol and pair.token1.symbol == otherSymbol) or (pair.token1.symbol == tokenSymbol and pair.token0.symbol == otherSymbol):
                # We compare the liquidity of the other token (WAVAX or USDT)
                if pair.token0.symbol == tokenSymbol and pair.token1Liquidity > liquidity:
                    currentPair = pair
                    liquidity = pair.token1Liquidity
                    currentDex = dex
                elif pair.token1.symbol == tokenSymbol and pair.token0Liquidity > liquidity:
                    currentPair = pair
                    liquidity = pair.token0Liquidity
                    currentDex = dex
    # Both values are None if no matching pair with liquidity was found
    return currentPair, currentDex

def findAVAXPair(tokenSymbol, dexes):
    return findPair(tokenSymbol, "WAVAX", dexes)

def findUSDTPair(tokenSymbol, dexes):
    return findPair(tokenSymbol, "USDT", dexes)