"""Locust load test that exercises read-only OpenSea REST endpoints."""

import random

from locust import task, HttpUser


class OpenSeaUser(HttpUser):
    """Simulated client issuing weighted GET requests against the OpenSea API.

    Every request goes through ``_get``, which enables ``catch_response`` so
    the tasks can mark a request as failed or successful themselves. Status
    codes a task does not handle explicitly are classified by Locust when the
    ``with`` block exits (error statuses are recorded as failures).
    """

    def on_start(self) -> None:
        """Initialise the per-user base URL, collection slugs and headers."""
        # OpenSea API base URL. Requests use absolute URLs built from this
        # value rather than the user's ``host`` setting.
        # NOTE(review): confirm the v1 API is still served before relying on
        # these results.
        self.base_url = "https://api.opensea.io/api/v1"

        # Popular NFT collections for testing
        self.collections = [
            "boredapeyachtclub",
            "mutant-ape-yacht-club",
            "cryptopunks",
            "azuki",
            "doodles-official",
            "clonex",
            "proof-moonbirds",
        ]

        # API headers (no key required for basic endpoints)
        self.headers = {
            "Accept": "application/json",
            "User-Agent": "LoadForge-Test",
        }

    def _get(self, path, name, params=None):
        """Start a GET for ``base_url + path`` and return its context manager.

        ``catch_response=True`` is required: without it the returned object
        has no ``failure()``/``success()`` methods and the status handling in
        the tasks would raise ``AttributeError``.

        Args:
            path: URL path appended to ``self.base_url`` (leading slash included).
            name: Label under which Locust groups the request in its statistics.
            params: Optional query-string parameters.
        """
        return self.client.get(
            f"{self.base_url}{path}",
            params=params,
            headers=self.headers,
            name=name,
            catch_response=True,
        )

    @staticmethod
    def _json_object(response):
        """Return the response body as a dict, or ``None`` after marking failure.

        A body that is not valid JSON, or whose top level is not an object, is
        reported as a failed request instead of raising out of the task.
        """
        try:
            payload = response.json()
        except ValueError:
            response.failure("Response body is not valid JSON")
            return None
        if not isinstance(payload, dict):
            response.failure("Unexpected JSON payload (expected an object)")
            return None
        return payload

    @task(3)
    def get_collection_stats(self) -> None:
        """Get statistics for a random NFT collection."""
        collection = random.choice(self.collections)

        with self._get(f"/collection/{collection}/stats", "Collection Stats") as response:
            if response.status_code == 200:
                data = self._json_object(response)
                if data is None:
                    return
                # ``or {}`` also covers an explicit JSON null, which the
                # ``.get(key, default)`` form would pass through as None.
                stats = data.get("stats") or {}
                floor_price = stats.get("floor_price", 0)
                total_supply = stats.get("total_supply", 0)
                print(f"{collection}: Floor {floor_price} ETH, Supply {total_supply}")
            elif response.status_code == 429:
                response.failure("Rate limited by OpenSea")
            else:
                response.failure(f"Failed to get stats: {response.status_code}")

    @task(2)
    def get_collection_info(self) -> None:
        """Get basic information about a random collection."""
        collection = random.choice(self.collections)

        with self._get(f"/collection/{collection}", "Collection Info") as response:
            if response.status_code == 200:
                data = self._json_object(response)
                if data is None:
                    return
                collection_data = data.get("collection") or {}
                name = collection_data.get("name", "Unknown")
                # A null description must not be sliced.
                description = (collection_data.get("description") or "")[:100]
                print(f"Collection: {name} - {description}...")
            elif response.status_code == 429:
                response.failure("Rate limited by OpenSea")

    @task(2)
    def get_assets(self) -> None:
        """Get a page of assets from a random collection."""
        collection = random.choice(self.collections)
        params = {
            "collection": collection,
            "limit": 20,
            "offset": random.randint(0, 100),
        }

        with self._get("/assets", "Collection Assets", params=params) as response:
            if response.status_code == 200:
                data = self._json_object(response)
                if data is None:
                    return
                assets = data.get("assets") or []
                print(f"Retrieved {len(assets)} assets from {collection}")

                # Check first asset details
                if assets:
                    asset = assets[0]
                    token_id = asset.get("token_id")
                    name = asset.get("name", "Unnamed")
                    print(f"First asset: {name} (#{token_id})")
            elif response.status_code == 429:
                response.failure("Rate limited by OpenSea")

    @task(1)
    def get_single_asset(self) -> None:
        """Get details for a specific NFT asset of a fixed contract."""
        # Use common token IDs that likely exist
        token_id = random.randint(1, 1000)

        # For this example, we'll use a known contract address (BAYC)
        contract_address = "0xBC4CA0EdA7647A8aB7C2061c2E118A18a936f13D"

        with self._get(f"/asset/{contract_address}/{token_id}", "Single Asset") as response:
            if response.status_code == 200:
                data = self._json_object(response)
                if data is None:
                    return
                name = data.get("name", "Unnamed")
                description = (data.get("description") or "")[:50]
                print(f"Asset: {name} - {description}...")
            elif response.status_code == 404:
                # This is expected for non-existent tokens, so record it as a
                # success; otherwise Locust would count the 404 as a failure.
                response.success()
            elif response.status_code == 429:
                response.failure("Rate limited by OpenSea")

    @task(1)
    def search_collections(self) -> None:
        """Search for collections by a random keyword."""
        search_terms = ["ape", "punk", "cat", "dog", "art", "pixel"]
        term = random.choice(search_terms)
        params = {"q": term, "limit": 10}

        with self._get("/collections", "Search Collections", params=params) as response:
            if response.status_code == 200:
                data = self._json_object(response)
                if data is None:
                    return
                matches = data.get("collections") or []
                print(f"Found {len(matches)} collections for '{term}'")
            elif response.status_code == 429:
                response.failure("Rate limited by OpenSea")