#!/usr/bin/env python3
"""
Python script to test the registerNewCustomer SOAP web service.
"""

import xml.etree.ElementTree as ET
from xml.dom import minidom
from xml.sax.saxutils import escape

import requests


class CustomerRegistrationClient:
    """Client for the Customer Registration SOAP service.

    Builds SOAP 1.1 envelopes for the ``registerNewCustomer`` operation,
    posts them to the service endpoint and extracts either the ``return``
    value or the fault string from the reply.
    """

    # Prefix -> namespace URI map used when searching response documents.
    _NAMESPACES = {
        'S': 'http://schemas.xmlsoap.org/soap/envelope/',
        'ns2': 'http://service.example.com/',
    }

    def __init__(self, service_url="http://localhost:8080/jaxws-hello-world/loan"):
        """
        Initialize the Customer Registration client

        Args:
            service_url: The SOAP service endpoint URL
        """
        self.service_url = service_url
        self.wsdl_url = f"{service_url}?wsdl"
        self.headers = {
            'Content-Type': 'text/xml; charset=utf-8',
            'SOAPAction': ''
        }

    def create_soap_envelope(self, customer_name, is_blacklisted=False):
        """
        Create SOAP envelope for registerNewCustomer request

        Args:
            customer_name: The customer name to register. It is converted to
                text and XML-escaped before being embedded.
            is_blacklisted: Whether the customer is blacklisted (default: False)

        Returns:
            XML string of SOAP envelope
        """
        blacklisted_value = "true" if is_blacklisted else "false"
        # Escape &, < and > so an arbitrary name can neither break the
        # document nor inject extra elements into the request.
        safe_name = escape(str(customer_name))

        soap_envelope = f"""<?xml version="1.0" encoding="UTF-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                  xmlns:ser="http://service.example.com/">
    <soapenv:Header/>
    <soapenv:Body>
        <ser:registerNewCustomer>
            <arg0>
                <customerName>{safe_name}</customerName>
                <blacklisted>{blacklisted_value}</blacklisted>
            </arg0>
        </ser:registerNewCustomer>
    </soapenv:Body>
</soapenv:Envelope>"""
        return soap_envelope

    def register_customer(self, customer_name, is_blacklisted=False):
        """
        Register a new customer via SOAP service

        Args:
            customer_name: The customer name to register
            is_blacklisted: Whether the customer is blacklisted

        Returns:
            Response message from the service: the ``return`` value, a
            "SOAP Fault: ..." message, or an "Error ..." description when the
            call or the parsing fails.
        """
        soap_request = self.create_soap_envelope(customer_name, is_blacklisted)

        try:
            response = requests.post(
                self.service_url,
                # Send bytes: the envelope and the Content-Type header both
                # declare UTF-8, whereas a str body may be encoded as Latin-1
                # by the HTTP layer and fail on other characters.
                data=soap_request.encode("utf-8"),
                headers=self.headers,
                timeout=10,
            )

            # SOAP 1.1 over HTTP reports faults with status 500 and a fault
            # envelope in the body; surface the fault text rather than the
            # generic HTTP error raise_for_status() would produce.
            if response.status_code == 500:
                try:
                    fault_message = self._fault_message(ET.fromstring(response.text))
                except ET.ParseError:
                    fault_message = None
                if fault_message is not None:
                    return fault_message

            # Any other non-2xx status is a transport-level failure.
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            return f"Error calling service: {str(e)}"

        return self.parse_response(response.text)

    def _fault_message(self, root):
        """Return "SOAP Fault: <faultstring>" for a parsed envelope, or None.

        None is returned when the document has no ``Fault`` element or the
        fault carries no ``faultstring`` child.
        """
        fault_elem = root.find('.//S:Fault', self._NAMESPACES)
        if fault_elem is None:
            return None
        fault_string = fault_elem.find('.//faultstring', self._NAMESPACES)
        if fault_string is None:
            return None
        return f"SOAP Fault: {fault_string.text}"

    def parse_response(self, response_xml):
        """
        Parse the SOAP response

        Args:
            response_xml: The XML response from the service

        Returns:
            The text of the ``return`` element if present; otherwise a
            "SOAP Fault: ..." message, a "could not find" notice, or an
            "Error parsing response: ..." message for malformed XML.
        """
        try:
            root = ET.fromstring(response_xml)
        except ET.ParseError as e:
            return f"Error parsing response: {str(e)}"

        # The response wrapper is namespace-qualified, its <return> child is not.
        return_elem = root.find(
            './/ns2:registerNewCustomerResponse/return', self._NAMESPACES
        )
        if return_elem is not None:
            return return_elem.text

        fault_message = self._fault_message(root)
        if fault_message is not None:
            return fault_message

        return "Could not find return value in response"

    def check_wsdl(self):
        """
        Check if WSDL is accessible

        Returns:
            Tuple of (success: bool, message: str)
        """
        try:
            response = requests.get(self.wsdl_url, timeout=5)
            response.raise_for_status()
            return True, "WSDL is accessible"
        except requests.exceptions.RequestException as e:
            return False, f"Cannot access WSDL: {str(e)}"

    def pretty_print_xml(self, xml_string):
        """
        Pretty print XML string

        Args:
            xml_string: XML string to format

        Returns:
            Formatted XML string, or the input unchanged if it cannot be parsed
        """
        try:
            parsed = minidom.parseString(xml_string)
            return parsed.toprettyxml(indent=" ")
        except Exception:
            # Deliberate best effort: formatting is cosmetic, so any failure
            # falls back to returning the text exactly as received.
            return xml_string
def main():
    """Main function to test the Customer Registration service

    Runs three stages against a default-configured client:

    1. Checks that the WSDL is reachable and stops early if it is not.
    2. Submits a fixed list of sample registrations and prints each result.
    3. Enters an interactive prompt accepting ``register <name>``,
       ``blacklist <name>`` and ``quit`` / ``exit`` / ``q``.

    The prompt ends on a quit command, Ctrl-C, or end of input.
    """
    banner = "=" * 70

    print(banner)
    print("Customer Registration Service Test Client")
    print(banner)
    print()

    # Create client
    client = CustomerRegistrationClient()

    # Check WSDL
    print("1. Checking WSDL availability...")
    wsdl_ok, wsdl_msg = client.check_wsdl()
    print(f" {wsdl_msg}")
    if not wsdl_ok:
        print(" Service is not available. Make sure the service is running.")
        return
    print()

    # Test customer registrations: (name, blacklisted, description)
    test_cases = [
        ("John Doe", False, "Regular customer"),
        ("Jane Smith", False, "Regular customer"),
        ("Bad Actor", True, "Blacklisted customer"),
        ("John Doe", False, "Duplicate registration (should fail)"),
        ("Alice Johnson", False, "Regular customer"),
        ("Bob Wilson", True, "Blacklisted customer"),
    ]

    print("2. Testing Customer Registration...")
    print()

    for i, (name, blacklisted, description) in enumerate(test_cases, 1):
        print(f" Test {i}: {description}")
        print(f" Customer Name: {name}")
        print(f" Blacklisted: {blacklisted}")

        result = client.register_customer(name, blacklisted)
        print(f" Result: {result}")
        print()

    # Interactive mode
    print("3. Interactive Mode")
    print(" " + "-" * 66)
    print(" Commands:")
    print(" - register <name>: Register a regular customer")
    print(" - blacklist <name>: Register a blacklisted customer")
    print(" - quit: Exit")
    print()

    # Command word -> blacklisted flag sent to the service.
    commands = {"register": False, "blacklist": True}

    try:
        while True:
            user_input = input(" > ").strip()

            if user_input.lower() in ('quit', 'exit', 'q'):
                break

            if not user_input:
                print(" Please enter a command.")
                continue

            # First word is the command, the remainder is the customer name.
            parts = user_input.split(maxsplit=1)
            command = parts[0].lower()

            if command in commands and len(parts) == 2:
                customer_name = parts[1].strip()
                result = client.register_customer(customer_name, commands[command])
                print(f" Result: {result}")
            else:
                print(" Invalid command. Use: register <name> or blacklist <name>")
            print()

    except KeyboardInterrupt:
        print("\n Interrupted by user")
    except EOFError:
        # stdin was closed (piped input or Ctrl-D): leave the prompt cleanly
        # instead of crashing with a traceback.
        print()

    print()
    print(banner)
    print("Test completed!")
    print(banner)


if __name__ == "__main__":
    # Run the test client only when executed as a script, not on import.
    main()