Catalog API Invoke API
Summary
The invoke catalog API is used to upload items to the Amazon Just WalkOut API.
Note: Make sure to follow your organization's process to prepare the code for production release.
Python Code
import json
import boto3
import urllib3
import os
from datetime import datetime
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from urllib.parse import urlparse
# Initialize urllib3 PoolManager
http = urllib3.PoolManager()
def get_table():
"""Initialize DynamoDB table resource"""
dynamodb = boto3.resource('dynamodb')
return dynamodb.Table('CatalogAPICalls')
def sign_request(url, method, body):
"""Sign AWS request with SigV4"""
session = boto3.Session()
credentials = session.get_credentials()
parsed_url = urlparse(url)
region = parsed_url.netloc.split('.')[0]
request = AWSRequest(
method=method,
url=url,
data=json.dumps(body)
)
request.headers.add_header('Content-Type', 'application/json')
SigV4Auth(credentials, 'execute-api', region).add_auth(request)
return dict(request.headers)
def validate_payload(payload):
"""Validate the catalog items payload structure"""
if not isinstance(payload, dict):
return False
if 'catalogItems' not in payload:
return False
if not isinstance(payload['catalogItems'], list):
return False
if not payload['catalogItems']:
return False
required_fields = [
'item_sku', 'external_product_id', 'external_product_id_type',
'item_name', 'store_id', 'standard_price', 'brand_name',
'product_category', 'product_subcategory'
]
return all(all(field in item for field in required_fields)
for item in payload['catalogItems'])
def lambda_handler(event, context):
"""Main Lambda handler function"""
print("Received event: " + json.dumps(event, indent=2))
# Validate environment variables
api_url = "<REPLACE_WITH_API_URL>"
if not api_url:
return {
'statusCode': 500,
'body': json.dumps('variable is not set')
}
# Define the payload
payload = {
"catalogItems": [
{
"item_sku": "96385074",
"external_product_id": "96385074",
"external_product_id_type": "EAN8",
"item_name": "Life Water",
"store_id": "<REPLACE_WITH_STORE>",
"standard_price": "1.00",
"brand_name": "Life Water",
"product_category": "Drink",
"product_subcategory": "Water"
}
]
}
# Validate payload structure
if not validate_payload(payload):
return {
'statusCode': 400,
'body': json.dumps('Invalid payload structure')
}
try:
# Get DynamoDB table
table = get_table()
# Sign the request
headers = sign_request(api_url, 'POST', payload)
# Make API request
response = http.request(
'POST',
api_url,
body=json.dumps(payload),
headers=headers
)
if response.status != 201:
raise urllib3.exceptions.HTTPError(f"Request failed with status {response.status}")
# Process response
api_response = json.loads(response.data.decode('utf-8'))
print("API Response:", api_response)
# Store in DynamoDB
ingestion_id = datetime.utcnow().isoformat()
table.put_item(Item={
'ingestion_id': ingestion_id,
'api_response': api_response,
'timestamp': datetime.utcnow().isoformat()
})
return {
'statusCode': 201,
'body': json.dumps('Successfully processed catalog items')
}
except urllib3.exceptions.HTTPError as e:
print(f"Error: {str(e)}")
error_content = response.data.decode('utf-8') if 'response' in locals() else 'No response content'
print(f"Response content: {error_content}")
return {
'statusCode': 500,
'body': json.dumps(f'Error processing catalog items: {str(e)}')
}
except Exception as e:
print(f"Unexpected error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f'Unexpected error processing catalog items: {str(e)}')
}
Unit tests
import unittest
from unittest.mock import Mock, patch, MagicMock
import json
from datetime import datetime
import sys
import os
# Add parent directory to path to import the module
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from catalog_temp import validate_payload, sign_request, lambda_handler
class TestValidatePayload(unittest.TestCase):
def test_valid_payload(self):
payload = {
"catalogItems": [{
"item_sku": "123",
"external_product_id": "456",
"external_product_id_type": "UPC",
"item_name": "Test Item",
"store_id": "STORE-001",
"standard_price": "10.00",
"brand_name": "Test Brand",
"product_category": "Category",
"product_subcategory": "Subcategory"
}]
}
self.assertTrue(validate_payload(payload))
def test_invalid_payload_not_dict(self):
self.assertFalse(validate_payload("not a dict"))
def test_invalid_payload_missing_catalog_items(self):
payload = {"other_field": "value"}
self.assertFalse(validate_payload(payload))
def test_invalid_payload_catalog_items_not_list(self):
payload = {"catalogItems": "not a list"}
self.assertFalse(validate_payload(payload))
def test_invalid_payload_empty_catalog_items(self):
payload = {"catalogItems": []}
self.assertFalse(validate_payload(payload))
def test_invalid_payload_missing_required_field(self):
payload = {
"catalogItems": [{
"item_sku": "123",
"external_product_id": "456"
# Missing other required fields
}]
}
self.assertFalse(validate_payload(payload))
class TestSignRequest(unittest.TestCase):
@patch('catalog_temp.boto3.Session')
@patch('catalog_temp.SigV4Auth')
def test_sign_request(self, mock_sigv4, mock_session):
mock_credentials = Mock()
mock_session.return_value.get_credentials.return_value = mock_credentials
url = "https://us-east-1.example.com/api"
method = "POST"
body = {"test": "data"}
headers = sign_request(url, method, body)
self.assertIsInstance(headers, dict)
mock_session.return_value.get_credentials.assert_called_once()
class TestLambdaHandler(unittest.TestCase):
def setUp(self):
self.context = Mock()
self.context.aws_request_id = "test-request-id"
@patch('catalog_temp.get_table')
@patch('catalog_temp.sign_request')
@patch('catalog_temp.http')
def test_lambda_handler_success(self, mock_http, mock_sign, mock_get_table):
# Mock API response
mock_response = Mock()
mock_response.status = 201
mock_response.data = json.dumps({"ingestionId": "test-123"}).encode('utf-8')
mock_http.request.return_value = mock_response
# Mock DynamoDB table
mock_table = Mock()
mock_get_table.return_value = mock_table
# Mock sign_request
mock_sign.return_value = {"Authorization": "test-auth"}
event = {}
with patch.dict(os.environ, {'API_URL': 'https://test.example.com/api'}):
result = lambda_handler(event, self.context)
self.assertEqual(result['statusCode'], 201)
self.assertIn('Successfully processed', result['body'])
mock_table.put_item.assert_called_once()
def test_lambda_handler_missing_api_url(self):
event = {}
with patch.dict(os.environ, {}, clear=True):
result = lambda_handler(event, self.context)
self.assertEqual(result['statusCode'], 500)
@patch('catalog_temp.validate_payload')
def test_lambda_handler_invalid_payload(self, mock_validate):
mock_validate.return_value = False
event = {}
with patch.dict(os.environ, {'API_URL': 'https://test.example.com/api'}):
result = lambda_handler(event, self.context)
self.assertEqual(result['statusCode'], 400)
self.assertIn('Invalid payload', result['body'])
@patch('catalog_temp.get_table')
@patch('catalog_temp.sign_request')
@patch('catalog_temp.http')
def test_lambda_handler_api_error(self, mock_http, mock_sign, mock_get_table):
# Mock API error response
mock_response = Mock()
mock_response.status = 500
mock_http.request.return_value = mock_response
mock_sign.return_value = {"Authorization": "test-auth"}
event = {}
with patch.dict(os.environ, {'API_URL': 'https://test.example.com/api'}):
result = lambda_handler(event, self.context)
self.assertEqual(result['statusCode'], 500)
self.assertIn('Error processing', result['body'])
@patch('catalog_temp.get_table')
@patch('catalog_temp.sign_request')
@patch('catalog_temp.http')
def test_lambda_handler_exception(self, mock_http, mock_sign, mock_get_table):
# Mock exception
mock_http.request.side_effect = Exception("Network error")
mock_sign.return_value = {"Authorization": "test-auth"}
event = {}
with patch.dict(os.environ, {'API_URL': 'https://test.example.com/api'}):
result = lambda_handler(event, self.context)
self.assertEqual(result['statusCode'], 500)
self.assertIn('Unexpected error', result['body'])
if __name__ == '__main__':
unittest.main()

