import json
import random

from locust import HttpUser, task


class UniswapUser(HttpUser):
    """Locust user that issues GraphQL queries against the Uniswap V3 subgraph.

    Each task POSTs one query to ``self.subgraph_url``, marks the request as
    failed in Locust's statistics when the reply is unusable, and otherwise
    prints a short summary of the returned data.
    """

    # HttpUser refuses to start when no base host is configured. Every
    # request below uses an absolute URL, so this value only acts as a
    # default; a --host given on the command line still takes precedence.
    host = "https://api.thegraph.com"

    def on_start(self):
        """Set up the endpoint URL and the sample pools/tokens for this user."""
        # Uniswap V3 Subgraph endpoint
        # NOTE(review): this is a hosted-service URL; The Graph has announced
        # the retirement of the hosted service -- confirm it is still served.
        self.subgraph_url = (
            "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3"
        )

        # Popular trading pairs for testing
        self.token_pairs = [
            {
                "token0": "WETH",
                "token1": "USDC",
                "address": "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8",
            },
            {
                "token0": "WETH",
                "token1": "USDT",
                "address": "0x4e68ccd3e89f51c3074ca5072bbac773960dfa36",
            },
            {
                "token0": "WETH",
                "token1": "DAI",
                "address": "0xc2e9f25be6257c210d7adf0d4cd6e3e881ba25f8",
            },
            {
                "token0": "USDC",
                "token1": "USDT",
                "address": "0x3416cf6c708da44db2624d63ea0aaef7113527c6",
            },
        ]

        # Common token addresses (not read by any task in this class).
        # NOTE(review): the USDC value does not look like the canonical
        # mainnet USDC contract address -- verify before relying on it.
        self.tokens = {
            "WETH": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
            "USDC": "0xa0b86a33e6441b8435b662303c0f479c7e1b5b8e",
            "USDT": "0xdac17f958d2ee523a2206206994597c13d831ec7",
            "DAI": "0x6b175474e89094c44da98b954eedeac495271d0f",
        }

    def _run_query(self, query, name, label, key, report):
        """POST ``query`` to the subgraph and pass ``data[key]`` to ``report``.

        Args:
            query: GraphQL query text.
            name: Request name under which Locust groups the statistics.
            label: Word used in the failure messages ("pool", "token", ...).
            key: Field expected inside the ``data`` object of the reply.
            report: Callable invoked with the value of ``data[key]``.

        The request is marked as failed when the status is not 200, when the
        body is not valid JSON, when ``data[key]`` is missing or null, or
        when ``report`` cannot parse the payload.
        """
        # catch_response=True is what makes the context manager yield an
        # object with .failure(); without it the call raises AttributeError.
        request = self.client.post(
            self.subgraph_url,
            json={"query": query},
            name=name,
            catch_response=True,
        )
        with request as response:
            if response.status_code != 200:
                response.failure(f"Subgraph error: {response.status_code}")
                return
            try:
                body = response.json()
                # GraphQL errors arrive as HTTP 200 with "data" absent or
                # null, so never index into it blindly.
                data = body.get("data") if isinstance(body, dict) else None
                result = data.get(key) if isinstance(data, dict) else None
                if result is None:
                    response.failure(f"No {label} data returned")
                    return
                report(result)
            except (
                json.JSONDecodeError,
                KeyError,
                ValueError,
                TypeError,
            ) as exc:
                response.failure(f"Failed to parse {label} data: {exc}")

    @staticmethod
    def _report_pool(pool):
        """Print the pair symbols, token0 price and USD volume of one pool."""
        token0_symbol = pool["token0"]["symbol"]
        token1_symbol = pool["token1"]["symbol"]
        price = float(pool["token0Price"])
        volume = float(pool["volumeUSD"])
        print(
            f"{token0_symbol}/{token1_symbol}: "
            f"Price {price:.6f}, Volume ${volume:,.0f}"
        )

    @staticmethod
    def _report_tokens(tokens):
        """Print symbol, derivedETH and USD volume for the first three tokens."""
        for token in tokens[:3]:  # Show top 3
            symbol = token["symbol"]
            price_eth = float(token["derivedETH"])
            volume = float(token["volumeUSD"])
            print(f"{symbol}: {price_eth:.6f} ETH, Volume ${volume:,.0f}")

    @staticmethod
    def _report_swaps(swaps):
        """Print how many swaps were returned and summarize the first one."""
        print(f"Retrieved {len(swaps)} recent swaps")
        if not swaps:
            return
        latest = swaps[0]  # Query sorts by timestamp desc: first is newest
        token0 = latest["pool"]["token0"]["symbol"]
        token1 = latest["pool"]["token1"]["symbol"]
        amount_usd = float(latest["amountUSD"])
        print(f"Latest swap: {token0}/{token1} - ${amount_usd:,.2f}")

    @staticmethod
    def _report_stats(day_data):
        """Print volume, TVL and tx count of the first day entry, if any."""
        if not day_data:
            return
        stats = day_data[0]
        volume = float(stats["volumeUSD"])
        tvl = float(stats["tvlUSD"])
        tx_count = int(stats["txCount"])
        print(
            f"Daily Volume: ${volume:,.0f}, "
            f"TVL: ${tvl:,.0f}, Txs: {tx_count:,}"
        )

    @task(3)
    def get_pool_data(self):
        """Get pool information from Uniswap for a randomly chosen pair."""
        pair = random.choice(self.token_pairs)
        # The id must be the bare lower-case address: any padding inside the
        # quotes would make the lookup match nothing.
        query = """
        {
          pool(id: "%s") {
            id
            token0 { symbol decimals }
            token1 { symbol decimals }
            feeTier
            liquidity
            sqrtPrice
            tick
            token0Price
            token1Price
            volumeUSD
            txCount
          }
        }
        """ % pair["address"].lower()
        self._run_query(query, "Pool Data", "pool", "pool", self._report_pool)

    @task(2)
    def get_token_prices(self):
        """Get current token prices for the five highest-volume tokens."""
        query = """
        {
          tokens(first: 5, orderBy: volumeUSD, orderDirection: desc) {
            id
            symbol
            name
            decimals
            derivedETH
            volumeUSD
            txCount
          }
        }
        """
        self._run_query(
            query, "Token Prices", "token", "tokens", self._report_tokens
        )

    @task(2)
    def get_recent_swaps(self):
        """Get the ten most recent swap transactions."""
        query = """
        {
          swaps(first: 10, orderBy: timestamp, orderDirection: desc) {
            id
            timestamp
            pool {
              token0 { symbol }
              token1 { symbol }
            }
            amount0
            amount1
            amountUSD
            sender
          }
        }
        """
        self._run_query(
            query, "Recent Swaps", "swap", "swaps", self._report_swaps
        )

    @task(1)
    def get_pool_stats(self):
        """Get overall Uniswap statistics for the most recent day entry."""
        query = """
        {
          uniswapDayDatas(first: 1, orderBy: date, orderDirection: desc) {
            date
            volumeUSD
            tvlUSD
            txCount
          }
        }
        """
        self._run_query(
            query, "Pool Stats", "stats", "uniswapDayDatas", self._report_stats
        )