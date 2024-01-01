from locust import task, HttpUser import random import json class CMSTestUser ( HttpUser ): def on_start ( self ): # Content types for testing self .content_types = [ "article" , "page" , "blog_post" , "product" ] # Sample content data self .content_samples = { "article" : { "title" : "Sample Article" , "content" : "This is a sample article content for testing purposes." , "tags" : [ "test" , "article" , "sample" ] }, "page" : { "title" : "Test Page" , "content" : "This is test page content with some sample text." , "slug" : "test-page" }, "blog_post" : { "title" : "Test Blog Post" , "content" : "Sample blog post content for CMS testing." , "category" : "Technology" } } # API endpoints self .cms_endpoints = { "content" : "/api/content" , "publish" : "/api/content/ {id} /publish" , "search" : "/api/content/search" } # Track created content for cleanup self .created_content = [] @task ( 4 ) def test_create_content ( self ): """Test creating new content""" content_type = random.choice( self .content_types) # Create content based on type if content_type in self .content_samples: content_data = self .content_samples[content_type].copy() else : content_data = { "title" : f "Test { content_type.title() } " , "content" : f "Sample { content_type } content for testing." } content_data.update({ "type" : content_type, "status" : "draft" , "author" : "test_user" }) with self .client.post( self .cms_endpoints[ "content" ], json = content_data, name = "Create Content" ) as response: if response.status_code in [ 200 , 201 ]: try : result = response.json() content_id = result.get( "id" ) if content_id: self .created_content.append(content_id) print ( f "Created { content_type } : { content_id } " ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) else : response.failure( f "Content creation failed: { response.status_code } " ) @task ( 3 ) def test_update_content ( self ): """Test updating existing content""" if not self .created_content: return content_id = random.choice( self .created_content) update_data = { "title" : f "Updated Content { random.randint( 100 , 999 ) } " , "content" : "This content has been updated for testing purposes." , "last_modified" : "2024-01-01T12:00:00Z" } with self .client.put( f " { self .cms_endpoints[ 'content' ] } / { content_id } " , json = update_data, name = "Update Content" ) as response: if response.status_code == 200 : print ( f "Updated content: { content_id } " ) elif response.status_code == 404 : print ( f "Content not found: { content_id } " ) # Remove from our list if content_id in self .created_content: self .created_content.remove(content_id) else : response.failure( f "Content update failed: { response.status_code } " ) @task ( 3 ) def test_publish_content ( self ): """Test publishing content""" if not self .created_content: return content_id = random.choice( self .created_content) publish_data = { "status" : "published" , "publish_date" : "2024-01-01T10:00:00Z" } publish_url = self .cms_endpoints[ "publish" ].format( id = content_id) with self .client.post( publish_url, json = publish_data, name = "Publish Content" ) as response: if response.status_code == 200 : print ( f "Published content: { content_id } " ) elif response.status_code == 404 : print ( f "Content not found for publishing: { content_id } " ) if content_id in self .created_content: self .created_content.remove(content_id) else : response.failure( f "Content publishing failed: { response.status_code } " ) @task ( 2 ) def test_retrieve_content ( self ): """Test retrieving content by ID""" if not self .created_content: return content_id = random.choice( self .created_content) with self .client.get( f " { self .cms_endpoints[ 'content' ] } / { content_id } " , name = "Retrieve Content" ) as response: if response.status_code == 200 : try : content = response.json() title = content.get( "title" , "Unknown" ) status = content.get( "status" , "Unknown" ) print ( f "Retrieved content: { title } ( { status } )" ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) elif response.status_code == 404 : print ( f "Content not found: { content_id } " ) if content_id in self .created_content: self .created_content.remove(content_id) else : response.failure( f "Content retrieval failed: { response.status_code } " ) @task ( 2 ) def test_search_content ( self ): """Test content search functionality""" search_terms = [ "test" , "sample" , "article" , "content" ] search_term = random.choice(search_terms) search_params = { "q" : search_term, "limit" : 10 , "type" : random.choice( self .content_types) } with self .client.get( self .cms_endpoints[ "search" ], params = search_params, name = "Search Content" ) as response: if response.status_code == 200 : try : results = response.json() # Handle different response formats if isinstance (results, dict ): items = results.get( "results" , results.get( "data" , [])) total = results.get( "total" , len (items)) else : items = results if isinstance (results, list ) else [] total = len (items) print ( f "Search ' { search_term } ': { len (items) } results" ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) else : response.failure( f "Content search failed: { response.status_code } " ) @task ( 1 ) def test_list_content ( self ): """Test listing all content with pagination""" list_params = { "page" : random.randint( 1 , 3 ), "per_page" : 20 , "status" : random.choice([ "all" , "published" , "draft" ]) } with self .client.get( self .cms_endpoints[ "content" ], params = list_params, name = "List Content" ) as response: if response.status_code == 200 : try : data = response.json() if isinstance (data, dict ): items = data.get( "content" , data.get( "data" , [])) page = data.get( "page" , 1 ) total = data.get( "total" , len (items)) else : items = data if isinstance (data, list ) else [] page = list_params[ "page" ] total = len (items) print ( f "Listed content: { len (items) } items, page { page } " ) except json.JSONDecodeError: response.failure( "Invalid JSON response" ) else : response.failure( f "Content listing failed: { response.status_code } " )