All webhooks are signed with RSA-SHA256. Always verify signatures before processing events to ensure they genuinely came from Ark and haven’t been tampered with.

Signature Headers

Each webhook request includes:
Header                Description
X-Ark-Signature       Base64-encoded RSA-SHA256 signature of the request body
X-Ark-Signature-KID   Key ID identifying which public key was used

Public Key

Fetch the public key from Ark’s JWKS endpoint:
GET https://mail.arkhq.io/.well-known/jwks.json
Cache the public key so you don't fetch it on every webhook. The key rarely changes, so refresh the cache only if verification fails.
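One way to follow this caching advice is a small cache that refetches the key set only when a requested key ID is missing, which also covers key rotation. This is a minimal sketch, not Ark's reference implementation: `JwksCache` and the injected `fetch` callable are hypothetical names, and in production `fetch` would wrap the HTTP GET to the JWKS endpoint.

```python
from typing import Callable, Optional


class JwksCache:
    """Caches JWKS keys by `kid` and refetches only on a cache miss.

    Hypothetical helper for illustration. `fetch` is any callable that
    returns a parsed JWKS document, e.g. {"keys": [{"kid": ..., ...}]}.
    """

    def __init__(self, fetch: Callable[[], dict]) -> None:
        self._fetch = fetch
        self._keys: dict = {}

    def _refresh(self) -> None:
        jwks = self._fetch()
        self._keys = {k["kid"]: k for k in jwks.get("keys", [])}

    def get(self, kid: str) -> Optional[dict]:
        # Serve from cache when possible; refetch once if the kid is unknown
        # (first call, or the key was rotated since the last fetch).
        if kid not in self._keys:
            self._refresh()
        return self._keys.get(kid)


# Usage with a stand-in fetcher that counts calls instead of using the network.
calls = {"count": 0}


def fake_fetch() -> dict:
    calls["count"] += 1
    return {"keys": [{"kid": "key-1", "kty": "RSA"}]}


cache = JwksCache(fake_fetch)
first = cache.get("key-1")    # triggers one fetch
second = cache.get("key-1")   # served from cache
missing = cache.get("key-2")  # unknown kid triggers one more fetch
```

Because every unknown key ID triggers a fetch, a real deployment would probably also rate-limit refetches so that requests with bogus key IDs cannot force a request to the JWKS endpoint each time.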

Verification Implementation

import json
import time
import base64
import requests
from flask import Flask, request
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from cryptography.hazmat.backends import default_backend

app = Flask(__name__)

JWKS_URL = "https://mail.arkhq.io/.well-known/jwks.json"
CACHE_TTL = 3600  # 1 hour
MAX_AGE_SECONDS = 300  # 5 minutes

_jwks_cache = {"data": None, "expires_at": 0}


def fetch_jwks():
    now = time.time()
    if _jwks_cache["data"] and now < _jwks_cache["expires_at"]:
        return _jwks_cache["data"]

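    response = requests.get(JWKS_URL, timeout=5)
    # Fail loudly on HTTP errors instead of caching an error body as the key set
    response.raise_for_status()
    jwks = response.json()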

    _jwks_cache["data"] = jwks
    _jwks_cache["expires_at"] = now + CACHE_TTL
    return jwks


def get_public_key(key_id):
    jwks = fetch_jwks()
    key_data = next((k for k in jwks["keys"] if k["kid"] == key_id), None)
    if not key_data:
        return None

    # Decode modulus and exponent
    n_bytes = base64.urlsafe_b64decode(key_data["n"] + "==")
    e_bytes = base64.urlsafe_b64decode(key_data["e"] + "==")

    n = int.from_bytes(n_bytes, "big")
    e = int.from_bytes(e_bytes, "big")

    return RSAPublicNumbers(e, n).public_key(default_backend())


def verify_webhook(request):
    signature = request.headers.get("X-Ark-Signature")
    key_id = request.headers.get("X-Ark-Signature-KID")

    if not signature or not key_id:
        raise ValueError("Missing signature headers")

    body = request.get_data()
    public_key = get_public_key(key_id)

    if not public_key:
        raise ValueError(f"Unknown key ID: {key_id}")

    decoded_signature = base64.b64decode(signature)

    public_key.verify(
        decoded_signature,
        body,
        padding.PKCS1v15(),
        hashes.SHA256()
    )

    return json.loads(body)


# Flask example
@app.post("/webhooks/ark")
def handle_webhook():
    try:
        payload = verify_webhook(request)
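    except Exception:
        # Return a generic message: InvalidSignature carries no text, and
        # other exception details should not be echoed to the caller
        return "Signature verification failed", 401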

    # Check timestamp to prevent replay attacks
    # (this comparison treats the timestamp as Unix seconds)
    timestamp = payload.get("timestamp", 0)
    if abs(time.time() - timestamp) > MAX_AGE_SECONDS:
        return "Timestamp outside allowed window", 400

    # Process the webhook (process_event is your application's own handler)
    process_event(payload)
    return "OK", 200
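
The JWK `n` and `e` values are unpadded base64url strings, and `get_public_key` above handles that by always appending `==`. As an alternative sketch, the helper below computes the exact padding before decoding, using only the standard library. The name `b64url_to_int` is hypothetical and not part of any Ark SDK.

```python
import base64


def b64url_to_int(value: str) -> int:
    """Decode an unpadded base64url string (as used for JWK `n` and `e`) to an int."""
    # Restore exactly the padding that JWK encoding strips: 0, 1, or 2 "=".
    pad = "=" * (-len(value) % 4)
    raw = base64.urlsafe_b64decode(value + pad)
    return int.from_bytes(raw, "big")


# "AQAB" is the base64url encoding of bytes 01 00 01, the common RSA exponent.
exponent = b64url_to_int("AQAB")
print(exponent)  # 65537
```

With this helper, the modulus and exponent could be obtained as `b64url_to_int(key_data["n"])` and `b64url_to_int(key_data["e"])`.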

Best Practices

Never process webhooks without verifying the RSA signature. This ensures the webhook came from Ark and hasn’t been tampered with.
Reject webhooks with timestamps older than 5 minutes to prevent replay attacks. Combined with signature verification, this ensures webhooks are both authentic and recent.
Webhooks may be delivered more than once due to retries. Use the message token combined with status to track processed events and skip duplicates.
Return a 2xx within 5 seconds. Queue events for async processing using background jobs (Sidekiq, Celery, Bull, etc.) if processing takes longer.
Webhook URLs must use HTTPS. Ark will not deliver to HTTP endpoints.
Set up alerts for webhook delivery failures in your monitoring system to catch configuration issues early.