from locust import task, HttpUser import json import time import random from web3 import Web3 from eth_account import Account class DeFiProtocolUser ( HttpUser ): abstract = True def on_start ( self ): # Web3 provider setup (using Infura/Alchemy) self .infura_url = "https://mainnet.infura.io/v3/your-project-id" self .w3 = Web3(Web3.HTTPProvider( self .infura_url)) # Test wallet setup (use test accounts only) self .test_private_key = "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef" self .test_account = Account.from_key( self .test_private_key) self .wallet_address = self .test_account.address # Common DeFi contract addresses (Ethereum mainnet) self .contracts = { # Uniswap V3 "uniswap_v3_router" : "0xE592427A0AEce92De3Edee1F18E0157C05861564" , "uniswap_v3_factory" : "0x1F98431c8aD98523631AE4a59f267346ea31F984" , "uniswap_v3_quoter" : "0xb27308f9F90D607463bb33eA1BeBb41C27CE5AB6" , # Aave V3 "aave_pool" : "0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2" , "aave_pool_data_provider" : "0x7B4EB56E7CD4b454BA8ff71E4518426369a138a3" , # Compound V3 "compound_comet_usdc" : "0xc3d688B66703497DAA19211EEdff47f25384cdc3" , # Common ERC20 tokens "usdc" : "0xA0b86a33E6441b8435b662303c0f479c7e1b5b8e" , "usdt" : "0xdAC17F958D2ee523a2206206994597C13D831ec7" , "weth" : "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" , "dai" : "0x6B175474E89094C44Da98b954EedeAC495271d0F" } # Common test amounts (in wei for ETH, smallest unit for tokens) self .test_amounts = { "small" : int ( 0.01 * 10 ** 18 ), # 0.01 ETH "medium" : int ( 0.1 * 10 ** 18 ), # 0.1 ETH "large" : int ( 1 * 10 ** 18 ) # 1 ETH } class UniswapV3User ( DeFiProtocolUser ): """Test Uniswap V3 DEX functionality""" @task ( 3 ) def test_uniswap_quote ( self ): """Test Uniswap V3 price quotes""" # Test WETH -> USDC quote token_in = self .contracts[ "weth" ] token_out = self .contracts[ "usdc" ] amount_in = self .test_amounts[ "small" ] fee = 3000 # 0.3% fee tier # Quoter contract ABI (simplified) quoter_abi = [ { "inputs" : [ { "name" : "tokenIn" , "type" : "address" }, { "name" : "tokenOut" , "type" : "address" }, { "name" : "fee" , "type" : "uint24" }, { "name" : "amountIn" , "type" : "uint256" }, { "name" : "sqrtPriceLimitX96" , "type" : "uint160" } ], "name" : "quoteExactInputSingle" , "outputs" : [{ "name" : "amountOut" , "type" : "uint256" }], "type" : "function" } ] start_time = time.time() try : quoter_contract = self .w3.eth.contract( address = self .contracts[ "uniswap_v3_quoter" ], abi = quoter_abi ) # Get quote amount_out = quoter_contract.functions.quoteExactInputSingle( token_in, token_out, fee, amount_in, 0 ).call() response_time = (time.time() - start_time) * 1000 if amount_out > 0 : # Calculate price impact and slippage eth_price_estimate = amount_out / (amount_in / 10 ** 18 ) / 10 ** 6 # Rough USDC price # Log successful quote self .environment.events.request.fire( request_type = "Uniswap_Quote" , name = f "WETH->USDC Quote: $ { eth_price_estimate :.2f} " , start_time = start_time, response_time = response_time, response_length = len ( str (amount_out)), context = { "token_pair" : "WETH/USDC" , "amount_in" : amount_in, "amount_out" : amount_out, "estimated_price" : eth_price_estimate, "fee_tier" : fee }, url = "/uniswap/quote" , exception = None , ) else : raise Exception ( "Invalid quote: amount_out is 0" ) except Exception as e: response_time = (time.time() - start_time) * 1000 self .environment.events.request.fire( request_type = "Uniswap_Quote" , name = "WETH->USDC Quote Failed" , start_time = start_time, response_time = response_time, response_length = 0 , context = { "error" : str (e)}, url = "/uniswap/quote" , exception = e, ) @task ( 2 ) def test_uniswap_pool_info ( self ): """Test Uniswap V3 pool information""" # WETH/USDC 0.3% pool pool_address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8" # Pool contract ABI (simplified) pool_abi = [ { "inputs" : [], "name" : "slot0" , "outputs" : [ { "name" : "sqrtPriceX96" , "type" : "uint160" }, { "name" : "tick" , "type" : "int24" }, { "name" : "observationIndex" , "type" : "uint16" }, { "name" : "observationCardinality" , "type" : "uint16" }, { "name" : "observationCardinalityNext" , "type" : "uint16" }, { "name" : "feeProtocol" , "type" : "uint8" }, { "name" : "unlocked" , "type" : "bool" } ], "type" : "function" }, { "inputs" : [], "name" : "liquidity" , "outputs" : [{ "name" : "" , "type" : "uint128" }], "type" : "function" } ] start_time = time.time() try : pool_contract = self .w3.eth.contract( address = pool_address, abi = pool_abi) # Get pool state slot0 = pool_contract.functions.slot0().call() liquidity = pool_contract.functions.liquidity().call() response_time = (time.time() - start_time) * 1000 sqrt_price = slot0[ 0 ] current_tick = slot0[ 1 ] # Calculate current price from sqrtPriceX96 price = (sqrt_price / ( 2 ** 96 )) ** 2 self .environment.events.request.fire( request_type = "Uniswap_Pool" , name = f "WETH/USDC Pool Info - Tick: { current_tick } " , start_time = start_time, response_time = response_time, response_length = len ( str (liquidity)), context = { "pool_address" : pool_address, "sqrt_price" : sqrt_price, "current_tick" : current_tick, "liquidity" : liquidity, "calculated_price" : price }, url = "/uniswap/pool-info" , exception = None , ) except Exception as e: response_time = (time.time() - start_time) * 1000 self .environment.events.request.fire( request_type = "Uniswap_Pool" , name = "Pool Info Failed" , start_time = start_time, response_time = response_time, response_length = 0 , context = { "error" : str (e)}, url = "/uniswap/pool-info" , exception = e, ) @task ( 1 ) def test_uniswap_transaction_simulation ( self ):