Implementing Authentication with Zitadel for an E-Commerce Chatbot
Before reading this post, make sure to check out the first post of the series, which covers function tool calls.
In this second post of our series on building an e-commerce chatbot, we'll dive into implementing authentication using Zitadel and OAuth 2.0. Authentication is a critical component in ensuring that user data is accessed securely and only by authorized parties. In this use case, we leverage Zitadel's OAuth 2.0 implementation with PKCE (Proof Key for Code Exchange) to authenticate users.
Setup
For this blog post, we'll use the following setup:
version: '3.8'
services:
  psql:
    image: postgres:15
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  zitadel:
    image: ghcr.io/zitadel/zitadel:v2.67.1
    restart: always
    command: start-from-init --masterkeyFromEnv --tlsMode disabled
    environment:
      - ZITADEL_DATABASE_POSTGRES_HOST=psql
      - ZITADEL_DATABASE_POSTGRES_PORT=5432
      - ZITADEL_DATABASE_POSTGRES_DATABASE=${ZITADEL_DB_NAME:-zitadel}
      - ZITADEL_DATABASE_POSTGRES_ADMIN_USERNAME=${ZITADEL_DB_ADMIN_USER:-postgres}
      - ZITADEL_DATABASE_POSTGRES_ADMIN_PASSWORD=${ZITADEL_DB_ADMIN_PASSWORD:-postgres}
      - ZITADEL_DATABASE_POSTGRES_ADMIN_SSL_MODE=disable
      - ZITADEL_DATABASE_POSTGRES_USER_USERNAME=${ZITADEL_DB_USER:-zitadel_user}
      - ZITADEL_DATABASE_POSTGRES_USER_PASSWORD=${ZITADEL_DB_PASSWORD:-zitadel_password}
      - ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE=disable
      - ZITADEL_EXTERNALSECURE=false
      - ZITADEL_MASTERKEY=${ZITADEL_MASTERKEY:-MasterkeyNeedsToHave32Characters}
      - ZITADEL_FIRSTINSTANCE_ORG_NAME=ZITADEL
    ports:
      - "8080:8080"
    depends_on:
      - psql
volumes:
  postgres_data:
Before we start, make sure your Zitadel instance is running at http://localhost:8080, then perform the following steps:
Create a new organization
Create a new project
Create a new application of type WEB (PKCE)
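Enter the following redirect URI: http://localhost:8081
Note down the client ID and store it in a .env file as ZITADEL_CLIENT_ID. The main function shown later also reads ZITADEL_DOMAIN from the environment, so add it to the same file (localhost:8080 for this setup).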
Authentication Overview
The authentication process in our chatbot example involves several steps:
Generating PKCE Pair: The process starts by generating a code verifier and a code challenge to secure the communication between the client and the authentication server.
Initiating the Authorization Flow: The user is redirected to Zitadel's authentication page, where they can log in.
Receiving the Authorization Code: Once the user logs in, Zitadel redirects them back to the chatbot with an authorization code.
Exchanging the Authorization Code for Tokens: The chatbot exchanges the code for an ID token and an access token.
Validating Tokens: The ID token and access token are validated to ensure their authenticity and integrity.
Using the Tokens: The tokens are used to identify the user and enforce row-level security in the PostgreSQL database.
Implementation
Below is a breakdown of how authentication is implemented in the provided source code:
Step 1: Generating the PKCE Pair
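import base64
import hashlib
import secrets


def generate_pkce_pair():
    # High-entropy random verifier (43 URL-safe characters)
    code_verifier = secrets.token_urlsafe(32)
    # S256 challenge: base64url(SHA-256(verifier)) without padding
    code_challenge = (
        base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
        .decode()
        .rstrip("=")
    )
    return code_verifier, code_challenge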
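This function generates a random code_verifier and derives the code_challenge from it as a base64url-encoded SHA-256 hash. The challenge is sent with the authorization request and the verifier with the token request, so an attacker who intercepts the authorization code cannot exchange it for tokens without also knowing the verifier.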
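To make the role of the two values more concrete, here is a minimal sketch of the check an authorization server performs at the token endpoint when the S256 method is used. This is not Zitadel's actual code; the function names s256_challenge and verifier_matches are hypothetical and only illustrate the comparison described in the PKCE specification.

```python
import base64
import hashlib
import hmac


def s256_challenge(code_verifier: str) -> str:
    """Derive the S256 code challenge from a code verifier."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    # base64url encoding with the trailing padding removed
    return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")


def verifier_matches(code_verifier: str, stored_challenge: str) -> bool:
    """Hypothetical server-side check: recompute the challenge from the
    verifier sent with the token request and compare it to the challenge
    stored from the authorization request."""
    return hmac.compare_digest(s256_challenge(code_verifier), stored_challenge)
```

Because only the hash travels through the browser redirect, someone who observes the authorization request or the returned code never sees the verifier that the token endpoint requires.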
Step 2: Initiating the Authorization Flow
For the sake of this blog post, we'll implement the OAuth 2.0 authentication flow in a Python command-line application. It works as follows:
The user calls the CLI
The user is redirected to Zitadel's login page in their browser
The user logs in
The user is redirected back to the CLI, which has started an HTTP server in the background
The CLI exchanges the authorization code for tokens
The CLI uses the tokens to authenticate the user
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlencode, urlparse

import requests


def authenticate(client_id: str, zitadel_domain: str, redirect_uri: str):
    # Generate PKCE values
    code_verifier, code_challenge = generate_pkce_pair()
    # Generate authorization URL
    auth_params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "openid profile email",
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
    auth_url = f"http://{zitadel_domain}/oauth/v2/authorize?{urlencode(auth_params)}"
    # Get port from the redirect URI (also works if the URI has a path)
    port = urlparse(redirect_uri).port
    # Start local server to receive the callback
    server = HTTPServer(("", port), AuthenticationHandler)
    server.auth_code = None
    server.should_stop = False
    # Open browser for authentication
    webbrowser.open(auth_url)
    # Wait for the callback
    while not server.should_stop:
        server.handle_request()
Step 3: Receiving the Authorization Code
A lightweight HTTP server listens for the redirect from Zitadel and captures the authorization code:
class AuthenticationHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        query_components = parse_qs(urlparse(self.path).query)
        if "code" in query_components:
            self.server.auth_code = query_components["code"][0]
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"Authentication successful! You can close this window.")
        else:
            self.send_response(400)
            self.end_headers()
            self.wfile.write(b"Authentication failed! Please try again.")
        self.server.should_stop = True
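The query parsing inside the handler can be tried in isolation. The sketch below pulls it into a small helper; extract_auth_code is a hypothetical name, and the example paths are made up for illustration.

```python
from typing import Optional
from urllib.parse import parse_qs, urlparse


def extract_auth_code(path: str) -> Optional[str]:
    """Return the first "code" query parameter of a callback path, if any."""
    query_components = parse_qs(urlparse(path).query)
    # parse_qs maps each parameter name to a list of values
    values = query_components.get("code")
    return values[0] if values else None


# A successful redirect carries the code; an error redirect does not.
print(extract_auth_code("/?code=abc123"))
print(extract_auth_code("/?error=access_denied"))
```

Note that parse_qs returns a list for every parameter, which is why the handler reads index 0.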
Step 4: Exchanging the Authorization Code for Tokens
Once the code is received, it is exchanged for tokens via a POST request. The rest of the authenticate function is as follows:
    if not server.auth_code:
        raise Exception("No authorization code received")
    # Exchange authorization code for tokens
    token_url = f"http://{zitadel_domain}/oauth/v2/token"
    token_data = {
        "grant_type": "authorization_code",
        "client_id": client_id,
        "code_verifier": code_verifier,
        "code": server.auth_code,
        "redirect_uri": redirect_uri,
    }
    response = requests.post(token_url, data=token_data)
    if response.status_code != 200:
        raise Exception(f"Token exchange failed: {response.text}")
    tokens = response.json()
    return tokens["id_token"], tokens["access_token"]
On success, the response contains the ID token and the access token.
Step 5: Validating Tokens
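Token validation ensures that the tokens have not been tampered with. The ID token's signature is verified against the signing keys Zitadel publishes at its JWKS endpoint, and the at_hash claim in the ID token is compared against a hash of the access token to confirm that the two tokens belong together: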
import base64

import jwt
import requests

# Perform authentication
id_token, access_token = authenticate(client_id, zitadel_domain, redirect_uri)
oidc_config = requests.get(
    f"http://{zitadel_domain}/.well-known/openid-configuration"
).json()
jwks_client = jwt.PyJWKClient(oidc_config["jwks_uri"])
signing_algos = oidc_config["id_token_signing_alg_values_supported"]
# Verify the ID token's signature and audience
signing_key = jwks_client.get_signing_key_from_jwt(id_token)
data = jwt.decode_complete(
    id_token,
    key=signing_key,
    audience=client_id,
    algorithms=signing_algos,
)
payload, header = data["payload"], data["header"]
# Get the pyjwt algorithm object
alg_obj = jwt.get_algorithm_by_name(header["alg"])
# Compute at_hash: base64url of the left half of the access token's hash
digest = alg_obj.compute_hash_digest(access_token.encode())
at_hash = (
    base64.urlsafe_b64encode(digest[: (len(digest) // 2)])
    .decode()
    .rstrip("=")
)
# Raise instead of assert so the check is not skipped under python -O
if at_hash != payload["at_hash"]:
    raise ValueError("at_hash mismatch")
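The at_hash computation can also be written with the standard library alone, which makes the individual steps easier to see. The sketch below assumes the ID token is signed with a SHA-256-based algorithm such as RS256; compute_at_hash and its hash_name parameter are hypothetical names used for illustration, and the token string is a made-up placeholder.

```python
import base64
import hashlib


def compute_at_hash(access_token: str, hash_name: str = "sha256") -> str:
    """Compute the OpenID Connect at_hash value for an access token.

    hash_name must match the hash used by the ID token's signing
    algorithm (sha256 for RS256/ES256, sha512 for RS512, and so on).
    """
    digest = hashlib.new(hash_name, access_token.encode("ascii")).digest()
    # Keep only the left-most half of the hash
    left_half = digest[: len(digest) // 2]
    # base64url-encode and strip the padding
    return base64.urlsafe_b64encode(left_half).decode("ascii").rstrip("=")


# Placeholder token, not a real Zitadel access token
print(compute_at_hash("example-access-token"))
```

If the value computed from the received access token differs from the at_hash claim in the verified ID token, the two tokens were not issued together and the access token should be rejected.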
Main Function
Below is an example of what the main function tying everything together might look like:
import base64
import os

import dotenv
import jwt
import requests
import structlog

# Assumption: the keyword-argument logging call below suggests a
# structlog-style logger; adapt this to your own logging setup.
logger = structlog.get_logger()


def main():
    dotenv.load_dotenv()
    client_id = os.environ["ZITADEL_CLIENT_ID"]
    zitadel_domain = os.environ["ZITADEL_DOMAIN"]
    port = 8081
    redirect_uri = f"http://localhost:{port}"
    # Perform authentication
    try:
        id_token, access_token = authenticate(client_id, zitadel_domain, redirect_uri)
        oidc_config = requests.get(
            f"http://{zitadel_domain}/.well-known/openid-configuration"
        ).json()
        jwks_client = jwt.PyJWKClient(oidc_config["jwks_uri"])
        signing_algos = oidc_config["id_token_signing_alg_values_supported"]
        # Verify the ID token's signature and audience
        signing_key = jwks_client.get_signing_key_from_jwt(id_token)
        data = jwt.decode_complete(
            id_token,
            key=signing_key,
            audience=client_id,
            algorithms=signing_algos,
        )
        payload, header = data["payload"], data["header"]
        # Get the pyjwt algorithm object
        alg_obj = jwt.get_algorithm_by_name(header["alg"])
        # Compute at_hash: base64url of the left half of the access token's hash
        digest = alg_obj.compute_hash_digest(access_token.encode())
        at_hash = (
            base64.urlsafe_b64encode(digest[: (len(digest) // 2)])
            .decode()
            .rstrip("=")
        )
        # Raise instead of assert so the check is not skipped under python -O
        if at_hash != payload["at_hash"]:
            raise ValueError("at_hash mismatch")
        # Get the user_id (=subject) from the payload
        user_id = payload["sub"]
    except Exception as e:
        logger.error("Authentication failed", error=str(e))
        return
Wrapping Up
That's it for the authentication part. In the next post, we'll see how to combine function calling with Azure OpenAI and robust user authentication by using PostgreSQL's row-level security. By passing the user's authentication context to the database, we can ensure that users can only access their own data when interacting with the chatbot.
Stay tuned!
Disclaimer
This blog post is a simplified example and does not cover all aspects that have to be considered when deploying an LLM-powered application to production. In a real-world application, you would need to consider additional security measures.