from locust import task, HttpUser import json import random class InfuraUser ( HttpUser ): def on_start ( self ): # Replace with your Infura project ID self .infura_project_id = "your-project-id-here" self .rpc_url = f "https://mainnet.infura.io/v3/ { self .infura_project_id } " # Test addresses for balance checks self .test_addresses = [ "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045" , # Vitalik "0xA0b86a33E6441b8435b662303c0f479c7e1b5b8e" , # USDC "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2" # WETH ] @task ( 3 ) def get_latest_block ( self ): """Get the latest block number""" payload = { "jsonrpc" : "2.0" , "method" : "eth_blockNumber" , "params" : [], "id" : 1 } with self .client.post( self .rpc_url, json = payload, headers = { "Content-Type" : "application/json" }, name = "Get Latest Block" ) as response: if response.status_code == 200 : data = response.json() if "result" in data: block_number = int (data[ "result" ], 16 ) print ( f "Latest block: { block_number } " ) else : response.failure( "No result in response" ) @task ( 2 ) def get_gas_price ( self ): """Get current gas price""" payload = { "jsonrpc" : "2.0" , "method" : "eth_gasPrice" , "params" : [], "id" : 1 } with self .client.post( self .rpc_url, json = payload, headers = { "Content-Type" : "application/json" }, name = "Get Gas Price" ) as response: if response.status_code == 200 : data = response.json() if "result" in data: gas_price = int (data[ "result" ], 16 ) / 1e9 # Convert to Gwei print ( f "Gas price: { gas_price :.2f} Gwei" ) @task ( 2 ) def check_balance ( self ): """Check ETH balance for random address""" address = random.choice( self .test_addresses) payload = { "jsonrpc" : "2.0" , "method" : "eth_getBalance" , "params" : [address, "latest" ], "id" : 1 } with self .client.post( self .rpc_url, json = payload, headers = { "Content-Type" : "application/json" }, name = "Check Balance" ) as response: if response.status_code == 200 : data = response.json() if "result" in data: balance = int (data[ "result" ], 16 ) / 1e18 # Convert to ETH print ( f "Balance for { address } : { balance :.4f} ETH" ) @task ( 1 ) def get_block_details ( self ): """Get details of the latest block""" payload = { "jsonrpc" : "2.0" , "method" : "eth_getBlockByNumber" , "params" : [ "latest" , False ], "id" : 1 } with self .client.post( self .rpc_url, json = payload, headers = { "Content-Type" : "application/json" }, name = "Get Block Details" ) as response: if response.status_code == 200 : data = response.json() if "result" in data and data[ "result" ]: block = data[ "result" ] tx_count = len (block.get( "transactions" , [])) print ( f "Block { block.get( 'number' ) } has { tx_count } transactions" )