from datetime import datetime, timedelta, timezone
import json
import random
import uuid

from locust import HttpUser, task, between


class DataRetentionUser(HttpUser):
    """Test data retention policies and lifecycle management.

    Each simulated user creates records tagged with retention metadata,
    queries the retention policy listing, requests archival, and triggers
    a dry-run automated deletion. Every request uses ``catch_response`` so
    that a response with an acceptable status code but an unusable body is
    still recorded as a failure.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Initialize data retention testing.

        Assigns a random user id and the table of retention periods that
        the tasks draw from.
        """
        self.user_id = str(uuid.uuid4())

        # Data retention policies (in days)
        self.retention_policies = {
            'user_data': 2555,         # 7 years
            'transaction_data': 2555,  # 7 years (financial)
            'analytics_data': 730,     # 2 years
            'session_data': 30,        # 30 days
            'log_data': 90,            # 90 days
            'temporary_data': 1,       # 1 day
            'marketing_data': 365,     # 1 year
        }

        print(f"Data retention testing initialized for user: {self.user_id}")

    @staticmethod
    def _json_object(response):
        """Return the response body as a dict, or None if it is unusable.

        ``None`` is returned both when the body is not valid JSON and when
        it is valid JSON but not an object (e.g. a list or a bare number),
        so callers can treat every malformed payload the same way instead
        of letting an AttributeError/TypeError escape the task.
        """
        try:
            payload = response.json()
        except ValueError:
            # ValueError is the common base of json.JSONDecodeError and of
            # the decode errors raised by requests / simplejson.
            return None
        return payload if isinstance(payload, dict) else None

    @task(3)
    def test_data_creation_with_retention(self):
        """Test data creation with proper retention metadata.

        Posts a record for a randomly chosen data type and expects the
        response to echo back both ``data_id`` and ``retention_expiry``.
        """
        data_type = random.choice(list(self.retention_policies))
        retention_days = self.retention_policies[data_type]

        # Take a single timestamp so that retention_expiry is exactly
        # created_at + retention_days. The tzinfo is stripped to keep the
        # same naive-UTC ISO format that datetime.utcnow() produced, without
        # calling the deprecated utcnow().
        created_at = datetime.now(timezone.utc).replace(tzinfo=None)
        expiry_date = (created_at + timedelta(days=retention_days)).isoformat()

        test_data = {
            'user_id': self.user_id,
            'data_type': data_type,
            'content': f'Test data for {data_type}',
            'created_at': created_at.isoformat(),
            'retention_policy': data_type,
            'retention_expiry': expiry_date,
        }

        with self.client.post(
            '/api/v1/data/create',
            json=test_data,
            name=f"Create Data - {data_type}",
            catch_response=True,
        ) as response:
            if response.status_code not in (200, 201):
                response.failure(f"Data creation failed: {response.status_code}")
                return

            result = self._json_object(response)
            if result is None:
                response.failure("Invalid data creation response")
                return

            if 'data_id' not in result or 'retention_expiry' not in result:
                response.failure("Missing retention metadata in response")
                return

            print(f"Data created with retention: {result['data_id']}")
            response.success()

    @task(2)
    def test_retention_policy_query(self):
        """Test querying retention policies.

        Succeeds when the endpoint answers 200 with a JSON object; the
        number of entries under ``policies`` is printed for information.
        """
        with self.client.get(
            '/api/v1/data/retention-policies',
            name="Retention Policies Query",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Retention policies query failed: {response.status_code}"
                )
                return

            policies = self._json_object(response)
            if policies is None:
                response.failure("Invalid retention policies response")
                return

            # "or []" also covers an explicit null value for the key.
            policy_entries = policies.get('policies') or []
            print(f"Retention policies validated: {len(policy_entries)} policies")
            response.success()

    @task(2)
    def test_data_archival_process(self):
        """Test data archival for long-term retention.

        Posts an archival request for this user's ``user_data`` and expects
        a 200/202 response carrying an ``archive_id``.
        """
        archival_request = {
            'user_id': self.user_id,
            'data_category': 'user_data',
            'archive_reason': 'compliance_requirement',
            'requested_by': 'system_test',
        }

        with self.client.post(
            '/api/v1/data/archive',
            json=archival_request,
            name="Data Archival Request",
            catch_response=True,
        ) as response:
            if response.status_code not in (200, 202):
                response.failure(f"Data archival failed: {response.status_code}")
                return

            result = self._json_object(response)
            if result is None:
                response.failure("Invalid archival response")
                return

            if 'archive_id' not in result:
                response.failure("Missing archive ID in response")
                return

            print(f"Data archival initiated: {result['archive_id']}")
            response.success()

    @task(1)
    def test_automated_deletion_process(self):
        """Test automated data deletion for expired data.

        Sends a deletion request with ``dry_run`` set for a randomly chosen
        data type and expects a ``deletion_summary`` object in the response.
        """
        deletion_request = {
            'data_type': random.choice(list(self.retention_policies)),
            'deletion_reason': 'retention_expired',
            'dry_run': True,  # Test mode
        }

        with self.client.post(
            '/api/v1/data/automated-deletion',
            json=deletion_request,
            name="Automated Deletion Test",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Automated deletion test failed: {response.status_code}"
                )
                return

            result = self._json_object(response)
            if result is None:
                response.failure("Invalid deletion response")
                return

            summary = result.get('deletion_summary')
            # A summary that is absent or not an object cannot be read.
            if not isinstance(summary, dict):
                response.failure("Missing deletion summary")
                return

            records_to_delete = summary.get('records_to_delete', 0)
            print(f"Automated deletion summary: {records_to_delete} records")
            response.success()