import json
import random
import uuid
from datetime import datetime, timedelta, timezone

from locust import HttpUser, task, between
class DataRetentionUser(HttpUser):
    """Locust user that exercises data retention and lifecycle endpoints.

    Each simulated user gets its own random ``user_id`` and a table of
    retention periods (in days) keyed by data type.  The weighted tasks
    create data with retention metadata, query the retention policies,
    request archival and run a dry-run automated deletion.
    """

    wait_time = between(1, 3)

    def on_start(self):
        """Assign a unique user id and the per-data-type retention table."""
        self.user_id = str(uuid.uuid4())
        # Data retention policies (in days)
        self.retention_policies = {
            'user_data': 2555,         # 7 years
            'transaction_data': 2555,  # 7 years (financial)
            'analytics_data': 730,     # 2 years
            'session_data': 30,        # 30 days
            'log_data': 90,            # 90 days
            'temporary_data': 1,       # 1 day
            'marketing_data': 365,     # 1 year
        }
        print(f"Data retention testing initialized for user: {self.user_id}")

    @staticmethod
    def _parse_json_object(response, invalid_message):
        """Decode ``response`` as a JSON object.

        Args:
            response: The ``catch_response`` context object for the request.
            invalid_message: Failure text recorded when the body is not a
                JSON object.

        Returns:
            The decoded ``dict``, or ``None`` after the response has been
            marked as failed.
        """
        try:
            payload = response.json()
        except ValueError:
            # Every JSON decode error (stdlib json, simplejson, requests)
            # is a ValueError, so this is the portable thing to catch.
            response.failure(invalid_message)
            return None
        if not isinstance(payload, dict):
            # Valid JSON such as ``null`` or a list would otherwise raise
            # inside the task and the request would never be reported.
            response.failure(invalid_message)
            return None
        return payload

    @task(3)
    def test_data_creation_with_retention(self):
        """Create a record of a random data type with retention metadata.

        Succeeds when the endpoint answers 200/201 with a JSON object that
        contains both ``data_id`` and ``retention_expiry``.
        """
        data_type = random.choice(list(self.retention_policies))
        retention_days = self.retention_policies[data_type]

        # Take one timestamp so the expiry is exactly ``retention_days``
        # after creation.  Dropping tzinfo keeps the naive ISO-8601 format
        # that was previously sent on the wire.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None)
        expiry_date = (created_at + timedelta(days=retention_days)).isoformat()

        test_data = {
            'user_id': self.user_id,
            'data_type': data_type,
            'content': f'Test data for {data_type}',
            'created_at': created_at.isoformat(),
            'retention_policy': data_type,
            'retention_expiry': expiry_date,
        }

        with self.client.post(
            '/api/v1/data/create',
            json=test_data,
            name=f"Create Data - {data_type}",
            catch_response=True,
        ) as response:
            if response.status_code not in (200, 201):
                response.failure(f"Data creation failed: {response.status_code}")
                return

            result = self._parse_json_object(
                response, "Invalid data creation response"
            )
            if result is None:
                return

            if 'data_id' not in result or 'retention_expiry' not in result:
                response.failure("Missing retention metadata in response")
                return

            data_id = result['data_id']
            print(f"Data created with retention: {data_id}")
            response.success()

    @task(2)
    def test_retention_policy_query(self):
        """Query the retention policies endpoint and count the entries.

        Succeeds when the endpoint answers 200 with a JSON object; the
        ``policies`` entry is optional and defaults to an empty list.
        """
        with self.client.get(
            '/api/v1/data/retention-policies',
            name="Retention Policies Query",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Retention policies query failed: {response.status_code}"
                )
                return

            policies = self._parse_json_object(
                response, "Invalid retention policies response"
            )
            if policies is None:
                return

            try:
                policy_count = len(policies.get('policies', []))
            except TypeError:
                # ``policies`` was present but not a sized value
                # (e.g. null or a number).
                response.failure("Invalid retention policies response")
                return

            print(f"Retention policies validated: {policy_count} policies")
            response.success()

    @task(2)
    def test_data_archival_process(self):
        """Request archival of this user's ``user_data`` category.

        Succeeds when the endpoint answers 200/202 with a JSON object that
        contains ``archive_id``.
        """
        archival_request = {
            'user_id': self.user_id,
            'data_category': 'user_data',
            'archive_reason': 'compliance_requirement',
            'requested_by': 'system_test',
        }

        with self.client.post(
            '/api/v1/data/archive',
            json=archival_request,
            name="Data Archival Request",
            catch_response=True,
        ) as response:
            if response.status_code not in (200, 202):
                response.failure(f"Data archival failed: {response.status_code}")
                return

            result = self._parse_json_object(response, "Invalid archival response")
            if result is None:
                return

            if 'archive_id' not in result:
                response.failure("Missing archive ID in response")
                return

            archive_id = result['archive_id']
            print(f"Data archival initiated: {archive_id}")
            response.success()

    @task(1)
    def test_automated_deletion_process(self):
        """Run a dry-run automated deletion for a random data type.

        Succeeds when the endpoint answers 200 with a JSON object whose
        ``deletion_summary`` entry is itself a JSON object.
        """
        deletion_request = {
            'data_type': random.choice(list(self.retention_policies)),
            'deletion_reason': 'retention_expired',
            'dry_run': True,  # Test mode
        }

        with self.client.post(
            '/api/v1/data/automated-deletion',
            json=deletion_request,
            name="Automated Deletion Test",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Automated deletion test failed: {response.status_code}"
                )
                return

            result = self._parse_json_object(response, "Invalid deletion response")
            if result is None:
                return

            if 'deletion_summary' not in result:
                response.failure("Missing deletion summary")
                return

            summary = result['deletion_summary']
            if not isinstance(summary, dict):
                # A non-object summary cannot be queried with ``.get``.
                response.failure("Invalid deletion response")
                return

            records_to_delete = summary.get('records_to_delete', 0)
            print(f"Automated deletion summary: {records_to_delete} records")
            response.success()